File Coverage

RabbitMQ.xs
Criterion Covered Total %
statement 590 773 76.3
branch 393 642 61.2
condition n/a
subroutine n/a
pod n/a
total 983 1415 69.4


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include
5             #include
6              
7             /* perl -MDevel::PPPort -e'Devel::PPPort::WriteFile();' */
8             /* perl ppport.h --compat-version=5.8.0 --cplusplus RabbitMQ.xs */
9             #define NEED_newSVpvn_flags
10             #include "ppport.h"
11              
12             /* ppport.h knows about MUTABLE_PTR and MUTABLE_SV, but not these?! */
13             #ifndef MUTABLE_AV
14             # define MUTABLE_AV(p) ((AV*)MUTABLE_PTR(p))
15             #endif
16             #ifndef MUTABLE_HV
17             # define MUTABLE_HV(p) ((HV*)MUTABLE_PTR(p))
18             #endif
19              
20             #include "amqp.h"
21             #include "amqp_socket.h"
22             #include "amqp_tcp_socket.h"
23             #include "amqp_ssl_socket.h"
24             /* For struct timeval */
25             #include "amqp_time.h"
26              
27             /* This is for the Math::UInt64 integration */
28             #include "perl_math_int64.h"
29              
30             /* perl Makefile.PL; make CCFLAGS=-DDEBUG */
31             #if DEBUG
32             #define __DEBUG__(X) X
33             extern void dump_table(amqp_table_t table);
34             #else
35             #define __DEBUG__(X) /* NOOP */
36             #endif
37              
38             typedef amqp_connection_state_t Net__AMQP__RabbitMQ;
39              
40             #define AMQP_STATUS_UNKNOWN_TYPE 0x500
41              
42             #ifdef USE_LONG_DOUBLE
43             /* stolen from Cpanel::JSON::XS
44             * so we don't mess up double => long double for perls with -Duselongdouble */
45             #if defined(_AIX) && (!defined(HAS_LONG_DOUBLE) || AIX_WORKAROUND)
46             #define HAVE_NO_POWL
47             #endif
48              
49             #ifdef HAVE_NO_POWL
50             /* Ulisse Monari: this is a patch for AIX 5.3, perl 5.8.8 without HAS_LONG_DOUBLE
51             There Perl_pow maps to pow(...) - NOT TO powl(...), core dumps at Perl_pow(...)
52             Base code is from http://bytes.com/topic/c/answers/748317-replacement-pow-function
53             This is my change to fs_pow that goes into libc/libm for calling fmod/exp/log.
54             NEED TO MODIFY Makefile, after perl Makefile.PL by adding "-lm" onto the LDDLFLAGS line */
55             static double fs_powEx(double x, double y)
56             {
57             double p = 0;
58              
59             if (0 > x && fmod(y, 1) == 0) {
60             if (fmod(y, 2) == 0) {
61             p = exp(log(-x) * y);
62             } else {
63             p = -exp(log(-x) * y);
64             }
65             } else {
66             if (x != 0 || 0 >= y) {
67             p = exp(log( x) * y);
68             }
69             }
70             return p;
71             }
72              
73             /* powf() unfortunately is not accurate enough */
74             const NV DOUBLE_POW = fs_powEx(10., DBL_DIG );
75             #else
76             const NV DOUBLE_POW = Perl_pow(10., DBL_DIG );
77             #endif
78             #endif
79              
80              
81             /* This is a place to put some stuff that we convert from perl,
82             it's transient and we recycle it as soon as it's finished being used
83             which means we keep memory we've used with the aim of reusing it */
84             /* temp_memory_pool is ugly and suffers from code smell */
85             static amqp_pool_t temp_memory_pool;
86              
87             /* Parallels amqp_maybe_release_buffers */
88 181           static void maybe_release_buffers(amqp_connection_state_t state) {
89 181 50         if (amqp_release_buffers_ok(state)) {
90 181           amqp_release_buffers(state);
91 181           recycle_amqp_pool(&temp_memory_pool);
92             }
93 181           }
94              
95             #define int_from_hv(hv,name) \
96             do { SV **v; if(NULL != (v = hv_fetchs(hv, #name, 0))) name = SvIV(*v); } while(0)
97             #define double_from_hv(hv,name) \
98             do { SV **v; if(NULL != (v = hv_fetchs(hv, #name, 0))) name = SvNV(*v); } while(0)
99             #define str_from_hv(hv,name) \
100             do { SV **v; if(NULL != (v = hv_fetchs(hv, #name, 0))) name = SvPV_nolen(*v); } while(0)
101             #define has_valid_connection(conn) \
102             ( amqp_get_socket( conn ) != NULL && amqp_get_sockfd( conn ) > -1 )
103             #define assert_amqp_connected(conn) \
104             do { \
105             if ( ! has_valid_connection(conn) ) { \
106             Perl_croak(aTHX_ "AMQP socket not connected"); \
107             } \
108             } while(0)
109              
110             void hash_to_amqp_table(HV *hash, amqp_table_t *table, short force_utf8);
111             void array_to_amqp_array(AV *perl_array, amqp_array_t *mq_array, short force_utf8);
112             SV* mq_array_to_arrayref(amqp_array_t *array);
113             SV* mq_table_to_hashref(amqp_table_t *table);
114              
115 47           void die_on_error(pTHX_ int x, amqp_connection_state_t conn, char const *context) {
116             /* Handle socket errors */
117 47 50         if ( x == AMQP_STATUS_CONNECTION_CLOSED || x == AMQP_STATUS_SOCKET_ERROR ) {
    50          
118 0           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_FORCE );
119 0           Perl_croak(aTHX_ "%s failed because AMQP socket connection was closed.", context);
120             }
121             /* Handle everything else */
122 47 100         else if (x < 0) {
123 1           Perl_croak(aTHX_ "%s: %s\n", context, amqp_error_string2(x));
124             }
125 46           }
126              
127 274           void die_on_amqp_error(pTHX_ amqp_rpc_reply_t x, amqp_connection_state_t conn, char const *context) {
128 274           switch (x.reply_type) {
129             case AMQP_RESPONSE_NORMAL:
130 269           return;
131              
132             case AMQP_RESPONSE_NONE:
133 0           Perl_croak(aTHX_ "%s: missing RPC reply type!", context);
134             break;
135              
136             case AMQP_RESPONSE_LIBRARY_EXCEPTION:
137             /* If we got a library error saying that there's a socket problem,
138             kill the connection and croak. */
139 0 0         if (
140 0           x.library_error == AMQP_STATUS_CONNECTION_CLOSED
141 0 0         ||
142 0           x.library_error == AMQP_STATUS_SOCKET_ERROR
143             ) {
144 0           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_FORCE );
145 0           Perl_croak(aTHX_ "%s: failed since AMQP socket connection closed.\n", context);
146             }
147             /* Otherwise, give a more generic croak. */
148             else {
149 0 0         Perl_croak(aTHX_ "%s: %s\n", context,
150 0           (!x.library_error) ? "(end-of-stream)" :
151 0 0         (x.library_error == AMQP_STATUS_UNKNOWN_TYPE) ? "unknown AMQP type id" :
152 0           amqp_error_string2(x.library_error));
153             }
154             break;
155              
156             case AMQP_RESPONSE_SERVER_EXCEPTION:
157 5           switch (x.reply.id) {
158             case AMQP_CONNECTION_CLOSE_METHOD:
159             {
160             amqp_connection_close_ok_t req;
161 0           req.dummy = '\0';
162 0           /* res = */ amqp_send_method(conn, 0, AMQP_CONNECTION_CLOSE_OK_METHOD, &req);
163             }
164 0           amqp_set_socket(conn, NULL);
165             {
166 0           amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
167 0           Perl_croak(aTHX_ "%s: server connection error %d, message: %.*s",
168             context,
169 0           m->reply_code,
170 0           (int) m->reply_text.len, (char *) m->reply_text.bytes);
171             }
172             break;
173              
174             case AMQP_CHANNEL_CLOSE_METHOD:
175             /* We don't know what channel provoked this error!
176             This information should be in amqp_rpc_reply_t, but it isn't.
177             {
178             amqp_channel_close_ok_t req;
179             req.dummy = '\0';
180             / * res = * / amqp_send_method(conn, channel, AMQP_CHANNEL_CLOSE_OK_METHOD, &req);
181             }
182             */
183             /* Only the channel should be invalidated, but we have no means of doing so! */
184             /* Even if we knew which channel we needed to invalidate! */
185 5           amqp_set_socket(conn, NULL);
186             {
187 5           amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
188 5           Perl_croak(aTHX_ "%s: server channel error %d, message: %.*s",
189             context,
190 5           m->reply_code,
191 5           (int) m->reply_text.len, (char *) m->reply_text.bytes);
192             }
193             break;
194              
195             default:
196 0           Perl_croak(aTHX_ "%s: unknown server error, method id 0x%08X", context, x.reply.id);
197             break;
198             }
199             break;
200             }
201             }
202              
203             /*
204             * amqp_kind_for_sv(SV**)
205             * Note: We could handle more types here... but we're trying to take Perl and go to
206             * C. We don't really need to handle much more than this from what I can tell.
207             */
208 170           amqp_field_value_kind_t amqp_kind_for_sv(SV** perl_value, short force_utf8) {
209              
210 170           switch (SvTYPE( *perl_value ))
211             {
212             // Integer types (and references beyond 5.10)
213             case SVt_IV:
214             // References
215 133 100         if ( SvROK( *perl_value ) ) {
216             // Array Reference
217 10 100         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVAV ) {
218 5           return AMQP_FIELD_KIND_ARRAY;
219             }
220              
221             // Hash Reference
222 5 50         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVHV ) {
223 5           return AMQP_FIELD_KIND_TABLE;
224             }
225 0           Perl_croak(
226             aTHX_ "Unsupported Perl Reference Type: %d",
227 0           SvTYPE( SvRV( *perl_value ) )
228             );
229             }
230              
231             // Regular integers
232             // In the event that it could be unsigned
233 123 50         if ( SvUOK( *perl_value ) ) {
234 0           return AMQP_FIELD_KIND_U64;
235             }
236 123           return AMQP_FIELD_KIND_I64;
237              
238             // Numeric type
239             case SVt_NV:
240 1           return AMQP_FIELD_KIND_F64;
241              
242             // String (handle types which are upgraded to handle IV/UV/NV as well as PV)
243             case SVt_PVIV:
244 0 0         if ( SvI64OK( *perl_value ) ) {
245 0           return AMQP_FIELD_KIND_I64;
246             }
247 0 0         if ( SvU64OK( *perl_value ) ) {
248 0           return AMQP_FIELD_KIND_U64;
249             }
250             // It could be a PV or an IV/UV!
251 0 0         if ( SvIOK( *perl_value ) ) {
252 0 0         if ( SvUOK( *perl_value ) ) {
253 0           return AMQP_FIELD_KIND_U64;
254             }
255 0           return AMQP_FIELD_KIND_I64;
256             }
257              
258             case SVt_PVNV:
259             // It could be a PV or an NV
260 0 0         if ( SvNOK( *perl_value ) ) {
261 0           return AMQP_FIELD_KIND_F64;
262             }
263              
264             case SVt_PV:
265             // UTF-8?
266 33 100         if ( force_utf8 || SvUTF8( *perl_value ) ) {
    100          
267 9           return AMQP_FIELD_KIND_UTF8;
268             }
269 24           return AMQP_FIELD_KIND_BYTES;
270              
271             case SVt_PVMG:
272 3 100         if ( SvPOK( *perl_value ) || SvPOKp( *perl_value ) ) {
    50          
273 1 50         if ( force_utf8 || SvUTF8( *perl_value ) ) {
    50          
274 0           return AMQP_FIELD_KIND_UTF8;
275             }
276 1           return AMQP_FIELD_KIND_BYTES;
277             }
278 2 100         if ( SvIOK( *perl_value ) || SvIOKp( *perl_value ) ) {
    50          
279 1 50         if ( SvUOK( *perl_value ) ) {
280 0           return AMQP_FIELD_KIND_U64;
281             }
282 1           return AMQP_FIELD_KIND_I64;
283             }
284 1 50         if ( SvNOK( *perl_value ) || SvNOKp( *perl_value ) ) {
    0          
285 1           return AMQP_FIELD_KIND_F64;
286             }
287              
288             default:
289 0 0         if ( SvROK( *perl_value ) ) {
290             // Array Reference
291 0 0         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVAV ) {
292 0           return AMQP_FIELD_KIND_ARRAY;
293             }
294              
295             // Hash Reference
296 0 0         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVHV ) {
297 0           return AMQP_FIELD_KIND_TABLE;
298             }
299 0           Perl_croak(
300             aTHX_ "Unsupported Perl Reference Type: %d",
301 0           SvTYPE( SvRV( *perl_value ) )
302             );
303             }
304              
305 0 0         Perl_croak(
306             aTHX_ "Unsupported scalar type detected >%s<(%d)",
307 0           SvPV_nolen(*perl_value),
308 0           SvTYPE( *perl_value )
309             );
310             }
311              
312             /* If we're still here... wtf */
313             Perl_croak( aTHX_ "The wheels have fallen off. Please call for help." );
314             }
315              
316             /* Parallels amqp_read_message */
317 25           static amqp_rpc_reply_t read_message(amqp_connection_state_t state, amqp_channel_t channel, SV **props_sv_ptr, SV **body_sv_ptr) {
318             HV *props_hv;
319             SV *body_sv;
320             amqp_rpc_reply_t ret;
321             int res;
322             amqp_frame_t frame;
323 25           int is_utf8_body = 1; /* The body is UTF-8 by default */
324              
325 25           memset(&ret, 0, sizeof(amqp_rpc_reply_t));
326              
327 25           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
328 25 50         if (AMQP_STATUS_OK != res) {
329 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
330 0           ret.library_error = res;
331 0           goto error_out1;
332             }
333              
334 25 50         if (AMQP_FRAME_HEADER != frame.frame_type) {
335 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
    0          
336 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
337 0           AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
338              
339 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
340 0           ret.reply = frame.payload.method;
341              
342             } else {
343 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
344 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
345              
346 0           amqp_put_back_frame(state, &frame);
347             }
348              
349 0           goto error_out1;
350             }
351              
352             {
353             amqp_basic_properties_t *p;
354              
355 25           props_hv = newHV();
356              
357 25           p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
358 25 100         if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
359 4           hv_stores(props_hv, "content_type", newSVpvn(p->content_type.bytes, p->content_type.len));
360             }
361 25 100         if (p->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) {
362 7           hv_stores(props_hv, "content_encoding", newSVpvn(p->content_encoding.bytes, p->content_encoding.len));
363              
364             /*
365             * Since we could have UTF-8 in our content-encoding, and most people seem like they
366             * treat this like the default, we're looking for the presence of content-encoding but
367             * the absence of a case-insensitive "UTF-8".
368             */
369 7 50         if (
370 7           strnlen(p->content_encoding.bytes, p->content_encoding.len) > 0
371 7 50         &&
372 7           (strncasecmp(p->content_encoding.bytes, "UTF-8", p->content_encoding.len) != 0)
373             ) {
374 7           is_utf8_body = 0;
375             }
376             }
377 25 100         if (p->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
378 4           hv_stores(props_hv, "correlation_id", newSVpvn(p->correlation_id.bytes, p->correlation_id.len));
379             }
380 25 100         if (p->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
381 4           hv_stores(props_hv, "reply_to", newSVpvn(p->reply_to.bytes, p->reply_to.len));
382             }
383 25 100         if (p->_flags & AMQP_BASIC_EXPIRATION_FLAG) {
384 4           hv_stores(props_hv, "expiration", newSVpvn(p->expiration.bytes, p->expiration.len));
385             }
386 25 100         if (p->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) {
387 4           hv_stores(props_hv, "message_id", newSVpvn(p->message_id.bytes, p->message_id.len));
388             }
389 25 100         if (p->_flags & AMQP_BASIC_TYPE_FLAG) {
390 4           hv_stores(props_hv, "type", newSVpvn(p->type.bytes, p->type.len));
391             }
392 25 100         if (p->_flags & AMQP_BASIC_USER_ID_FLAG) {
393 4           hv_stores(props_hv, "user_id", newSVpvn(p->user_id.bytes, p->user_id.len));
394             }
395 25 100         if (p->_flags & AMQP_BASIC_APP_ID_FLAG) {
396 4           hv_stores(props_hv, "app_id", newSVpvn(p->app_id.bytes, p->app_id.len));
397             }
398 25 100         if (p->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
399 4           hv_stores(props_hv, "delivery_mode", newSViv(p->delivery_mode));
400             }
401 25 100         if (p->_flags & AMQP_BASIC_PRIORITY_FLAG) {
402 4           hv_stores(props_hv, "priority", newSViv(p->priority));
403             }
404 25 100         if (p->_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
405 4           hv_stores(props_hv, "timestamp", newSViv(p->timestamp));
406             }
407 25 100         if (p->_flags & AMQP_BASIC_HEADERS_FLAG) {
408             int i;
409 9           HV *headers = newHV();
410 9           hv_stores(props_hv, "headers", newRV_noinc(MUTABLE_SV(headers)));
411              
412             __DEBUG__( dump_table( p->headers ) );
413              
414 40 100         for( i=0; i < p->headers.num_entries; ++i ) {
415 31           amqp_table_entry_t *header_entry = &(p->headers.entries[i]);
416              
417             __DEBUG__(
418             fprintf(stderr,
419             "~~~ Length: %ld/%d, Key: %.*s, Kind: %c\n",
420             header_entry->key.len,
421             (int)header_entry->key.len,
422             (int)header_entry->key.len,
423             (char*)header_entry->key.bytes,
424             header_entry->value.kind
425             )
426             );
427              
428 31           switch (header_entry->value.kind) {
429             case AMQP_FIELD_KIND_BOOLEAN:
430 0           hv_store( headers,
431             header_entry->key.bytes, header_entry->key.len,
432             newSViv(header_entry->value.value.boolean),
433             0
434             );
435 0           break;
436              
437             // Integer types
438             case AMQP_FIELD_KIND_I8:
439 0           hv_store( headers,
440             header_entry->key.bytes, header_entry->key.len,
441             newSViv(header_entry->value.value.i8),
442             0
443             );
444 0           break;
445              
446             case AMQP_FIELD_KIND_I16:
447 0           hv_store( headers,
448             header_entry->key.bytes, header_entry->key.len,
449             newSViv(header_entry->value.value.i16),
450             0
451             );
452 0           break;
453              
454             case AMQP_FIELD_KIND_I32:
455 0           hv_store( headers,
456             header_entry->key.bytes, header_entry->key.len,
457             newSViv(header_entry->value.value.i32),
458             0
459             );
460 0           break;
461              
462             case AMQP_FIELD_KIND_I64:
463 14           hv_store( headers,
464             header_entry->key.bytes, header_entry->key.len,
465             newSVi64(header_entry->value.value.i64),
466             0
467             );
468 14           break;
469              
470             case AMQP_FIELD_KIND_U8:
471 0           hv_store( headers,
472             header_entry->key.bytes, header_entry->key.len,
473             newSVuv(header_entry->value.value.u8),
474             0
475             );
476 0           break;
477              
478             case AMQP_FIELD_KIND_U16:
479 0           hv_store( headers,
480             header_entry->key.bytes, header_entry->key.len,
481             newSVuv(header_entry->value.value.u16),
482             0
483             );
484 0           break;
485              
486             case AMQP_FIELD_KIND_U32:
487 0           hv_store( headers,
488             header_entry->key.bytes, header_entry->key.len,
489             newSVuv(header_entry->value.value.u32),
490             0
491             );
492 0           break;
493              
494             case AMQP_FIELD_KIND_U64:
495 0           hv_store( headers,
496             header_entry->key.bytes, header_entry->key.len,
497             newSVu64(header_entry->value.value.u64),
498             0
499             );
500 0           break;
501              
502             // Floating point precision
503             case AMQP_FIELD_KIND_F32:
504 0           hv_store( headers,
505             header_entry->key.bytes, header_entry->key.len,
506             newSVnv(header_entry->value.value.f32),
507             0
508             );
509 0           break;
510              
511             case AMQP_FIELD_KIND_F64:
512             // TODO: I don't think this is a natively supported type on all Perls.
513              
514 2           hv_store( headers,
515             header_entry->key.bytes, header_entry->key.len,
516             #ifdef USE_LONG_DOUBLE
517             /* amqp uses doubles, if perl is -Duselongdouble it messes up the precision
518             * so we always want take the max precision from a double and discard the rest
519             * because it can't be any more precise than a double */
520             newSVnv( ( rint( header_entry->value.value.f64 * DOUBLE_POW ) / DOUBLE_POW ) ),
521             #else
522             /* both of these are doubles so it's ok */
523             newSVnv( header_entry->value.value.f64 ),
524             #endif
525             0
526             );
527 2           break;
528              
529             // Handle kind UTF8 and kind BYTES
530             case AMQP_FIELD_KIND_UTF8:
531             case AMQP_FIELD_KIND_BYTES:
532 10 100         hv_store( headers,
533             header_entry->key.bytes, header_entry->key.len,
534             newSVpvn_utf8(
535             header_entry->value.value.bytes.bytes,
536             header_entry->value.value.bytes.len,
537             AMQP_FIELD_KIND_UTF8 == header_entry->value.kind
538             ),
539             0
540             );
541 10           break;
542              
543             // Handle arrays
544             case AMQP_FIELD_KIND_ARRAY:
545             __DEBUG__(
546             fprintf(stderr, "ARRAY KIND FOR KEY:>%.*s< KIND:>%c< AMQP_FIELD_KIND_ARRAY:[%c].\n",
547             (int)header_entry->key.len,
548             (char*)header_entry->key.bytes,
549             header_entry->value.kind,
550             AMQP_FIELD_KIND_ARRAY
551             )
552             );
553 3           hv_store( headers,
554             header_entry->key.bytes, header_entry->key.len,
555             mq_array_to_arrayref( &header_entry->value.value.array ),
556             0
557             );
558 3           break;
559              
560             // Handle tables (hashes when translated to Perl)
561             case AMQP_FIELD_KIND_TABLE:
562 2           hv_store( headers,
563             header_entry->key.bytes, header_entry->key.len,
564             mq_table_to_hashref( &header_entry->value.value.table ),
565             0
566             );
567 2           break;
568              
569             default:
570 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
571 0           ret.library_error = AMQP_STATUS_UNKNOWN_TYPE;
572 0           goto error_out2;
573             }
574             }
575             }
576             }
577              
578             {
579             char *body;
580 25           size_t body_target = frame.payload.properties.body_size;
581 25           size_t body_remaining = body_target;
582              
583 25           body_sv = newSV(0);
584 25           sv_grow(body_sv, body_target + 1);
585 25           SvCUR_set(body_sv, body_target);
586 25           SvPOK_on(body_sv);
587 25 100         if (is_utf8_body)
588 18           SvUTF8_on(body_sv);
589              
590 25           body = SvPVX(body_sv);
591              
592 51 100         while (body_remaining > 0) {
593             size_t fragment_len;
594              
595 26           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
596 26 50         if (AMQP_STATUS_OK != res) {
597 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
598 0           ret.library_error = res;
599 0           goto error_out3;
600             }
601              
602 26 50         if (AMQP_FRAME_BODY != frame.frame_type) {
603 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
    0          
604 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
605 0           AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
606              
607 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
608 0           ret.reply = frame.payload.method;
609             } else {
610 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
611 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
612             }
613 0           goto error_out3;
614             }
615              
616 26           fragment_len = frame.payload.body_fragment.len;
617 26 50         if (fragment_len > body_remaining) {
618 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
619 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
620 0           goto error_out3;
621             }
622              
623 26           memcpy(body, frame.payload.body_fragment.bytes, fragment_len);
624 26           body += fragment_len;
625 26           body_remaining -= fragment_len;
626             }
627              
628 25           *body = '\0';
629             }
630              
631 25           *props_sv_ptr = newRV_noinc(MUTABLE_SV(props_hv));
632 25           *body_sv_ptr = body_sv;
633 25           ret.reply_type = AMQP_RESPONSE_NORMAL;
634 25           return ret;
635              
636             error_out3:
637 0           SvREFCNT_dec(props_hv);
638             error_out2:
639 0           SvREFCNT_dec(body_sv);
640             error_out1:
641 0           *props_sv_ptr = &PL_sv_undef;
642 0           *body_sv_ptr = &PL_sv_undef;
643 25           return ret;
644             }
645              
646             /* Parallels amqp_consume_message */
647 26           static amqp_rpc_reply_t consume_message(amqp_connection_state_t state, SV **envelope_sv_ptr, struct timeval *timeout) {
648             amqp_rpc_reply_t ret;
649             HV *envelope_hv;
650             int res;
651             amqp_frame_t frame;
652             amqp_channel_t channel;
653             SV *props;
654             SV *body;
655              
656 26           memset(&ret, 0, sizeof(amqp_rpc_reply_t));
657 26           *envelope_sv_ptr = &PL_sv_undef;
658              
659 26           res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
660 26 100         if (AMQP_STATUS_OK != res) {
661 4           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
662 4           ret.library_error = res;
663 4           goto error_out1;
664             }
665              
666 22 50         if (AMQP_FRAME_METHOD != frame.frame_type ||
    100          
667 22           AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
668              
669 1 50         if (AMQP_FRAME_METHOD == frame.frame_type &&
    50          
670 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
671 0           AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
672              
673 1           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
674 1           ret.reply = frame.payload.method;
675             } else {
676 0           amqp_put_back_frame(state, &frame);
677 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
678 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
679             }
680              
681 1           goto error_out1;
682             }
683              
684 21           channel = frame.channel;
685              
686 21           envelope_hv = newHV();
687              
688             {
689 21           amqp_basic_deliver_t *d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
690 21           hv_stores(envelope_hv, "channel", newSViv(channel));
691 21           hv_stores(envelope_hv, "delivery_tag", newSVu64(d->delivery_tag));
692 21           hv_stores(envelope_hv, "redelivered", newSViv(d->redelivered));
693 21           hv_stores(envelope_hv, "exchange", newSVpvn(d->exchange.bytes, d->exchange.len));
694 21           hv_stores(envelope_hv, "consumer_tag", newSVpvn(d->consumer_tag.bytes, d->consumer_tag.len));
695 21           hv_stores(envelope_hv, "routing_key", newSVpvn(d->routing_key.bytes, d->routing_key.len));
696             }
697              
698 21           ret = read_message(state, channel, &props, &body );
699 21 50         if (AMQP_RESPONSE_NORMAL != ret.reply_type)
700 0           goto error_out2;
701              
702 21           hv_stores(envelope_hv, "props", props);
703 21           hv_stores(envelope_hv, "body", body);
704              
705 21           *envelope_sv_ptr = newRV_noinc(MUTABLE_SV(envelope_hv));
706 21           ret.reply_type = AMQP_RESPONSE_NORMAL;
707 21           return ret;
708              
709             error_out2:
710 0           SvREFCNT_dec(envelope_hv);
711             error_out1:
712 5           *envelope_sv_ptr = &PL_sv_undef;
713 26           return ret;
714             }
715              
716 5           void array_to_amqp_array(AV *perl_array, amqp_array_t *mq_array, short force_utf8) {
717 5           int idx = 0;
718             SV **value;
719              
720 5           amqp_field_value_t *new_elements = amqp_pool_alloc(
721             &temp_memory_pool,
722 5           ((av_len(perl_array)+1) * sizeof(amqp_field_value_t))
723             );
724             amqp_field_value_t *element;
725              
726 5           mq_array->entries = new_elements;
727 5           mq_array->num_entries = 0;
728              
729 119 100         for ( idx = 0; idx <= av_len(perl_array); idx += 1) {
730 114           value = av_fetch( perl_array, idx, 0 );
731              
732             // We really should never see NULL here.
733             assert(value != NULL);
734              
735             // Let's start getting the type...
736 114           element = &mq_array->entries[mq_array->num_entries];
737 114           mq_array->num_entries += 1;
738 114           element->kind = amqp_kind_for_sv(value, force_utf8);
739              
740             __DEBUG__( warn("%d KIND >%c<", __LINE__, (unsigned char)element->kind) );
741              
742 114           switch (element->kind) {
743              
744             case AMQP_FIELD_KIND_I64:
745 101           element->value.i64 = (int64_t) SvI64(*value);
746 101           break;
747              
748             case AMQP_FIELD_KIND_U64:
749 0           element->value.u64 = (uint64_t) SvU64(*value);
750 0           break;
751              
752             case AMQP_FIELD_KIND_F64:
753             // TODO: I don't think this is a native type on all Perls
754 0 0         element->value.f64 = (double) SvNV(*value);
755 0           break;
756              
757             case AMQP_FIELD_KIND_UTF8:
758             case AMQP_FIELD_KIND_BYTES:
759 11 50         element->value.bytes = amqp_cstring_bytes(SvPV_nolen(*value));
760 11           break;
761              
762             case AMQP_FIELD_KIND_ARRAY:
763 0           array_to_amqp_array(MUTABLE_AV(SvRV(*value)), &(element->value.array), force_utf8);
764 0           break;
765              
766             case AMQP_FIELD_KIND_TABLE:
767 2           hash_to_amqp_table(MUTABLE_HV(SvRV(*value)), &(element->value.table), force_utf8);
768 2           break;
769              
770             default:
771 0           Perl_croak( aTHX_ "Unsupported SvType for array index %d", idx );
772             }
773             }
774 5           }
775              
776             // Iterate over the array entries and decode them to Perl...
777 5           SV* mq_array_to_arrayref(amqp_array_t *mq_array) {
778 5           AV* perl_array = newAV();
779              
780 5           SV* perl_element = &PL_sv_undef;
781             amqp_field_value_t* mq_element;
782              
783 5           int current_entry = 0;
784              
785 119 100         for (; current_entry < mq_array->num_entries; current_entry += 1) {
786 114           mq_element = &mq_array->entries[current_entry];
787              
788             __DEBUG__( warn("%d KIND >%c<", __LINE__, mq_element->kind) );
789              
790 114           switch (mq_element->kind) {
791             // Boolean
792             case AMQP_FIELD_KIND_BOOLEAN:
793 0           perl_element = newSViv(mq_element->value.boolean);
794 0           break;
795              
796             // Signed values
797             case AMQP_FIELD_KIND_I8:
798 0           perl_element = newSViv(mq_element->value.i8);
799 0           break;
800             case AMQP_FIELD_KIND_I16:
801 0           perl_element = newSViv(mq_element->value.i16);
802 0           break;
803             case AMQP_FIELD_KIND_I32:
804 0           perl_element = newSViv(mq_element->value.i32);
805 0           break;
806             case AMQP_FIELD_KIND_I64:
807 101           perl_element = newSVi64(mq_element->value.i64);
808 101           break;
809              
810             // Unsigned values
811             case AMQP_FIELD_KIND_U8:
812 0           perl_element = newSViv(mq_element->value.u8);
813 0           break;
814             case AMQP_FIELD_KIND_U16:
815 0           perl_element = newSViv(mq_element->value.u16);
816 0           break;
817             case AMQP_FIELD_KIND_U32:
818 0           perl_element = newSVuv(mq_element->value.u32);
819 0           break;
820             case AMQP_FIELD_KIND_TIMESTAMP: /* Timestamps */
821             case AMQP_FIELD_KIND_U64:
822 0           perl_element = newSVu64(mq_element->value.u64);
823 0           break;
824              
825             // Floats
826             case AMQP_FIELD_KIND_F32:
827 0           perl_element = newSVnv(mq_element->value.f32);
828 0           break;
829             case AMQP_FIELD_KIND_F64:
830             // TODO: I don't think this is a native type on all Perls
831 0           perl_element = newSVnv(mq_element->value.f64);
832 0           break;
833              
834             // Strings and bytes
835             case AMQP_FIELD_KIND_BYTES:
836 11           perl_element = newSVpvn(
837             mq_element->value.bytes.bytes,
838             mq_element->value.bytes.len
839             );
840 11           break;
841              
842             // UTF-8 strings
843             case AMQP_FIELD_KIND_UTF8:
844 0           perl_element = newSVpvn(
845             mq_element->value.bytes.bytes,
846             mq_element->value.bytes.len
847             );
848 0           SvUTF8_on(perl_element); // It's UTF-8!
849 0           break;
850              
851             // Arrays
852             case AMQP_FIELD_KIND_ARRAY:
853 0           perl_element = mq_array_to_arrayref(&(mq_element->value.array));
854 0           break;
855              
856             // Tables
857             case AMQP_FIELD_KIND_TABLE:
858 2           perl_element = mq_table_to_hashref(&(mq_element->value.table));
859 2           break;
860              
861             // WTF
862             default:
863             // ACK!
864 0           Perl_croak(
865             aTHX_ "Unsupported Perl type >%c< at index %d",
866 0           (unsigned char)mq_element->kind,
867             current_entry
868             );
869             }
870              
871 114           av_push(perl_array, perl_element);
872             }
873              
874 5           return newRV_noinc(MUTABLE_SV(perl_array));
875             }
876              
877 11           SV* mq_table_to_hashref( amqp_table_t *mq_table ) {
878             // Iterate over the table keys and decode them to Perl...
879             int i;
880             SV *perl_element;
881 11           HV *perl_hash = newHV();
882 11           amqp_table_entry_t *hash_entry = (amqp_table_entry_t*)NULL;
883              
884 57 100         for( i=0; i < mq_table->num_entries; i += 1 ) {
885 46           hash_entry = &(mq_table->entries[i]);
886             __DEBUG__(
887             fprintf(
888             stderr,
889             "!!! Key: >%.*s< Kind: >%c<\n",
890             (int)hash_entry->key.len,
891             (char*)hash_entry->key.bytes,
892             hash_entry->value.kind
893             );
894             );
895              
896 46           switch (hash_entry->value.kind) {
897             // Boolean
898             case AMQP_FIELD_KIND_BOOLEAN:
899 13           perl_element = newSViv(hash_entry->value.value.boolean);
900 13           break;
901              
902             // Integers
903             case AMQP_FIELD_KIND_I8:
904 0           perl_element = newSViv(hash_entry->value.value.i8);
905 0           break;
906             case AMQP_FIELD_KIND_I16:
907 0           perl_element = newSViv(hash_entry->value.value.i16);
908 0           break;
909             case AMQP_FIELD_KIND_I32:
910 0           perl_element = newSViv(hash_entry->value.value.i32);
911 0           break;
912             case AMQP_FIELD_KIND_I64:
913 7           perl_element = newSVi64(hash_entry->value.value.i64);
914 7           break;
915             case AMQP_FIELD_KIND_U8:
916 0           perl_element = newSViv(hash_entry->value.value.u8);
917 0           break;
918             case AMQP_FIELD_KIND_U16:
919 0           perl_element = newSViv(hash_entry->value.value.u16);
920 0           break;
921             case AMQP_FIELD_KIND_U32:
922 0           perl_element = newSVuv(hash_entry->value.value.u32);
923 0           break;
924             case AMQP_FIELD_KIND_TIMESTAMP: /* Timestamps */
925             case AMQP_FIELD_KIND_U64:
926 0           perl_element = newSVu64(hash_entry->value.value.u64);
927 0           break;
928              
929             // Foats
930             case AMQP_FIELD_KIND_F32:
931 0           perl_element = newSVnv(hash_entry->value.value.f32);
932 0           break;
933             case AMQP_FIELD_KIND_F64:
934             // TODO: I don't think this is a native type on all Perls.
935 0           perl_element = newSVnv(hash_entry->value.value.f64);
936 0           break;
937              
938             case AMQP_FIELD_KIND_BYTES:
939 3           perl_element = newSVpvn(
940             hash_entry->value.value.bytes.bytes,
941             hash_entry->value.value.bytes.len
942             );
943 3           break;
944              
945             case AMQP_FIELD_KIND_UTF8:
946 17           perl_element = newSVpvn(
947             hash_entry->value.value.bytes.bytes,
948             hash_entry->value.value.bytes.len
949             );
950 17           SvUTF8_on(perl_element); // It's UTF-8!
951 17           break;
952              
953             case AMQP_FIELD_KIND_ARRAY:
954 2           perl_element = mq_array_to_arrayref(&(hash_entry->value.value.array));
955 2           break;
956              
957             case AMQP_FIELD_KIND_TABLE:
958 4           perl_element = mq_table_to_hashref(&(hash_entry->value.value.table));
959 4           break;
960              
961             default:
962             // ACK!
963 0           Perl_croak(
964             aTHX_ "Unsupported Perl type >%c< at index %d",
965 0           (unsigned char)hash_entry->value.kind,
966             i
967             );
968             }
969              
970             // Stash this in our hash.
971 46           hv_store(
972             perl_hash,
973             hash_entry->key.bytes, hash_entry->key.len,
974             perl_element,
975             0
976             );
977              
978             }
979              
980 11           return newRV_noinc(MUTABLE_SV(perl_hash));
981             }
982              
983 23           void hash_to_amqp_table(HV *hash, amqp_table_t *table, short force_utf8) {
984             HE *he;
985             char *key;
986             SV *value;
987             I32 retlen;
988             amqp_table_entry_t *entry;
989              
990 23 50         amqp_table_entry_t *new_entries = amqp_pool_alloc( &temp_memory_pool, HvKEYS(hash) * sizeof(amqp_table_entry_t) );
991 23           table->entries = new_entries;
992              
993 23           hv_iterinit(hash);
994 79 100         while (NULL != (he = hv_iternext(hash))) {
995 56           key = hv_iterkey(he, &retlen);
996             __DEBUG__( warn("Key: %s\n", key) );
997 56           value = hv_iterval(hash, he);
998              
999 56 50         if (SvGMAGICAL(value)) {
1000 0           mg_get(value);
1001             }
1002              
1003 56           entry = &table->entries[table->num_entries];
1004 56           entry->key = amqp_cstring_bytes( key );
1005              
1006             // Reserved headers, per spec must force UTF-8 for strings.
1007             // Other headers aren't necessarily required to do so.
1008 56 50         if (
1009             // "x-*" exchanges
1010             (
1011 56           strlen(key) > 2
1012 56 100         &&
1013 56           key[0] == 'x'
1014 5 50         &&
1015 5           key[1] == '-'
1016             )
1017             ) {
1018 5           entry->value.kind = amqp_kind_for_sv( &value, 1 );
1019             }
1020             else {
1021 51           entry->value.kind = amqp_kind_for_sv( &value, force_utf8 );
1022             }
1023              
1024              
1025             __DEBUG__(
1026             warn("hash_to_amqp_table()");
1027             warn("%s", SvPV_nolen(value) );
1028             fprintf(
1029             stderr,
1030             "Key: >%.*s< Kind: >%c<\n",
1031             (int)entry->key.len,
1032             (char*)entry->key.bytes,
1033             entry->value.kind
1034             );
1035             );
1036              
1037 56           switch ( entry->value.kind ) {
1038             case AMQP_FIELD_KIND_I64:
1039 23           entry->value.value.i64 = (int64_t) SvI64( value );
1040 23           break;
1041              
1042             case AMQP_FIELD_KIND_U64:
1043 0           entry->value.value.u64 = (uint64_t) SvU64( value );
1044 0           break;
1045              
1046             case AMQP_FIELD_KIND_F64:
1047             // TODO: I don't think this is a native type on all Perls.
1048 2 50         entry->value.value.f64 = (double) SvNV( value );
1049 2           break;
1050              
1051             case AMQP_FIELD_KIND_BYTES:
1052             case AMQP_FIELD_KIND_UTF8:
1053 23 50         entry->value.value.bytes = amqp_cstring_bytes( SvPV_nolen( value )
1054             );
1055 23           break;
1056              
1057             case AMQP_FIELD_KIND_ARRAY:
1058 5           array_to_amqp_array(
1059 5           MUTABLE_AV(SvRV(value)),
1060             &(entry->value.value.array),
1061             force_utf8
1062             );
1063 5           break;
1064              
1065             case AMQP_FIELD_KIND_TABLE:
1066 3           hash_to_amqp_table(
1067 3           MUTABLE_HV(SvRV(value)),
1068             &(entry->value.value.table),
1069             force_utf8
1070             );
1071 3           break;
1072              
1073             default:
1074 0           Perl_croak( aTHX_ "amqp_kind_for_sv() returned a type I don't understand." );
1075             }
1076              
1077             // Successfully (we think) added an entry to the table.
1078 56           table->num_entries++;
1079             }
1080              
1081 23           return;
1082             }
1083              
1084 22           static amqp_rpc_reply_t basic_get(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, SV **envelope_sv_ptr, amqp_boolean_t no_ack) {
1085             amqp_rpc_reply_t ret;
1086 22           HV *envelope_hv = NULL;
1087             SV *props;
1088             SV *body;
1089              
1090 22           ret = amqp_basic_get(state, channel, queue, no_ack);
1091 22 50         if (AMQP_RESPONSE_NORMAL != ret.reply_type)
1092 0           goto error_out1;
1093              
1094 22 100         if (AMQP_BASIC_GET_OK_METHOD != ret.reply.id)
1095 18           goto success_out;
1096              
1097 4           envelope_hv = newHV();
1098              
1099             {
1100 4           amqp_basic_get_ok_t *ok = (amqp_basic_get_ok_t *) ret.reply.decoded;
1101 4           hv_stores(envelope_hv, "delivery_tag", newSVu64(ok->delivery_tag));
1102 4           hv_stores(envelope_hv, "redelivered", newSViv(ok->redelivered));
1103 4           hv_stores(envelope_hv, "exchange", newSVpvn(ok->exchange.bytes, ok->exchange.len));
1104 4           hv_stores(envelope_hv, "routing_key", newSVpvn(ok->routing_key.bytes, ok->routing_key.len));
1105 4           hv_stores(envelope_hv, "message_count", newSViv(ok->message_count));
1106             }
1107              
1108 4           ret = read_message(state, channel, &props, &body );
1109 4 50         if (AMQP_RESPONSE_NORMAL != ret.reply_type)
1110 0           goto error_out2;
1111              
1112 4           hv_stores(envelope_hv, "props", props);
1113 4           hv_stores(envelope_hv, "body", body);
1114            
1115             success_out:
1116 22 100         *envelope_sv_ptr = envelope_hv ? newRV_noinc(MUTABLE_SV(envelope_hv)) : &PL_sv_undef;
1117 22           ret.reply_type = AMQP_RESPONSE_NORMAL;
1118 22           return ret;
1119              
1120             error_out2:
1121 0           SvREFCNT_dec(envelope_hv);
1122             error_out1:
1123 0           *envelope_sv_ptr = &PL_sv_undef;
1124 22           return ret;
1125             }
1126              
1127              
1128              
1129             MODULE = Net::AMQP::RabbitMQ PACKAGE = Net::AMQP::RabbitMQ PREFIX = net_amqp_rabbitmq_
1130              
1131             REQUIRE: 1.9505
1132             PROTOTYPES: DISABLE
1133              
1134             BOOT:
1135 34 50         PERL_MATH_INT64_LOAD_OR_CROAK;
1136              
1137             int
1138             net_amqp_rabbitmq_connect(conn, hostname, options, client_properties = NULL)
1139             Net::AMQP::RabbitMQ conn
1140             char *hostname
1141             HV *options
1142             HV *client_properties
1143             PREINIT:
1144             amqp_socket_t *sock;
1145 39           char *user = "guest";
1146 39           char *password = "guest";
1147 39           char *vhost = "/";
1148 39           int port = 5672;
1149 39           int channel_max = 0;
1150 39           int frame_max = 131072;
1151 39           int heartbeat = 0;
1152 39           double timeout = -1;
1153             struct timeval to;
1154              
1155 39           int ssl = 0;
1156 39           char *ssl_cacert = NULL;
1157 39           char *ssl_cert = NULL;
1158 39           char *ssl_key = NULL;
1159 39           int ssl_verify_host = 1;
1160 39           int ssl_init = 1;
1161 39           char *sasl_method = "plain";
1162 39           amqp_sasl_method_enum sasl_type = AMQP_SASL_METHOD_PLAIN;
1163 39           amqp_table_t client_properties_tbl = amqp_empty_table;
1164             CODE:
1165 39 50         str_from_hv(options, user);
    50          
1166 39 50         str_from_hv(options, password);
    50          
1167 39 50         str_from_hv(options, vhost);
    50          
1168 39 50         int_from_hv(options, channel_max);
    0          
1169 39 50         int_from_hv(options, frame_max);
    0          
1170 39 100         int_from_hv(options, heartbeat);
    50          
1171 39 50         int_from_hv(options, port);
    50          
1172 39 100         double_from_hv(options, timeout);
    50          
1173              
1174 39 50         int_from_hv(options, ssl);
    50          
1175 39 50         str_from_hv(options, ssl_cacert);
    50          
1176 39 50         str_from_hv(options, ssl_cert);
    0          
1177 39 50         str_from_hv(options, ssl_key);
    0          
1178 39 50         int_from_hv(options, ssl_verify_host);
    50          
1179 39 50         int_from_hv(options, ssl_init);
    50          
1180 39 50         str_from_hv(options, sasl_method);
    0          
1181              
1182 39 100         if(client_properties)
1183             {
1184 1           hash_to_amqp_table(client_properties, &client_properties_tbl, 1);
1185             }
1186              
1187 39 100         if(timeout >= 0) {
1188 1           to.tv_sec = floor(timeout);
1189 1           to.tv_usec = 1000000.0 * (timeout - floor(timeout));
1190             }
1191              
1192 39 100         if ( ssl ) {
1193             #ifndef NAR_HAVE_OPENSSL
1194             Perl_croak(aTHX_ "no ssl support, please install openssl and reinstall");
1195             #endif
1196 1           amqp_set_initialize_ssl_library( (amqp_boolean_t)ssl_init );
1197 1           sock = amqp_ssl_socket_new(conn);
1198 1 50         if ( !sock ) {
1199 0           Perl_croak(aTHX_ "error creating SSL socket");
1200             }
1201              
1202 1           amqp_ssl_socket_set_verify_hostname( sock, (amqp_boolean_t)ssl_verify_host );
1203              
1204 1 50         if ( ( ssl_cacert != NULL ) && strlen(ssl_cacert) ) {
    50          
1205 1 50         if ( amqp_ssl_socket_set_cacert(sock, ssl_cacert) ) {
1206 0           Perl_croak(aTHX_ "error setting CA certificate");
1207             }
1208             }
1209             else {
1210             // TODO
1211             // in librabbitmq > 0.7.1, amqp_ssl_socket_set_verify_peer makes this optional
1212 0           Perl_croak(aTHX_ "required arg ssl_cacert not provided");
1213             }
1214              
1215 1 50         if ( ( ssl_key != NULL ) && strlen(ssl_key) && ( ssl_cert != NULL ) && strlen(ssl_cert) ) {
    0          
    0          
    0          
1216 0 0         if ( amqp_ssl_socket_set_key( sock, ssl_cert, ssl_key ) ) {
1217 0           Perl_croak(aTHX_ "error setting client cert");
1218             }
1219             }
1220             }
1221             else {
1222 38           sock = amqp_tcp_socket_new(conn);
1223 38 50         if (!sock) {
1224 0           Perl_croak(aTHX_ "error creating TCP socket");
1225             }
1226             }
1227              
1228             //if there's data in the buffer, clear it
1229 58 100         while ( amqp_data_in_buffer(conn) ) {
1230             amqp_frame_t frame;
1231 19           amqp_simple_wait_frame( conn, &frame );
1232             }
1233              
1234             // should probably be amqp_raw_equal, but this is a minimal hack
1235 39 50         if (strcasecmp(sasl_method, "external") == 0) {
1236 0           sasl_type = AMQP_SASL_METHOD_EXTERNAL;
1237             }
1238              
1239 39 100         die_on_error(aTHX_ amqp_socket_open_noblock(sock, hostname, port, (timeout<0)?NULL:&to), conn, "opening socket");
1240 38           die_on_amqp_error(aTHX_ amqp_login_with_properties(conn, vhost, channel_max, frame_max, heartbeat, &client_properties_tbl, sasl_type, user, password), conn, "Logging in");
1241              
1242 38           maybe_release_buffers(conn);
1243              
1244 38           RETVAL = 1;
1245             OUTPUT:
1246             RETVAL
1247              
1248             void
1249             net_amqp_rabbitmq_channel_open(conn, channel)
1250             Net::AMQP::RabbitMQ conn
1251             int channel
1252             CODE:
1253 37 100         assert_amqp_connected(conn);
    50          
1254              
1255 36           amqp_channel_open(conn, channel);
1256 36           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Opening channel");
1257              
1258             void
1259             net_amqp_rabbitmq_channel_close(conn, channel)
1260             Net::AMQP::RabbitMQ conn
1261             int channel
1262             CODE:
1263             /* If we don't have a socket, just return. */
1264 2 100         if ( ! has_valid_connection( conn ) ) {
    50          
1265 1           return;
1266             }
1267 1           die_on_amqp_error(aTHX_ amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), conn, "Closing channel");
1268              
1269             void
1270             net_amqp_rabbitmq_exchange_declare(conn, channel, exchange, options = NULL, args = NULL)
1271             Net::AMQP::RabbitMQ conn
1272             int channel
1273             char *exchange
1274             HV *options
1275             HV *args
1276             PREINIT:
1277 27           char *exchange_type = "direct";
1278 27           int passive = 0;
1279 27           int durable = 0;
1280 27           int auto_delete = 0;
1281 27           int internal = 0;
1282 27           amqp_table_t arguments = amqp_empty_table;
1283             CODE:
1284 27 100         assert_amqp_connected(conn);
    50          
1285              
1286 26 50         if(options) {
1287 26 50         str_from_hv(options, exchange_type);
    50          
1288 26 100         int_from_hv(options, passive);
    50          
1289 26 100         int_from_hv(options, durable);
    50          
1290 26 50         int_from_hv(options, auto_delete);
    50          
1291 26 100         int_from_hv(options, internal);
    50          
1292             }
1293 26 100         if(args)
1294             {
1295 1           hash_to_amqp_table(args, &arguments, 1);
1296             }
1297             __DEBUG__(
1298             warn("%d: amqp_declare_exchange with channel:%d, exchange:%s, and exchange_type:%s\n",
1299             __LINE__,
1300             channel,
1301             exchange,
1302             exchange_type
1303             );
1304             dump_table(arguments);
1305             );
1306 26           amqp_exchange_declare(
1307             conn,
1308             channel,
1309             amqp_cstring_bytes(exchange),
1310             amqp_cstring_bytes(exchange_type),
1311             passive,
1312             (amqp_boolean_t)durable,
1313             (amqp_boolean_t)auto_delete,
1314             (amqp_boolean_t)internal,
1315             arguments);
1316 26           maybe_release_buffers(conn);
1317 26           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Declaring exchange");
1318              
1319             void
1320             net_amqp_rabbitmq_exchange_delete(conn, channel, exchange, options = NULL)
1321             Net::AMQP::RabbitMQ conn
1322             int channel
1323             char *exchange
1324             HV *options
1325             PREINIT:
1326 28           int if_unused = 1;
1327             CODE:
1328 28 100         assert_amqp_connected(conn);
    50          
1329              
1330 27 50         if(options) {
1331 27 50         int_from_hv(options, if_unused);
    50          
1332             }
1333 27           amqp_exchange_delete(conn, channel, amqp_cstring_bytes(exchange), if_unused);
1334 27           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Deleting exchange");
1335              
1336             void net_amqp_rabbitmq_exchange_bind(conn, channel, destination, source, routing_key, args = NULL)
1337             Net::AMQP::RabbitMQ conn
1338             int channel
1339             char *destination
1340             char *source
1341             char *routing_key
1342             HV *args
1343             PREINIT:
1344 3           amqp_exchange_bind_ok_t *reply = (amqp_exchange_bind_ok_t*)NULL;
1345 3           amqp_table_t arguments = amqp_empty_table;
1346             CODE:
1347             // We must be connected
1348 3 50         assert_amqp_connected(conn);
    50          
1349              
1350             // Parameter validation
1351 3 50         if( ( source == NULL || 0 == strlen(source) )
    100          
1352 2 50         ||
1353 2 100         ( destination == NULL || 0 == strlen(destination) )
1354             )
1355             {
1356 2           Perl_croak(aTHX_ "source and destination must both be specified");
1357             }
1358              
1359             // Pull in arguments if we have any
1360 1 50         if(args)
1361             {
1362 1           hash_to_amqp_table(args, &arguments, 1);
1363             }
1364              
1365 1           reply = amqp_exchange_bind(
1366             conn,
1367             channel,
1368             amqp_cstring_bytes(destination),
1369             amqp_cstring_bytes(source),
1370             amqp_cstring_bytes(routing_key),
1371             arguments
1372             );
1373 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Binding Exchange");
1374              
1375             void net_amqp_rabbitmq_exchange_unbind(conn, channel, destination, source, routing_key, args = NULL)
1376             Net::AMQP::RabbitMQ conn
1377             int channel
1378             char *destination
1379             char *source
1380             char *routing_key
1381             HV *args
1382             PREINIT:
1383 3           amqp_exchange_unbind_ok_t *reply = (amqp_exchange_unbind_ok_t*)NULL;
1384 3           amqp_table_t arguments = amqp_empty_table;
1385             CODE:
1386             // We must be connected
1387 3 50         assert_amqp_connected(conn);
    50          
1388              
1389             // Parameter validation
1390 3 50         if( ( source == NULL || 0 == strlen(source) )
    100          
1391 2 50         ||
1392 2 100         ( destination == NULL || 0 == strlen(destination) )
1393             )
1394             {
1395 2           Perl_croak(aTHX_ "source and destination must both be specified");
1396             }
1397              
1398             // Pull in arguments if we have any
1399 1 50         if(args)
1400             {
1401 1           hash_to_amqp_table(args, &arguments, 1);
1402             }
1403              
1404 1           reply = amqp_exchange_unbind(
1405             conn,
1406             channel,
1407             amqp_cstring_bytes(destination),
1408             amqp_cstring_bytes(source),
1409             amqp_cstring_bytes(routing_key),
1410             arguments
1411             );
1412 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Unbinding Exchange");
1413              
1414             void net_amqp_rabbitmq_queue_delete(conn, channel, queuename, options = NULL)
1415             Net::AMQP::RabbitMQ conn
1416             int channel
1417             char *queuename
1418             HV *options
1419             PREINIT:
1420 21           int if_unused = 1;
1421 21           int if_empty = 1;
1422 21           amqp_queue_delete_ok_t *reply = (amqp_queue_delete_ok_t*)NULL;
1423             CODE:
1424 21 100         assert_amqp_connected(conn);
    50          
1425              
1426 20 50         if(options) {
1427 20 50         int_from_hv(options, if_unused);
    50          
1428 20 50         int_from_hv(options, if_empty);
    50          
1429             }
1430 20           reply = amqp_queue_delete(
1431             conn,
1432             channel,
1433             amqp_cstring_bytes(queuename),
1434             if_unused,
1435             if_empty
1436             );
1437 20 50         if (reply == NULL) {
1438 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Deleting queue");
1439             }
1440 20 50         XPUSHs(sv_2mortal(newSVuv(reply->message_count)));
1441              
1442             void
1443             net_amqp_rabbitmq_queue_declare(conn, channel, queuename, options = NULL, args = NULL)
1444             Net::AMQP::RabbitMQ conn
1445             int channel
1446             char *queuename
1447             HV *options
1448             HV *args
1449             PREINIT:
1450 21           int passive = 0;
1451 21           int durable = 0;
1452 21           int exclusive = 0;
1453 21           int auto_delete = 1;
1454 21           amqp_table_t arguments = amqp_empty_table;
1455 21           amqp_bytes_t queuename_b = amqp_empty_bytes;
1456 21           amqp_queue_declare_ok_t *r = (amqp_queue_declare_ok_t*)NULL;
1457             PPCODE:
1458 21 100         assert_amqp_connected(conn);
    50          
1459              
1460 20 50         if(queuename && strcmp(queuename, "")) queuename_b = amqp_cstring_bytes(queuename);
    100          
1461 20 50         if(options) {
1462 20 100         int_from_hv(options, passive);
    50          
1463 20 100         int_from_hv(options, durable);
    50          
1464 20 100         int_from_hv(options, exclusive);
    50          
1465 20 50         int_from_hv(options, auto_delete);
    50          
1466             }
1467 20 100         if(args)
1468             {
1469 2           hash_to_amqp_table(args, &arguments, 1);
1470             }
1471 20           r = amqp_queue_declare(conn, channel, queuename_b, passive,
1472             durable, exclusive, auto_delete,
1473             arguments);
1474 20           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Declaring queue");
1475 19 50         XPUSHs(sv_2mortal(newSVpvn(r->queue.bytes, r->queue.len)));
1476 19 50         if(GIMME_V == G_ARRAY) {
    100          
1477 1 50         XPUSHs(sv_2mortal(newSVuv(r->message_count)));
1478 1 50         XPUSHs(sv_2mortal(newSVuv(r->consumer_count)));
1479             }
1480              
1481             void
1482             net_amqp_rabbitmq_queue_bind(conn, channel, queuename, exchange, bindingkey, args = NULL)
1483             Net::AMQP::RabbitMQ conn
1484             int channel
1485             char *queuename
1486             char *exchange
1487             char *bindingkey
1488             HV *args
1489             PREINIT:
1490 21           amqp_table_t arguments = amqp_empty_table;
1491             CODE:
1492 21 100         assert_amqp_connected(conn);
    50          
1493              
1494 18 50         if(queuename == NULL
1495 18 50         ||
1496             exchange == NULL
1497 18 50         ||
1498 18           0 == strlen(queuename)
1499 18 50         ||
1500 18           0 == strlen(exchange)
1501             )
1502             {
1503 0           Perl_croak(aTHX_ "queuename and exchange must both be specified");
1504             }
1505              
1506 18 100         if(args)
1507 2           hash_to_amqp_table(args, &arguments, 0);
1508 18           amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename),
1509             amqp_cstring_bytes(exchange),
1510             amqp_cstring_bytes(bindingkey),
1511             arguments);
1512 18           maybe_release_buffers(conn);
1513 18           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Binding queue");
1514              
1515             void
1516             net_amqp_rabbitmq_queue_unbind(conn, channel, queuename, exchange, bindingkey, args = NULL)
1517             Net::AMQP::RabbitMQ conn
1518             int channel
1519             char *queuename
1520             char *exchange
1521             char *bindingkey
1522             HV *args
1523             PREINIT:
1524 22           amqp_table_t arguments = amqp_empty_table;
1525             CODE:
1526 22 100         assert_amqp_connected(conn);
    50          
1527              
1528 21 50         if(queuename == NULL || exchange == NULL)
    50          
1529             {
1530 0           Perl_croak(aTHX_ "queuename and exchange must both be specified");
1531             }
1532              
1533 21 100         if(args)
1534             {
1535 1           hash_to_amqp_table(args, &arguments, 0);
1536             }
1537 21           amqp_queue_unbind(conn, channel, amqp_cstring_bytes(queuename),
1538             amqp_cstring_bytes(exchange),
1539             amqp_cstring_bytes(bindingkey),
1540             arguments);
1541 21           maybe_release_buffers(conn);
1542 21           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Unbinding queue");
1543              
1544             SV *
1545             net_amqp_rabbitmq_consume(conn, channel, queuename, options = NULL)
1546             Net::AMQP::RabbitMQ conn
1547             int channel
1548             char *queuename
1549             HV *options
1550             PREINIT:
1551             amqp_basic_consume_ok_t *r;
1552 17           char *consumer_tag = NULL;
1553 17           int no_local = 0;
1554 17           int no_ack = 1;
1555 17           int exclusive = 0;
1556             CODE:
1557 17 100         assert_amqp_connected(conn);
    50          
1558              
1559 16 50         if(options) {
1560 16 50         str_from_hv(options, consumer_tag);
    50          
1561 16 50         int_from_hv(options, no_local);
    50          
1562 16 50         int_from_hv(options, no_ack);
    50          
1563 16 50         int_from_hv(options, exclusive);
    50          
1564             }
1565 16 50         r = amqp_basic_consume(conn, channel, amqp_cstring_bytes(queuename),
1566             consumer_tag ? amqp_cstring_bytes(consumer_tag) : amqp_empty_bytes,
1567             no_local, no_ack, exclusive, amqp_empty_table);
1568 16           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Consume queue");
1569 16           RETVAL = newSVpvn(r->consumer_tag.bytes, r->consumer_tag.len);
1570             OUTPUT:
1571             RETVAL
1572              
1573             int
1574             net_amqp_rabbitmq_cancel(conn, channel, consumer_tag)
1575             Net::AMQP::RabbitMQ conn
1576             int channel
1577             char *consumer_tag
1578             PREINIT:
1579             amqp_basic_cancel_ok_t *r;
1580             CODE:
1581 4 100         assert_amqp_connected(conn);
    100          
1582              
1583 2           r = amqp_basic_cancel(conn, channel, amqp_cstring_bytes(consumer_tag));
1584 2           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "cancel");
1585              
1586 1 50         if ( r == NULL ) {
1587 0           RETVAL = 0;
1588             }
1589             else {
1590 1 50         if(strlen(consumer_tag) == r->consumer_tag.len && 0 == strcmp(consumer_tag, (char *)r->consumer_tag.bytes)) {
    50          
1591 0           RETVAL = 1;
1592             } else {
1593 1           RETVAL = 0;
1594             }
1595             }
1596             OUTPUT:
1597             RETVAL
1598              
1599             SV *
1600             net_amqp_rabbitmq_recv(conn, timeout = 0)
1601             Net::AMQP::RabbitMQ conn
1602             int timeout
1603             PREINIT:
1604             SV *envelope;
1605             amqp_rpc_reply_t ret;
1606             struct timeval timeout_tv;
1607             CODE:
1608 27 100         assert_amqp_connected(conn);
    50          
1609              
1610 26 100         if (timeout > 0) {
1611 6           timeout_tv.tv_sec = timeout / 1000;
1612 6           timeout_tv.tv_usec = (timeout % 1000) * 1000;
1613             }
1614              
1615             // Set the waiting time to 0
1616 26 100         if (timeout == -1) {
1617 2           timeout_tv.tv_sec = 0;
1618 2           timeout_tv.tv_usec = 0;
1619             }
1620              
1621 26           maybe_release_buffers(conn);
1622 26 100         ret = consume_message(conn, &RETVAL, timeout ? &timeout_tv : NULL);
1623 26 100         if (AMQP_RESPONSE_LIBRARY_EXCEPTION != ret.reply_type || AMQP_STATUS_TIMEOUT != ret.library_error)
    50          
1624 22           die_on_amqp_error(aTHX_ ret, conn, "recv");
1625              
1626             OUTPUT:
1627             RETVAL
1628              
1629             void
1630             net_amqp_rabbitmq_ack(conn, channel, delivery_tag, multiple = 0)
1631             Net::AMQP::RabbitMQ conn
1632             int channel
1633             uint64_t delivery_tag
1634             int multiple
1635             CODE:
1636 3 100         assert_amqp_connected(conn);
    50          
1637              
1638 2           die_on_error(aTHX_ amqp_basic_ack(conn, channel, delivery_tag, multiple), conn,
1639             "ack");
1640              
1641             void
1642             net_amqp_rabbitmq_nack(conn, channel, delivery_tag, multiple = 0, requeue = 0)
1643             Net::AMQP::RabbitMQ conn
1644             int channel
1645             uint64_t delivery_tag
1646             int multiple
1647             int requeue
1648             CODE:
1649 2 100         assert_amqp_connected(conn);
    50          
1650              
1651 1           die_on_error(
1652             aTHX_ amqp_basic_nack(
1653             conn,
1654             channel,
1655             delivery_tag,
1656             (amqp_boolean_t)multiple,
1657             (amqp_boolean_t)requeue
1658             ),
1659             conn,
1660             "nack"
1661             );
1662              
1663             void
1664             net_amqp_rabbitmq_reject(conn, channel, delivery_tag, requeue = 0)
1665             Net::AMQP::RabbitMQ conn
1666             int channel
1667             uint64_t delivery_tag
1668             int requeue
1669             CODE:
1670 2 100         assert_amqp_connected(conn);
    50          
1671              
1672 1           die_on_error(
1673             aTHX_ amqp_basic_reject(
1674             conn,
1675             channel,
1676             delivery_tag,
1677             requeue
1678             ),
1679             conn,
1680             "reject"
1681             );
1682              
1683              
1684             void
1685             net_amqp_rabbitmq_purge(conn, channel, queuename)
1686             Net::AMQP::RabbitMQ conn
1687             int channel
1688             char *queuename
1689             CODE:
1690 20 100         assert_amqp_connected(conn);
    50          
1691              
1692 19           amqp_queue_purge(conn, channel, amqp_cstring_bytes(queuename));
1693 19           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Purging queue");
1694              
1695             void
1696             net_amqp_rabbitmq__publish(conn, channel, routing_key, body, options = NULL, props = NULL)
1697             Net::AMQP::RabbitMQ conn
1698             int channel
1699             HV *options;
1700             char *routing_key
1701             SV *body
1702             HV *props
1703             PREINIT:
1704             SV **v;
1705 30           char *exchange = "amq.direct";
1706 30           amqp_boolean_t mandatory = 0;
1707 30           amqp_boolean_t immediate = 0;
1708             int rv;
1709 30           amqp_bytes_t exchange_b = { 0 };
1710             amqp_bytes_t routing_key_b;
1711             amqp_bytes_t body_b;
1712             struct amqp_basic_properties_t_ properties;
1713             STRLEN len;
1714 30           int force_utf8_in_header_strings = 0;
1715             CODE:
1716 30 100         assert_amqp_connected(conn);
    50          
1717              
1718 29           routing_key_b = amqp_cstring_bytes(routing_key);
1719 29 50         body_b.bytes = SvPV(body, len);
1720 29           body_b.len = len;
1721 29 50         if(options) {
1722 29 50         if(NULL != (v = hv_fetchs(options, "mandatory", 0))) {
1723 0 0         mandatory = SvIV(*v) ? 1 : 0;
1724             }
1725 29 50         if(NULL != (v = hv_fetchs(options, "immediate", 0))) {
1726 0 0         immediate = SvIV(*v) ? 1 : 0;
1727             }
1728 29 50         if(NULL != (v = hv_fetchs(options, "exchange", 0))) {
1729 29 50         exchange_b = amqp_cstring_bytes(SvPV_nolen(*v));
1730             }
1731              
1732             // This is an internal option, only for determining if we want to force utf8
1733 29 100         int_from_hv(options, force_utf8_in_header_strings);
    50          
1734             }
1735 29           properties.headers = amqp_empty_table;
1736 29           properties._flags = 0;
1737 29 50         if (props) {
1738 29 100         if (NULL != (v = hv_fetchs(props, "content_type", 0))) {
1739 5 50         properties.content_type = amqp_cstring_bytes(SvPV_nolen(*v));
1740 5           properties._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
1741             }
1742 29 100         if (NULL != (v = hv_fetchs(props, "content_encoding", 0))) {
1743 8 50         properties.content_encoding = amqp_cstring_bytes(SvPV_nolen(*v));
1744 8           properties._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
1745             }
1746 29 100         if (NULL != (v = hv_fetchs(props, "correlation_id", 0))) {
1747 5 50         properties.correlation_id = amqp_cstring_bytes(SvPV_nolen(*v));
1748 5           properties._flags |= AMQP_BASIC_CORRELATION_ID_FLAG;
1749             }
1750 29 100         if (NULL != (v = hv_fetchs(props, "reply_to", 0))) {
1751 5 50         properties.reply_to = amqp_cstring_bytes(SvPV_nolen(*v));
1752 5           properties._flags |= AMQP_BASIC_REPLY_TO_FLAG;
1753             }
1754 29 100         if (NULL != (v = hv_fetchs(props, "expiration", 0))) {
1755 5 50         properties.expiration = amqp_cstring_bytes(SvPV_nolen(*v));
1756 5           properties._flags |= AMQP_BASIC_EXPIRATION_FLAG;
1757             }
1758 29 100         if (NULL != (v = hv_fetchs(props, "message_id", 0))) {
1759 5 50         properties.message_id = amqp_cstring_bytes(SvPV_nolen(*v));
1760 5           properties._flags |= AMQP_BASIC_MESSAGE_ID_FLAG;
1761             }
1762 29 100         if (NULL != (v = hv_fetchs(props, "type", 0))) {
1763 5 50         properties.type = amqp_cstring_bytes(SvPV_nolen(*v));
1764 5           properties._flags |= AMQP_BASIC_TYPE_FLAG;
1765             }
1766 29 100         if (NULL != (v = hv_fetchs(props, "user_id", 0))) {
1767 5 50         properties.user_id = amqp_cstring_bytes(SvPV_nolen(*v));
1768 5           properties._flags |= AMQP_BASIC_USER_ID_FLAG;
1769             }
1770 29 100         if (NULL != (v = hv_fetchs(props, "app_id", 0))) {
1771 5 50         properties.app_id = amqp_cstring_bytes(SvPV_nolen(*v));
1772 5           properties._flags |= AMQP_BASIC_APP_ID_FLAG;
1773             }
1774 29 100         if (NULL != (v = hv_fetchs(props, "delivery_mode", 0))) {
1775 5 50         properties.delivery_mode = (uint8_t) SvIV(*v);
1776 5           properties._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
1777             }
1778 29 100         if (NULL != (v = hv_fetchs(props, "priority", 0))) {
1779 5 50         properties.priority = (uint8_t) SvIV(*v);
1780 5           properties._flags |= AMQP_BASIC_PRIORITY_FLAG;
1781             }
1782 29 100         if (NULL != (v = hv_fetchs(props, "timestamp", 0))) {
1783 5           properties.timestamp = (uint64_t) SvI64(*v);
1784 5           properties._flags |= AMQP_BASIC_TIMESTAMP_FLAG;
1785             }
1786 29 100         if (NULL != (v = hv_fetchs(props, "headers", 0)) && SvOK(*v)) {
    100          
    50          
    50          
1787 9           hash_to_amqp_table(MUTABLE_HV(SvRV(*v)), &properties.headers, force_utf8_in_header_strings);
1788 9           properties._flags |= AMQP_BASIC_HEADERS_FLAG;
1789             }
1790             }
1791             __DEBUG__( warn("PUBLISHING HEADERS..."); dump_table( properties.headers ) );
1792 29           rv = amqp_basic_publish(conn, channel, exchange_b, routing_key_b, mandatory, immediate, &properties, body_b);
1793 29           maybe_release_buffers(conn);
1794              
1795             /* If the connection failed, blast the file descriptor! */
1796 29 50         if ( rv == AMQP_STATUS_CONNECTION_CLOSED || rv == AMQP_STATUS_SOCKET_ERROR ) {
    100          
1797 1           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_FORCE );
1798 1           Perl_croak(aTHX_ "Publish failed because AMQP socket connection was closed.");
1799             }
1800              
1801             /* Otherwise, just croak */
1802 28 50         if ( rv != AMQP_STATUS_OK ) {
1803 0           Perl_croak( aTHX_ "Publish failed, %s\n", amqp_error_string2(rv));
1804             }
1805              
1806             SV *
1807             net_amqp_rabbitmq_get(conn, channel, queuename, options = NULL)
1808             Net::AMQP::RabbitMQ conn
1809             int channel
1810             char *queuename
1811             HV *options
1812             PREINIT:
1813 23           int no_ack = 1;
1814             CODE:
1815 23 100         assert_amqp_connected(conn);
    50          
1816              
1817 22 100         if (options)
1818 5 50         int_from_hv(options, no_ack);
    50          
1819              
1820 22           maybe_release_buffers(conn);
1821 22 50         die_on_amqp_error(aTHX_ basic_get(conn, channel, queuename ? amqp_cstring_bytes(queuename) : amqp_empty_bytes, &RETVAL, no_ack), conn, "basic_get");
1822              
1823             OUTPUT:
1824             RETVAL
1825              
1826             int
1827             net_amqp_rabbitmq_get_channel_max(conn)
1828             Net::AMQP::RabbitMQ conn
1829             CODE:
1830 1           RETVAL = amqp_get_channel_max(conn);
1831             OUTPUT:
1832             RETVAL
1833              
1834             SV*
1835             net_amqp_rabbitmq_get_sockfd(conn)
1836             Net::AMQP::RabbitMQ conn
1837             CODE:
1838             /**
1839             * this is the warning from librabbitmq-c. you have been warned.
1840             *
1841             * \warning Use the socket returned from this function carefully, incorrect use
1842             * of the socket outside of the library will lead to undefined behavior.
1843             * Additionally rabbitmq-c may use the socket differently version-to-version,
1844             * what may work in one version, may break in the next version. Be sure to
1845             * throughly test any applications that use the socket returned by this
1846             * function especially when using a newer version of rabbitmq-c
1847             *
1848             */
1849 2 100         if ( has_valid_connection( conn ) ) {
    50          
1850 1           RETVAL = newSViv( amqp_get_sockfd(conn) );
1851             }
1852             else {
1853             // We don't have a connection, we're still here.
1854 1           RETVAL = &PL_sv_undef;
1855             }
1856             OUTPUT:
1857             RETVAL
1858              
1859             SV*
1860             net_amqp_rabbitmq_is_connected(conn)
1861             Net::AMQP::RabbitMQ conn
1862             CODE:
1863 87 100         if ( has_valid_connection( conn ) ) {
    100          
1864 78           RETVAL = newSViv(1);
1865             }
1866             else {
1867             // We don't have a connection, we're still here.
1868 9           RETVAL = &PL_sv_undef;
1869             }
1870             OUTPUT:
1871             RETVAL
1872              
1873             void
1874             net_amqp_rabbitmq_disconnect(conn)
1875             Net::AMQP::RabbitMQ conn
1876             PREINIT:
1877             int sockfd;
1878             CODE:
1879 5 100         if ( amqp_get_socket(conn) != NULL ) {
1880 4           amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
1881 4           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_NONE );
1882             }
1883              
1884             Net::AMQP::RabbitMQ
1885             net_amqp_rabbitmq__new(clazz)
1886             char *clazz
1887             CODE:
1888 32           RETVAL = amqp_new_connection();
1889             OUTPUT:
1890             RETVAL
1891              
1892             void
1893             net_amqp_rabbitmq__destroy_connection_close(conn)
1894             Net::AMQP::RabbitMQ conn
1895             CODE:
1896 31 100         if ( amqp_get_socket(conn) != NULL ) {
1897 29           amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
1898             }
1899              
1900             void
1901             net_amqp_rabbitmq__destroy_cleanup(conn)
1902             Net::AMQP::RabbitMQ conn
1903             CODE:
1904 31           empty_amqp_pool( &temp_memory_pool );
1905 31           amqp_destroy_connection(conn);
1906              
1907             void
1908             net_amqp_rabbitmq_heartbeat(conn)
1909             Net::AMQP::RabbitMQ conn
1910             PREINIT:
1911             amqp_frame_t f;
1912             CODE:
1913 10           f.frame_type = AMQP_FRAME_HEARTBEAT;
1914 10           f.channel = 0;
1915 10           amqp_send_frame(conn, &f);
1916              
1917             void
1918             net_amqp_rabbitmq_tx_select(conn, channel, args = NULL)
1919             Net::AMQP::RabbitMQ conn
1920             int channel
1921             HV *args
1922             CODE:
1923 1           amqp_tx_select(conn, channel);
1924 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Selecting transaction");
1925              
1926             void
1927             net_amqp_rabbitmq_tx_commit(conn, channel, args = NULL)
1928             Net::AMQP::RabbitMQ conn
1929             int channel
1930             HV *args
1931             CODE:
1932 1           amqp_tx_commit(conn, channel);
1933 1           maybe_release_buffers(conn);
1934 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Commiting transaction");
1935              
1936             void
1937             net_amqp_rabbitmq_tx_rollback(conn, channel, args = NULL)
1938             Net::AMQP::RabbitMQ conn
1939             int channel
1940             HV *args
1941             CODE:
1942 1           amqp_tx_rollback(conn, channel);
1943 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Rolling Back transaction");
1944              
1945             SV* net_amqp_rabbitmq_get_rpc_timeout(conn)
1946             Net::AMQP::RabbitMQ conn
1947             PREINIT:
1948             struct timeval *timeout_tv;
1949             HV *output;
1950             CODE:
1951 8           timeout_tv = amqp_get_rpc_timeout(conn);
1952 8 100         if (timeout_tv == NULL) {
1953             __DEBUG__( warn("%d get_rpc_timeout: Timeout is NULL, returning undef.", __LINE__) );
1954 4           RETVAL = &PL_sv_undef;
1955             } else {
1956             __DEBUG__( warn("%d get_rpc_timeout: Timeout is non-NULL, returning hashref.", __LINE__) );
1957 4           output = newHV();
1958 4           hv_stores(output, "tv_sec", newSVi64( timeout_tv->tv_sec ));
1959 4           hv_stores(output, "tv_usec", newSVi64( timeout_tv->tv_usec ));
1960 4           RETVAL = newRV_noinc( output );
1961             }
1962             OUTPUT:
1963             RETVAL
1964              
1965             void net_amqp_rabbitmq__set_rpc_timeout(conn, args = NULL)
1966             Net::AMQP::RabbitMQ conn
1967             SV* args
1968             PREINIT:
1969 7           struct timeval timeout = {0,0};
1970 7           struct timeval *old_timeout = NULL;
1971 7           int tv_sec = 0;
1972 7           int tv_usec = 0;
1973 7           int res = 0;
1974             CODE:
1975 7           old_timeout = amqp_get_rpc_timeout(conn);
1976              
1977             // If we are setting the RPC timeout to NULL...
1978 7 50         if (args == NULL || !SvOK(args) || args == &PL_sv_undef) {
    100          
    50          
    50          
    50          
1979             __DEBUG__( warn("%d set_rpc_timeout: No args. Setting to unlimited RPC timeout.", __LINE__) );
1980              
1981 5 100         if (old_timeout != NULL) {
1982             __DEBUG__( warn("%d set_rpc_timeout: Changing to unlimited RPC timeout.", __LINE__) );
1983 2           amqp_set_rpc_timeout( conn, NULL );
1984             }
1985             }
1986              
1987             // If we are setting the RPC timeout to something other than NULL...
1988             else {
1989 4 100         int_from_hv(SvRV(args), tv_sec);
    50          
1990 4 100         int_from_hv(SvRV(args), tv_usec);
    50          
1991             __DEBUG__( warn("%d set_rpc_timeout: Setting to tv_sec:%d and tv_usec:%d.", __LINE__, tv_sec, tv_usec) );
1992             // If we need to allocate the timeout...
1993              
1994 4           timeout.tv_sec = tv_sec;
1995 4           timeout.tv_usec = tv_usec;
1996 4           die_on_error(aTHX_ amqp_set_rpc_timeout(conn, &timeout), conn, "Set RPC Timeout");
1997             }
1998              
1999             void
2000             net_amqp_rabbitmq_basic_qos(conn, channel, args = NULL)
2001             Net::AMQP::RabbitMQ conn
2002             int channel
2003             HV *args
2004             PREINIT:
2005             SV **v;
2006 1           uint32_t prefetch_size = 0;
2007 1           uint16_t prefetch_count = 0;
2008 1           amqp_boolean_t global = 0;
2009             CODE:
2010 1 50         if(args) {
2011 1 50         if(NULL != (v = hv_fetchs(args, "prefetch_size", 0))) prefetch_size = SvIV(*v);
    0          
2012 1 50         if(NULL != (v = hv_fetchs(args, "prefetch_count", 0))) prefetch_count = SvIV(*v);
    50          
2013 1 50         if(NULL != (v = hv_fetchs(args, "global", 0))) global = SvIV(*v) ? 1 : 0;
    0          
2014             }
2015 1           amqp_basic_qos(conn, channel,
2016             prefetch_size, prefetch_count, global);
2017 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Basic QoS");
2018              
2019             SV* net_amqp_rabbitmq_get_server_properties(conn)
2020             Net::AMQP::RabbitMQ conn
2021             PREINIT:
2022             amqp_table_t* server_properties;
2023             CODE:
2024 2 100         assert_amqp_connected(conn);
    50          
2025 1           server_properties = amqp_get_server_properties(conn);
2026 1 50         if ( server_properties )
2027             {
2028 1           RETVAL = mq_table_to_hashref(server_properties);
2029             }
2030             else
2031             {
2032 0           RETVAL = &PL_sv_undef;
2033             }
2034             OUTPUT:
2035             RETVAL
2036              
2037             SV* net_amqp_rabbitmq_get_client_properties(conn)
2038             Net::AMQP::RabbitMQ conn
2039             PREINIT:
2040             amqp_table_t* client_properties;
2041             CODE:
2042 3 100         assert_amqp_connected(conn);
    50          
2043 2           client_properties = amqp_get_client_properties(conn);
2044 2 50         if ( client_properties )
2045             {
2046 2           RETVAL = mq_table_to_hashref(client_properties);
2047             }
2048             else
2049             {
2050 0           RETVAL = &PL_sv_undef;
2051             }
2052             OUTPUT:
2053             RETVAL
2054              
2055             SV* net_amqp_rabbitmq_has_ssl()
2056             CODE:
2057             #ifdef NAR_HAVE_OPENSSL
2058 1           RETVAL = &PL_sv_yes;
2059             #else
2060             RETVAL = &PL_sv_no;
2061             #endif
2062             OUTPUT:
2063             RETVAL