File Coverage

amqp_api.c
Criterion Covered Total %
statement 90 142 63.3
branch 21 52 40.3
condition n/a
subroutine n/a
pod n/a
total 111 194 57.2


line stmt bran cond sub pod time code
1             /*
2             * ***** BEGIN LICENSE BLOCK *****
3             * Version: MIT
4             *
5             * Portions created by Alan Antonuk are Copyright (c) 2012-2013
6             * Alan Antonuk. All Rights Reserved.
7             *
8             * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
9             * All Rights Reserved.
10             *
11             * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
12             * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
13             *
14             * Permission is hereby granted, free of charge, to any person
15             * obtaining a copy of this software and associated documentation
16             * files (the "Software"), to deal in the Software without
17             * restriction, including without limitation the rights to use, copy,
18             * modify, merge, publish, distribute, sublicense, and/or sell copies
19             * of the Software, and to permit persons to whom the Software is
20             * furnished to do so, subject to the following conditions:
21             *
22             * The above copyright notice and this permission notice shall be
23             * included in all copies or substantial portions of the Software.
24             *
25             * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26             * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27             * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28             * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29             * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30             * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31             * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32             * SOFTWARE.
33             * ***** END LICENSE BLOCK *****
34             */
35              
36             #ifdef HAVE_CONFIG_H
37             #include "config.h"
38             #endif
39              
40             #ifdef _MSC_VER
41             /* MSVC complains about sprintf being deprecated in favor of sprintf_s */
42             #define _CRT_SECURE_NO_WARNINGS
43             /* MSVC complains about strdup being deprecated in favor of _strdup */
44             #define _CRT_NONSTDC_NO_DEPRECATE
45             #endif
46              
47             #include "amqp_private.h"
48             #include "amqp_time.h"
49             #include <stdarg.h>
50             #include <stdint.h>
51             #include <stdio.h>
52             #include <stdlib.h>
53             #include <string.h>
54              
55             #define ERROR_MASK (0x00FF)
56             #define ERROR_CATEGORY_MASK (0xFF00)
57              
58             enum error_category_enum_ { EC_base = 0, EC_tcp = 1, EC_ssl = 2 };
59              
60             static const char *base_error_strings[] = {
61             /* AMQP_STATUS_OK 0x0 */
62             "operation completed successfully",
63             /* AMQP_STATUS_NO_MEMORY -0x0001 */
64             "could not allocate memory",
65             /* AMQP_STATUS_BAD_AMQP_DATA -0x0002 */
66             "invalid AMQP data",
67             /* AMQP_STATUS_UNKNOWN_CLASS -0x0003 */
68             "unknown AMQP class id",
69             /* AMQP_STATUS_UNKNOWN_METHOD -0x0004 */
70             "unknown AMQP method id",
71             /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */
72             "hostname lookup failed",
73             /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION -0x0006 */
74             "incompatible AMQP version",
75             /* AMQP_STATUS_CONNECTION_CLOSED -0x0007 */
76             "connection closed unexpectedly",
77             /* AMQP_STATUS_BAD_AMQP_URL -0x0008 */
78             "could not parse AMQP URL",
79             /* AMQP_STATUS_SOCKET_ERROR -0x0009 */
80             "a socket error occurred",
81             /* AMQP_STATUS_INVALID_PARAMETER -0x000A */
82             "invalid parameter",
83             /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */
84             "table too large for buffer",
85             /* AMQP_STATUS_WRONG_METHOD -0x000C */
86             "unexpected method received",
87             /* AMQP_STATUS_TIMEOUT -0x000D */
88             "request timed out",
89             /* AMQP_STATUS_TIMER_FAILURE -0x000E */
90             "system timer has failed",
91             /* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */
92             "heartbeat timeout, connection closed",
93             /* AMQP_STATUS_UNEXPECTED_STATE -0x0010 */
94             "unexpected protocol state",
95             /* AMQP_STATUS_SOCKET_CLOSED -0x0011 */
96             "socket is closed",
97             /* AMQP_STATUS_SOCKET_INUSE -0x0012 */
98             "socket already open",
99             /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x0013 */
100             "unsupported sasl method requested",
101             /* AMQP_STATUS_UNSUPPORTED -0x0014 */
102             "parameter value is unsupported"};
103              
104             static const char *tcp_error_strings[] = {
105             /* AMQP_STATUS_TCP_ERROR -0x0100 */
106             "a socket error occurred",
107             /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR -0x0101 */
108             "socket library initialization failed"};
109              
110             static const char *ssl_error_strings[] = {
111             /* AMQP_STATUS_SSL_ERROR -0x0200 */
112             "a SSL error occurred",
113             /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */
114             "SSL hostname verification failed",
115             /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED -0x0202 */
116             "SSL peer cert verification failed",
117             /* AMQP_STATUS_SSL_CONNECTION_FAILED -0x0203 */
118             "SSL handshake failed",
119             /* AMQP_STATUS_SSL_SET_ENGINE_FAILED -0x0204 */
120             "SSL setting engine failed"};
121              
122             static const char *unknown_error_string = "(unknown error)";
123              
124 1           const char *amqp_error_string2(int code) {
125             const char *error_string;
126 1           size_t category = (((-code) & ERROR_CATEGORY_MASK) >> 8);
127 1           size_t error = (-code) & ERROR_MASK;
128              
129 1           switch (category) {
130             case EC_base:
131 1 50         if (error < (sizeof(base_error_strings) / sizeof(char *))) {
132 1           error_string = base_error_strings[error];
133             } else {
134 0           error_string = unknown_error_string;
135             }
136 1           break;
137              
138             case EC_tcp:
139 0 0         if (error < (sizeof(tcp_error_strings) / sizeof(char *))) {
140 0           error_string = tcp_error_strings[error];
141             } else {
142 0           error_string = unknown_error_string;
143             }
144 0           break;
145              
146             case EC_ssl:
147 0 0         if (error < (sizeof(ssl_error_strings) / sizeof(char *))) {
148 0           error_string = ssl_error_strings[error];
149             } else {
150 0           error_string = unknown_error_string;
151             }
152              
153 0           break;
154              
155             default:
156 0           error_string = unknown_error_string;
157 0           break;
158             }
159              
160 1           return error_string;
161             }
162              
163 0           char *amqp_error_string(int code) {
164             /* Previously sometimes clients had to flip the sign on a return value from a
165             * function to get the correct error code. Now, all error codes are negative.
166             * To keep people's legacy code running correctly, we map all error codes to
167             * negative values.
168             *
169             * This is only done with this deprecated function.
170             */
171 0 0         if (code > 0) {
172 0           code = -code;
173             }
174 0           return strdup(amqp_error_string2(code));
175             }
176              
177 0           void amqp_abort(const char *fmt, ...) {
178             va_list ap;
179 0           va_start(ap, fmt);
180 0           vfprintf(stderr, fmt, ap);
181 0           va_end(ap);
182 0           fputc('\n', stderr);
183 0           abort();
184             }
185              
186             const amqp_bytes_t amqp_empty_bytes = {0, NULL};
187             const amqp_table_t amqp_empty_table = {0, NULL};
188             const amqp_array_t amqp_empty_array = {0, NULL};
189              
190 29           int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
191             amqp_bytes_t exchange, amqp_bytes_t routing_key,
192             amqp_boolean_t mandatory, amqp_boolean_t immediate,
193             amqp_basic_properties_t const *properties,
194             amqp_bytes_t body) {
195             amqp_frame_t f;
196             size_t body_offset;
197 29           size_t usable_body_payload_size =
198 29           state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
199             int res;
200             int flagz;
201              
202             amqp_basic_publish_t m;
203             amqp_basic_properties_t default_properties;
204              
205 29           m.exchange = exchange;
206 29           m.routing_key = routing_key;
207 29           m.mandatory = mandatory;
208 29           m.immediate = immediate;
209 29           m.ticket = 0;
210              
211             /* TODO(alanxz): this heartbeat check is happening in the wrong place, it
212             * should really be done in amqp_try_send/writev */
213 29           res = amqp_time_has_past(state->next_recv_heartbeat);
214 29 50         if (AMQP_STATUS_TIMER_FAILURE == res) {
215 0           return res;
216 29 100         } else if (AMQP_STATUS_TIMEOUT == res) {
217 2           res = amqp_try_recv(state);
218 2 50         if (AMQP_STATUS_TIMEOUT == res) {
219 0           return AMQP_STATUS_HEARTBEAT_TIMEOUT;
220 2 50         } else if (AMQP_STATUS_OK != res) {
221 0           return res;
222             }
223             }
224              
225 29           res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m,
226             AMQP_SF_MORE, amqp_time_infinite());
227 29 100         if (res < 0) {
228 1           return res;
229             }
230              
231 28 50         if (properties == NULL) {
232 0           memset(&default_properties, 0, sizeof(default_properties));
233 0           properties = &default_properties;
234             }
235              
236 28           f.frame_type = AMQP_FRAME_HEADER;
237 28           f.channel = channel;
238 28           f.payload.properties.class_id = AMQP_BASIC_CLASS;
239 28           f.payload.properties.body_size = body.len;
240 28           f.payload.properties.decoded = (void *)properties;
241              
242 28 50         if (body.len > 0) {
243 28           flagz = AMQP_SF_MORE;
244             } else {
245 0           flagz = AMQP_SF_NONE;
246             }
247 28           res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
248 28 50         if (res < 0) {
249 0           return res;
250             }
251              
252 28           body_offset = 0;
253 65 100         while (body_offset < body.len) {
254 37           size_t remaining = body.len - body_offset;
255              
256 37 50         if (remaining == 0) {
257 0           break;
258             }
259              
260 37           f.frame_type = AMQP_FRAME_BODY;
261 37           f.channel = channel;
262 37           f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
263 37 100         if (remaining >= usable_body_payload_size) {
264 9           f.payload.body_fragment.len = usable_body_payload_size;
265 9           flagz = AMQP_SF_MORE;
266             } else {
267 28           f.payload.body_fragment.len = remaining;
268 28           flagz = AMQP_SF_NONE;
269             }
270              
271 37           body_offset += f.payload.body_fragment.len;
272 37           res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
273 37 50         if (res < 0) {
274 0           return res;
275             }
276             }
277              
278 29           return AMQP_STATUS_OK;
279             }
280              
281 1           amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
282             amqp_channel_t channel, int code) {
283             char codestr[13];
284 1           amqp_method_number_t replies[2] = {AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
285             amqp_channel_close_t req;
286              
287 1 50         if (code < 0 || code > UINT16_MAX) {
    50          
288 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
289             }
290              
291 1           req.reply_code = (uint16_t)code;
292 1           req.reply_text.bytes = codestr;
293 1           req.reply_text.len = sprintf(codestr, "%d", code);
294 1           req.class_id = 0;
295 1           req.method_id = 0;
296              
297 1           return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies,
298             &req);
299             }
300              
301 31           amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
302             int code) {
303             char codestr[13];
304 31           amqp_method_number_t replies[2] = {AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
305             amqp_channel_close_t req;
306              
307 31 50         if (code < 0 || code > UINT16_MAX) {
    50          
308 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
309             }
310              
311 31           req.reply_code = (uint16_t)code;
312 31           req.reply_text.bytes = codestr;
313 31           req.reply_text.len = sprintf(codestr, "%d", code);
314 31           req.class_id = 0;
315 31           req.method_id = 0;
316              
317 31           return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, replies, &req);
318             }
319              
320 2           int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
321             uint64_t delivery_tag, amqp_boolean_t multiple) {
322             amqp_basic_ack_t m;
323 2           m.delivery_tag = delivery_tag;
324 2           m.multiple = multiple;
325 2           return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m);
326             }
327              
328 22           amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
329             amqp_channel_t channel, amqp_bytes_t queue,
330             amqp_boolean_t no_ack) {
331 22           amqp_method_number_t replies[] = {AMQP_BASIC_GET_OK_METHOD,
332             AMQP_BASIC_GET_EMPTY_METHOD, 0};
333             amqp_basic_get_t req;
334 22           req.ticket = 0;
335 22           req.queue = queue;
336 22           req.no_ack = no_ack;
337              
338 22           state->most_recent_api_result =
339 22           amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req);
340 22           return state->most_recent_api_result;
341             }
342              
343 1           int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
344             uint64_t delivery_tag, amqp_boolean_t requeue) {
345             amqp_basic_reject_t req;
346 1           req.delivery_tag = delivery_tag;
347 1           req.requeue = requeue;
348 1           return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req);
349             }
350              
351 1           int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel,
352             uint64_t delivery_tag, amqp_boolean_t multiple,
353             amqp_boolean_t requeue) {
354             amqp_basic_nack_t req;
355 1           req.delivery_tag = delivery_tag;
356 1           req.multiple = multiple;
357 1           req.requeue = requeue;
358 1           return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req);
359             }
360              
361 0           struct timeval *amqp_get_handshake_timeout(amqp_connection_state_t state) {
362 0           return state->handshake_timeout;
363             }
364              
365 0           int amqp_set_handshake_timeout(amqp_connection_state_t state,
366             const struct timeval *timeout) {
367 0 0         if (timeout) {
368 0 0         if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    0          
369 0           return AMQP_STATUS_INVALID_PARAMETER;
370             }
371 0           state->internal_handshake_timeout = *timeout;
372 0           state->handshake_timeout = &state->internal_handshake_timeout;
373             } else {
374 0           state->handshake_timeout = NULL;
375             }
376              
377 0           return AMQP_STATUS_OK;
378             }
379              
380 0           struct timeval *amqp_get_rpc_timeout(amqp_connection_state_t state) {
381 0           return state->rpc_timeout;
382             }
383              
384 0           int amqp_set_rpc_timeout(amqp_connection_state_t state,
385             const struct timeval *timeout) {
386 0 0         if (timeout) {
387 0 0         if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    0          
388 0           return AMQP_STATUS_INVALID_PARAMETER;
389             }
390 0           state->rpc_timeout = &state->internal_rpc_timeout;
391 0           *state->rpc_timeout = *timeout;
392             } else {
393 0           state->rpc_timeout = NULL;
394             }
395 0           return AMQP_STATUS_OK;
396             }