File Coverage

rabbitmq-include/amqp_private.h
Criterion Covered Total %
statement 62 76 81.5
branch 13 34 38.2
condition n/a
subroutine n/a
pod n/a
total 75 110 68.1


line stmt bran cond sub pod time code
1             #ifndef librabbitmq_amqp_private_h
2             #define librabbitmq_amqp_private_h
3              
4             /*
5             * ***** BEGIN LICENSE BLOCK *****
6             * Version: MIT
7             *
8             * Portions created by Alan Antonuk are Copyright (c) 2012-2014
9             * Alan Antonuk. All Rights Reserved.
10             *
11             * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
12             * All Rights Reserved.
13             *
14             * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
15             * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
16             *
17             * Permission is hereby granted, free of charge, to any person
18             * obtaining a copy of this software and associated documentation
19             * files (the "Software"), to deal in the Software without
20             * restriction, including without limitation the rights to use, copy,
21             * modify, merge, publish, distribute, sublicense, and/or sell copies
22             * of the Software, and to permit persons to whom the Software is
23             * furnished to do so, subject to the following conditions:
24             *
25             * The above copyright notice and this permission notice shall be
26             * included in all copies or substantial portions of the Software.
27             *
28             * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
29             * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
30             * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
31             * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
32             * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
33             * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
34             * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
35             * SOFTWARE.
36             * ***** END LICENSE BLOCK *****
37             */
38              
39             #ifdef HAVE_CONFIG_H
40             #include "config.h"
41             #endif
42              
/* Copyright banner string; adjacent literals are concatenated by the compiler. */
#define AMQ_COPYRIGHT                                          \
  "Copyright (c) 2007-2014 VMWare Inc, Tony Garnock-Jones,"    \
  " and Alan Antonuk."
46              
#include "amqp.h"
#include "amqp_framing.h"
#include <string.h>
50              
51             #if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
52             #ifndef WINVER
53             /* WINVER 0x0502 is WinXP SP2+, Windows Server 2003 SP1+
54             * See:
55             * http://msdn.microsoft.com/en-us/library/windows/desktop/aa383745(v=vs.85).aspx#macros_for_conditional_declarations
56             */
57             #define WINVER 0x0502
58             #endif
59             #ifndef WIN32_LEAN_AND_MEAN
60             #define WIN32_LEAN_AND_MEAN
61             #endif
62             #include
63             #else
64             #include
65             #include
66             #endif
67              
/*
 * Compiler-attribute shims.
 *
 * AMQP_NORETURN - marks a function that never returns to its caller.
 * AMQP_UNUSED   - silences "unused" diagnostics for the annotated entity.
 *
 * GCC newer than 2.4 gets __attribute__ forms, MSVC gets its own spellings,
 * and every other compiler gets empty expansions.
 */
#if __GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ > 4)
#define AMQP_NORETURN __attribute__((__noreturn__))
#define AMQP_UNUSED __attribute__((__unused__))
#elif defined(_MSC_VER)
#define AMQP_NORETURN __declspec(noreturn)
#define AMQP_UNUSED __pragma(warning(suppress : 4100))
#else
#define AMQP_NORETURN
#define AMQP_UNUSED
#endif

/* AMQP_PRIVATE - hidden symbol visibility on GCC 4 and later, otherwise empty. */
#if __GNUC__ >= 4
#define AMQP_PRIVATE __attribute__((visibility("hidden")))
#else
#define AMQP_PRIVATE
#endif
85              
86             char *amqp_os_error_string(int err);
87              
88             #ifdef WITH_SSL
89             char *amqp_ssl_error_string(int err);
90             #endif
91              
92             #include "amqp_socket.h"
93             #include "amqp_time.h"
94              
95             /*
96             * Connection states: XXX FIX THIS
97             *
98             * - CONNECTION_STATE_INITIAL: The initial state, when we cannot be
99             * sure if the next thing we will get is the first AMQP frame, or a
100             * protocol header from the server.
101             *
102             * - CONNECTION_STATE_IDLE: The normal state between
103             * frames. Connections may only be reconfigured, and the
104             * connection's pools recycled, when in this state. Whenever we're
105             * in this state, the inbound_buffer's bytes pointer must be NULL;
106             * any other state, and it must point to a block of memory allocated
107             * from the frame_pool.
108             *
109             * - CONNECTION_STATE_HEADER: Some bytes of an incoming frame have
110             * been seen, but not a complete frame header's worth.
111             *
112             * - CONNECTION_STATE_BODY: A complete frame header has been seen, but
113             * the frame is not yet complete. When it is completed, it will be
114             * returned, and the connection will return to IDLE state.
115             *
116             */
/* Frame-reader state of a connection. IDLE is deliberately the zero value. */
typedef enum amqp_connection_state_enum_ {
  CONNECTION_STATE_IDLE = 0,    /* between frames */
  CONNECTION_STATE_INITIAL = 1, /* next input is a frame or a protocol header */
  CONNECTION_STATE_HEADER = 2,  /* part of a frame header has been seen */
  CONNECTION_STATE_BODY = 3     /* full header seen, frame not yet complete */
} amqp_connection_state_enum;
123              
/*
 * Library-internal status codes. The ranges below are used elsewhere, so the
 * private codes sit in their own range:
 *   0x00xx -> AMQP_STATUS_*
 *   0x01xx -> AMQP_STATUS_TCP_*
 *   0x02xx -> AMQP_STATUS_SSL_*
 */
typedef enum amqp_status_private_enum_ {
  /* NOTE(review): presumably "retry once the socket is readable" - confirm. */
  AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD = -0x1301,
  /* NOTE(review): presumably "retry once the socket is writable" - confirm. */
  AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE = -0x1302
} amqp_status_private_enum;
131              
/*
 * Frame layout on the wire: HEADER_SIZE bytes up front, then the payload,
 * then FOOTER_SIZE bytes of footer.
 */
#define HEADER_SIZE 7
#define FOOTER_SIZE 1

/* Tag value used to mark a protocol-header pseudo-frame. */
#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A'
137              
/* Node of a singly linked list that carries an opaque payload pointer. */
typedef struct amqp_link_t_ amqp_link_t;

struct amqp_link_t_ {
  amqp_link_t *next; /* following node */
  void *data;        /* payload; the list does not interpret it */
};
142              
143             #define POOL_TABLE_SIZE 16
144              
145             typedef struct amqp_pool_table_entry_t_ {
146             struct amqp_pool_table_entry_t_ *next;
147             amqp_pool_t pool;
148             amqp_channel_t channel;
149             } amqp_pool_table_entry_t;
150              
151             struct amqp_connection_state_t_ {
152             amqp_pool_table_entry_t *pool_table[POOL_TABLE_SIZE];
153              
154             amqp_connection_state_enum state;
155              
156             int channel_max;
157             int frame_max;
158              
159             /* Heartbeat interval in seconds. If this is <= 0, then heartbeats are not
160             * enabled, and next_recv_heartbeat and next_send_heartbeat are set to
161             * infinite */
162             int heartbeat;
163             amqp_time_t next_recv_heartbeat;
164             amqp_time_t next_send_heartbeat;
165              
166             /* buffer for holding frame headers. Allows us to delay allocating
167             * the raw frame buffer until the type, channel, and size are all known
168             */
169             char header_buffer[HEADER_SIZE + 1];
170             amqp_bytes_t inbound_buffer;
171              
172             size_t inbound_offset;
173             size_t target_size;
174              
175             amqp_bytes_t outbound_buffer;
176              
177             amqp_socket_t *socket;
178              
179             amqp_bytes_t sock_inbound_buffer;
180             size_t sock_inbound_offset;
181             size_t sock_inbound_limit;
182              
183             amqp_link_t *first_queued_frame;
184             amqp_link_t *last_queued_frame;
185              
186             amqp_rpc_reply_t most_recent_api_result;
187              
188             amqp_table_t server_properties;
189             amqp_table_t client_properties;
190             amqp_pool_t properties_pool;
191              
192             struct timeval *handshake_timeout;
193             struct timeval internal_handshake_timeout;
194             struct timeval *rpc_timeout;
195             struct timeval internal_rpc_timeout;
196             };
197              
198             amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection,
199             amqp_channel_t channel);
200             amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state,
201             amqp_channel_t channel);
202              
203             static inline int amqp_heartbeat_send(amqp_connection_state_t state) {
204             return state->heartbeat;
205             }
206              
207             static inline int amqp_heartbeat_recv(amqp_connection_state_t state) {
208             return 2 * state->heartbeat;
209             }
210              
211             int amqp_try_recv(amqp_connection_state_t state);
212              
/* Return the address `offset` bytes past `data`. */
static inline void *amqp_offset(void *data, size_t offset) {
  char *base = (char *)data;
  return base + offset;
}
216              
/*
 * Generates amqp_encode_<bits>() and amqp_decode_<bits>() for one fixed-width
 * unsigned integer type.
 *
 * Both functions advance *offset by bits/8 bytes, whether or not the value
 * fits. They return 1 when the value fits inside `encoded` and 0 otherwise.
 * On failure no bytes are written and *output is left untouched. The byte
 * work itself is done by amqp_e<bits>() and amqp_d<bits>().
 */
#define DECLARE_CODEC_BASE_TYPE(bits)                                          \
  static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset,   \
                                       uint##bits##_t input) {                 \
    const size_t start = *offset;                                              \
    *offset = start + bits / 8;                                                \
    if (*offset > encoded.len) {                                               \
      return 0;                                                                \
    }                                                                          \
    amqp_e##bits(input, amqp_offset(encoded.bytes, start));                    \
    return 1;                                                                  \
  }                                                                            \
                                                                               \
  static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset,   \
                                       uint##bits##_t *output) {               \
    const size_t start = *offset;                                              \
    *offset = start + bits / 8;                                                \
    if (*offset > encoded.len) {                                               \
      return 0;                                                                \
    }                                                                          \
    *output = amqp_d##bits(amqp_offset(encoded.bytes, start));                 \
    return 1;                                                                  \
  }
241              
/*
 * Report whether the host stores the most significant byte of a 32-bit
 * integer first. Returns 1 if so, 0 otherwise.
 */
static inline int is_bigendian(void) {
  const uint32_t probe = 0x01020304;
  char first;
  /* Look at the lowest-addressed byte of the probe value. */
  memcpy(&first, &probe, sizeof(first));
  return first == 1;
}
249              
/* Write the single byte `val` at `data`. */
static inline void amqp_e8(uint8_t val, void *data) {
  unsigned char *dst = (unsigned char *)data;
  dst[0] = val;
}
253              
/* Read and return the single byte stored at `data`. */
static inline uint8_t amqp_d8(void *data) {
  const unsigned char *src = (const unsigned char *)data;
  return (uint8_t)src[0];
}
259              
/*
 * Store `val` at `data` as two bytes, most significant byte first
 * (big-endian). `data` need not be aligned.
 */
static inline void amqp_e16(uint16_t val, void *data) {
  unsigned char wire[2];
  wire[0] = (unsigned char)((val >> 8) & 0xFFu);
  wire[1] = (unsigned char)(val & 0xFFu);
  memcpy(data, wire, sizeof(wire));
}
266              
/*
 * Read two bytes at `data`, most significant byte first (big-endian), and
 * return them as a host-order value. `data` need not be aligned.
 */
static inline uint16_t amqp_d16(void *data) {
  unsigned char wire[2];
  memcpy(wire, data, sizeof(wire));
  return (uint16_t)(((unsigned)wire[0] << 8) | (unsigned)wire[1]);
}
275              
/*
 * Store `val` at `data` as four bytes, most significant byte first
 * (big-endian). `data` need not be aligned.
 */
static inline void amqp_e32(uint32_t val, void *data) {
  unsigned char wire[4];
  size_t i;
  for (i = 0; i < sizeof(wire); i++) {
    /* Byte i carries bits [31 - 8i .. 24 - 8i] of the value. */
    wire[i] = (unsigned char)((val >> (8u * (sizeof(wire) - 1u - i))) & 0xFFu);
  }
  memcpy(data, wire, sizeof(wire));
}
283              
/*
 * Read four bytes at `data`, most significant byte first (big-endian), and
 * return them as a host-order value. `data` need not be aligned.
 */
static inline uint32_t amqp_d32(void *data) {
  unsigned char wire[4];
  uint32_t val = 0;
  size_t i;
  memcpy(wire, data, sizeof(wire));
  for (i = 0; i < sizeof(wire); i++) {
    val = (val << 8) | (uint32_t)wire[i];
  }
  return val;
}
293              
/*
 * Store `val` at `data` as eight bytes, most significant byte first
 * (big-endian). `data` need not be aligned.
 */
static inline void amqp_e64(uint64_t val, void *data) {
  unsigned char wire[8];
  size_t i;
  for (i = 0; i < sizeof(wire); i++) {
    /* Byte i carries bits [63 - 8i .. 56 - 8i] of the value. */
    wire[i] = (unsigned char)((val >> (8u * (sizeof(wire) - 1u - i))) & 0xFFu);
  }
  memcpy(data, wire, sizeof(wire));
}
307              
/*
 * Read eight bytes at `data`, most significant byte first (big-endian), and
 * return them as a host-order value. `data` need not be aligned.
 */
static inline uint64_t amqp_d64(void *data) {
  unsigned char wire[8];
  uint64_t val = 0;
  size_t i;
  memcpy(wire, data, sizeof(wire));
  for (i = 0; i < sizeof(wire); i++) {
    val = (val << 8) | (uint64_t)wire[i];
  }
  return val;
}
323              
324 5340 50         DECLARE_CODEC_BASE_TYPE(8)
    50          
325 0 0         DECLARE_CODEC_BASE_TYPE(16)
    0          
326 1538 50         DECLARE_CODEC_BASE_TYPE(32)
    50          
327 500 50         DECLARE_CODEC_BASE_TYPE(64)
    50          
328              
329 584           static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset,
330             amqp_bytes_t input) {
331 584           size_t o = *offset;
332             /* The memcpy below has undefined behavior if the input is NULL. It is valid
333             * for a 0-length amqp_bytes_t to have .bytes == NULL. Thus we should check
334             * before encoding.
335             */
336 584 50         if (input.len == 0) {
337 0           return 1;
338             }
339 584 50         if ((*offset = o + input.len) <= encoded.len) {
340 584           memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len);
341 584           return 1;
342             } else {
343 0           return 0;
344             }
345             }
346              
347 904           static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset,
348             amqp_bytes_t *output, size_t len) {
349 904           size_t o = *offset;
350 904 50         if ((*offset = o + len) <= encoded.len) {
351 904           output->bytes = amqp_offset(encoded.bytes, o);
352 904           output->len = len;
353 904           return 1;
354             } else {
355 0           return 0;
356             }
357             }
358              
359             AMQP_NORETURN
360             void amqp_abort(const char *fmt, ...);
361              
362             int amqp_bytes_equal(amqp_bytes_t r, amqp_bytes_t l);
363              
364             static inline amqp_rpc_reply_t amqp_rpc_reply_error(amqp_status_enum status) {
365             amqp_rpc_reply_t reply;
366             reply.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
367             reply.library_error = status;
368             return reply;
369             }
370              
371             int amqp_send_frame_inner(amqp_connection_state_t state,
372             const amqp_frame_t *frame, int flags,
373             amqp_time_t deadline);
374             #endif