File Coverage

amqp_consumer.c
Criterion Covered Total %
statement 0 145 0.0
branch 0 114 0.0
condition n/a
subroutine n/a
pod n/a
total 0 259 0.0


line stmt bran cond sub pod time code
1             /*
2             * ***** BEGIN LICENSE BLOCK *****
3             * Version: MIT
4             *
5             * Portions created by Alan Antonuk are Copyright (c) 2013-2014
6             * Alan Antonuk. All Rights Reserved.
7             *
8             * Permission is hereby granted, free of charge, to any person
9             * obtaining a copy of this software and associated documentation
10             * files (the "Software"), to deal in the Software without
11             * restriction, including without limitation the rights to use, copy,
12             * modify, merge, publish, distribute, sublicense, and/or sell copies
13             * of the Software, and to permit persons to whom the Software is
14             * furnished to do so, subject to the following conditions:
15             *
16             * The above copyright notice and this permission notice shall be
17             * included in all copies or substantial portions of the Software.
18             *
19             * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
20             * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
21             * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
22             * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
23             * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
24             * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
25             * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26             * SOFTWARE.
27             * ***** END LICENSE BLOCK *****
28             */
29             #include "amqp.h"
30             #include "amqp_private.h"
31             #include "amqp_socket.h"
32              
33             #include
34             #include
35              
36 0           static int amqp_basic_properties_clone(amqp_basic_properties_t *original,
37             amqp_basic_properties_t *clone,
38             amqp_pool_t *pool) {
39 0           memset(clone, 0, sizeof(*clone));
40 0           clone->_flags = original->_flags;
41              
42             #define CLONE_BYTES_POOL(original, clone, pool) \
43             if (0 == original.len) { \
44             clone = amqp_empty_bytes; \
45             } else { \
46             amqp_pool_alloc_bytes(pool, original.len, &clone); \
47             if (NULL == clone.bytes) { \
48             return AMQP_STATUS_NO_MEMORY; \
49             } \
50             memcpy(clone.bytes, original.bytes, clone.len); \
51             }
52              
53 0 0         if (clone->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
54 0 0         CLONE_BYTES_POOL(original->content_type, clone->content_type, pool)
    0          
55             }
56              
57 0 0         if (clone->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) {
58 0 0         CLONE_BYTES_POOL(original->content_encoding, clone->content_encoding, pool)
    0          
59             }
60              
61 0 0         if (clone->_flags & AMQP_BASIC_HEADERS_FLAG) {
62 0           int res = amqp_table_clone(&original->headers, &clone->headers, pool);
63 0 0         if (AMQP_STATUS_OK != res) {
64 0           return res;
65             }
66             }
67              
68 0 0         if (clone->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
69 0           clone->delivery_mode = original->delivery_mode;
70             }
71              
72 0 0         if (clone->_flags & AMQP_BASIC_PRIORITY_FLAG) {
73 0           clone->priority = original->priority;
74             }
75              
76 0 0         if (clone->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
77 0 0         CLONE_BYTES_POOL(original->correlation_id, clone->correlation_id, pool)
    0          
78             }
79              
80 0 0         if (clone->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
81 0 0         CLONE_BYTES_POOL(original->reply_to, clone->reply_to, pool)
    0          
82             }
83              
84 0 0         if (clone->_flags & AMQP_BASIC_EXPIRATION_FLAG) {
85 0 0         CLONE_BYTES_POOL(original->expiration, clone->expiration, pool)
    0          
86             }
87              
88 0 0         if (clone->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) {
89 0 0         CLONE_BYTES_POOL(original->message_id, clone->message_id, pool)
    0          
90             }
91              
92 0 0         if (clone->_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
93 0           clone->timestamp = original->timestamp;
94             }
95              
96 0 0         if (clone->_flags & AMQP_BASIC_TYPE_FLAG) {
97 0 0         CLONE_BYTES_POOL(original->type, clone->type, pool)
    0          
98             }
99              
100 0 0         if (clone->_flags & AMQP_BASIC_USER_ID_FLAG) {
101 0 0         CLONE_BYTES_POOL(original->user_id, clone->user_id, pool)
    0          
102             }
103              
104 0 0         if (clone->_flags & AMQP_BASIC_APP_ID_FLAG) {
105 0 0         CLONE_BYTES_POOL(original->app_id, clone->app_id, pool)
    0          
106             }
107              
108 0 0         if (clone->_flags & AMQP_BASIC_CLUSTER_ID_FLAG) {
109 0 0         CLONE_BYTES_POOL(original->cluster_id, clone->cluster_id, pool)
    0          
110             }
111              
112 0           return AMQP_STATUS_OK;
113             #undef CLONE_BYTES_POOL
114             }
115              
116 0           void amqp_destroy_message(amqp_message_t *message) {
117 0           empty_amqp_pool(&message->pool);
118 0           amqp_bytes_free(message->body);
119 0           }
120              
121 0           void amqp_destroy_envelope(amqp_envelope_t *envelope) {
122 0           amqp_destroy_message(&envelope->message);
123 0           amqp_bytes_free(envelope->routing_key);
124 0           amqp_bytes_free(envelope->exchange);
125 0           amqp_bytes_free(envelope->consumer_tag);
126 0           }
127              
128 0           static int amqp_bytes_malloc_dup_failed(amqp_bytes_t bytes) {
129 0 0         if (bytes.len != 0 && bytes.bytes == NULL) {
    0          
130 0           return 1;
131             }
132 0           return 0;
133             }
134              
135 0           amqp_rpc_reply_t amqp_consume_message(amqp_connection_state_t state,
136             amqp_envelope_t *envelope,
137             const struct timeval *timeout,
138             AMQP_UNUSED int flags) {
139             int res;
140             amqp_frame_t frame;
141             amqp_basic_deliver_t *delivery_method;
142             amqp_rpc_reply_t ret;
143              
144 0           memset(&ret, 0, sizeof(ret));
145 0           memset(envelope, 0, sizeof(*envelope));
146              
147 0           res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
148 0 0         if (AMQP_STATUS_OK != res) {
149 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
150 0           ret.library_error = res;
151 0           goto error_out1;
152             }
153              
154 0 0         if (AMQP_FRAME_METHOD != frame.frame_type ||
    0          
155 0           AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
156 0           amqp_put_back_frame(state, &frame);
157 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
158 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
159 0           goto error_out1;
160             }
161              
162 0           delivery_method = frame.payload.method.decoded;
163              
164 0           envelope->channel = frame.channel;
165 0           envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag);
166 0           envelope->delivery_tag = delivery_method->delivery_tag;
167 0           envelope->redelivered = delivery_method->redelivered;
168 0           envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange);
169 0           envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key);
170              
171 0           if (amqp_bytes_malloc_dup_failed(envelope->consumer_tag) ||
172 0 0         amqp_bytes_malloc_dup_failed(envelope->exchange) ||
173 0           amqp_bytes_malloc_dup_failed(envelope->routing_key)) {
174 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
175 0           ret.library_error = AMQP_STATUS_NO_MEMORY;
176 0           goto error_out2;
177             }
178              
179 0           ret = amqp_read_message(state, envelope->channel, &envelope->message, 0);
180 0 0         if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
181 0           goto error_out2;
182             }
183              
184 0           ret.reply_type = AMQP_RESPONSE_NORMAL;
185 0           return ret;
186              
187             error_out2:
188 0           amqp_bytes_free(envelope->routing_key);
189 0           amqp_bytes_free(envelope->exchange);
190 0           amqp_bytes_free(envelope->consumer_tag);
191             error_out1:
192 0           return ret;
193             }
194              
195 0           amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state,
196             amqp_channel_t channel,
197             amqp_message_t *message,
198             AMQP_UNUSED int flags) {
199             amqp_frame_t frame;
200             amqp_rpc_reply_t ret;
201              
202             size_t body_read;
203             char *body_read_ptr;
204             int res;
205              
206 0           memset(&ret, 0, sizeof(ret));
207 0           memset(message, 0, sizeof(*message));
208              
209 0           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
210 0 0         if (AMQP_STATUS_OK != res) {
211 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
212 0           ret.library_error = res;
213              
214 0           goto error_out1;
215             }
216              
217 0 0         if (AMQP_FRAME_HEADER != frame.frame_type) {
218 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
    0          
219 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
220 0           AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
221              
222 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
223 0           ret.reply = frame.payload.method;
224              
225             } else {
226 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
227 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
228              
229 0           amqp_put_back_frame(state, &frame);
230             }
231 0           goto error_out1;
232             }
233              
234 0           init_amqp_pool(&message->pool, 4096);
235 0           res = amqp_basic_properties_clone(frame.payload.properties.decoded,
236             &message->properties, &message->pool);
237              
238 0 0         if (AMQP_STATUS_OK != res) {
239 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
240 0           ret.library_error = res;
241 0           goto error_out3;
242             }
243              
244 0 0         if (0 == frame.payload.properties.body_size) {
245 0           message->body = amqp_empty_bytes;
246             } else {
247             if (SIZE_MAX < frame.payload.properties.body_size) {
248             ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
249             ret.library_error = AMQP_STATUS_NO_MEMORY;
250             goto error_out1;
251             }
252 0           message->body =
253 0           amqp_bytes_malloc((size_t)frame.payload.properties.body_size);
254 0 0         if (NULL == message->body.bytes) {
255 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
256 0           ret.library_error = AMQP_STATUS_NO_MEMORY;
257 0           goto error_out1;
258             }
259             }
260              
261 0           body_read = 0;
262 0           body_read_ptr = message->body.bytes;
263              
264 0 0         while (body_read < message->body.len) {
265 0           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
266 0 0         if (AMQP_STATUS_OK != res) {
267 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
268 0           ret.library_error = res;
269 0           goto error_out2;
270             }
271 0 0         if (AMQP_FRAME_BODY != frame.frame_type) {
272 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
    0          
273 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
274 0           AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
275              
276 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
277 0           ret.reply = frame.payload.method;
278             } else {
279 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
280 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
281             }
282 0           goto error_out2;
283             }
284              
285 0 0         if (body_read + frame.payload.body_fragment.len > message->body.len) {
286 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
287 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
288 0           goto error_out2;
289             }
290              
291 0           memcpy(body_read_ptr, frame.payload.body_fragment.bytes,
292             frame.payload.body_fragment.len);
293              
294 0           body_read += frame.payload.body_fragment.len;
295 0           body_read_ptr += frame.payload.body_fragment.len;
296             }
297              
298 0           ret.reply_type = AMQP_RESPONSE_NORMAL;
299 0           return ret;
300              
301             error_out2:
302 0           amqp_bytes_free(message->body);
303             error_out3:
304 0           empty_amqp_pool(&message->pool);
305             error_out1:
306 0           return ret;
307             }