File Coverage

RabbitMQ.xs
Criterion Covered Total %
statement 562 749 75.0
branch 372 618 60.1
condition n/a
subroutine n/a
pod n/a
total 934 1367 68.3


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include
5             #include
6              
7             /* perl -MDevel::PPPort -e'Devel::PPPort::WriteFile();' */
8             /* perl ppport.h --compat-version=5.8.0 --cplusplus RabbitMQ.xs */
9             #define NEED_newSVpvn_flags
10             #include "ppport.h"
11              
12             /* ppport.h knows about MUTABLE_PTR and MUTABLE_SV, but not these?! */
13             #ifndef MUTABLE_AV
14             # define MUTABLE_AV(p) ((AV*)MUTABLE_PTR(p))
15             #endif
16             #ifndef MUTABLE_HV
17             # define MUTABLE_HV(p) ((HV*)MUTABLE_PTR(p))
18             #endif
19              
20             #include "amqp.h"
21             #include "amqp_socket.h"
22             #include "amqp_tcp_socket.h"
23             #include "amqp_ssl_socket.h"
24             /* For struct timeval */
25             #include "amqp_time.h"
26              
27             /* This is for the Math::UInt64 integration */
28             #include "perl_math_int64.h"
29              
30             /* perl Makefile.PL; make CCFLAGS=-DDEBUG */
31             #if DEBUG
32             #define __DEBUG__(X) X
33             #else
34             #define __DEBUG__(X) /* NOOP */
35             #endif
36              
37             typedef amqp_connection_state_t Net__AMQP__RabbitMQ;
38              
39             #define AMQP_STATUS_UNKNOWN_TYPE 0x500
40              
41             #ifdef USE_LONG_DOUBLE
42             /* stolen from Cpanel::JSON::XS
43             * so we don't mess up double => long double for perls with -Duselongdouble */
44             #if defined(_AIX) && (!defined(HAS_LONG_DOUBLE) || AIX_WORKAROUND)
45             #define HAVE_NO_POWL
46             #endif
47              
48             #ifdef HAVE_NO_POWL
49             /* Ulisse Monari: this is a patch for AIX 5.3, perl 5.8.8 without HAS_LONG_DOUBLE
50             There Perl_pow maps to pow(...) - NOT TO powl(...), core dumps at Perl_pow(...)
51             Base code is from http://bytes.com/topic/c/answers/748317-replacement-pow-function
52             This is my change to fs_pow that goes into libc/libm for calling fmod/exp/log.
53             NEED TO MODIFY Makefile, after perl Makefile.PL by adding "-lm" onto the LDDLFLAGS line */
54             static double fs_powEx(double x, double y)
55             {
56             double p = 0;
57              
58             if (0 > x && fmod(y, 1) == 0) {
59             if (fmod(y, 2) == 0) {
60             p = exp(log(-x) * y);
61             } else {
62             p = -exp(log(-x) * y);
63             }
64             } else {
65             if (x != 0 || 0 >= y) {
66             p = exp(log( x) * y);
67             }
68             }
69             return p;
70             }
71              
72             /* powf() unfortunately is not accurate enough */
73             const NV DOUBLE_POW = fs_powEx(10., DBL_DIG );
74             #else
75             const NV DOUBLE_POW = Perl_pow(10., DBL_DIG );
76             #endif
77             #endif
78              
79              
80             /* This is a place to put some stuff that we convert from perl,
81             it's transient and we recycle it as soon as it's finished being used
82             which means we keep memory we've used with the aim of reusing it */
83             /* temp_memory_pool is ugly and suffers from code smell */
84             static amqp_pool_t temp_memory_pool;
85              
86             /* Parallels amqp_maybe_release_buffers */
87 180           static void maybe_release_buffers(amqp_connection_state_t state) {
88 180 50         if (amqp_release_buffers_ok(state)) {
89 180           amqp_release_buffers(state);
90 180           recycle_amqp_pool(&temp_memory_pool);
91             }
92 180           }
93              
94             #define int_from_hv(hv,name) \
95             do { SV **v; if(NULL != (v = hv_fetchs(hv, #name, 0))) name = SvIV(*v); } while(0)
96             #define double_from_hv(hv,name) \
97             do { SV **v; if(NULL != (v = hv_fetchs(hv, #name, 0))) name = SvNV(*v); } while(0)
98             #define str_from_hv(hv,name) \
99             do { SV **v; if(NULL != (v = hv_fetchs(hv, #name, 0))) name = SvPV_nolen(*v); } while(0)
100             #define has_valid_connection(conn) \
101             ( amqp_get_socket( conn ) != NULL && amqp_get_sockfd( conn ) > -1 )
102             #define assert_amqp_connected(conn) \
103             do { \
104             if ( ! has_valid_connection(conn) ) { \
105             Perl_croak(aTHX_ "AMQP socket not connected"); \
106             } \
107             } while(0)
108              
109             void hash_to_amqp_table(HV *hash, amqp_table_t *table, short force_utf8);
110             void array_to_amqp_array(AV *perl_array, amqp_array_t *mq_array, short force_utf8);
111             SV* mq_array_to_arrayref(amqp_array_t *array);
112             SV* mq_table_to_hashref(amqp_table_t *table);
113              
114 42           void die_on_error(pTHX_ int x, amqp_connection_state_t conn, char const *context) {
115             /* Handle socket errors */
116 42 50         if ( x == AMQP_STATUS_CONNECTION_CLOSED || x == AMQP_STATUS_SOCKET_ERROR ) {
    50          
117 0           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_FORCE );
118 0           Perl_croak(aTHX_ "%s failed because AMQP socket connection was closed.", context);
119             }
120             /* Handle everything else */
121 42 100         else if (x < 0) {
122 1           Perl_croak(aTHX_ "%s: %s\n", context, amqp_error_string2(x));
123             }
124 41           }
125              
126 273           void die_on_amqp_error(pTHX_ amqp_rpc_reply_t x, amqp_connection_state_t conn, char const *context) {
127 273           switch (x.reply_type) {
128             case AMQP_RESPONSE_NORMAL:
129 267           return;
130              
131             case AMQP_RESPONSE_NONE:
132 0           Perl_croak(aTHX_ "%s: missing RPC reply type!", context);
133             break;
134              
135             case AMQP_RESPONSE_LIBRARY_EXCEPTION:
136             /* If we got a library error saying that there's a socket problem,
137             kill the connection and croak. */
138 0 0         if (
139 0           x.library_error == AMQP_STATUS_CONNECTION_CLOSED
140 0 0         ||
141 0           x.library_error == AMQP_STATUS_SOCKET_ERROR
142             ) {
143 0           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_FORCE );
144 0           Perl_croak(aTHX_ "%s: failed since AMQP socket connection closed.\n", context);
145             }
146             /* Otherwise, give a more generic croak. */
147             else {
148 0 0         Perl_croak(aTHX_ "%s: %s\n", context,
149 0           (!x.library_error) ? "(end-of-stream)" :
150 0 0         (x.library_error == AMQP_STATUS_UNKNOWN_TYPE) ? "unknown AMQP type id" :
151 0           amqp_error_string2(x.library_error));
152             }
153             break;
154              
155             case AMQP_RESPONSE_SERVER_EXCEPTION:
156 6           switch (x.reply.id) {
157             case AMQP_CONNECTION_CLOSE_METHOD:
158             {
159             amqp_connection_close_ok_t req;
160 0           req.dummy = '\0';
161 0           /* res = */ amqp_send_method(conn, 0, AMQP_CONNECTION_CLOSE_OK_METHOD, &req);
162             }
163 0           amqp_set_socket(conn, NULL);
164             {
165 0           amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
166 0           Perl_croak(aTHX_ "%s: server connection error %d, message: %.*s",
167             context,
168 0           m->reply_code,
169 0           (int) m->reply_text.len, (char *) m->reply_text.bytes);
170             }
171             break;
172              
173             case AMQP_CHANNEL_CLOSE_METHOD:
174             /* We don't know what channel provoked this error!
175             This information should be in amqp_rpc_reply_t, but it isn't.
176             {
177             amqp_channel_close_ok_t req;
178             req.dummy = '\0';
179             / * res = * / amqp_send_method(conn, channel, AMQP_CHANNEL_CLOSE_OK_METHOD, &req);
180             }
181             */
182             /* Only the channel should be invalidated, but we have no means of doing so! */
183             /* Even if we knew which channel we needed to invalidate! */
184 6           amqp_set_socket(conn, NULL);
185             {
186 6           amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
187 6           Perl_croak(aTHX_ "%s: server channel error %d, message: %.*s",
188             context,
189 6           m->reply_code,
190 6           (int) m->reply_text.len, (char *) m->reply_text.bytes);
191             }
192             break;
193              
194             default:
195 0           Perl_croak(aTHX_ "%s: unknown server error, method id 0x%08X", context, x.reply.id);
196             break;
197             }
198             break;
199             }
200             }
201              
202             /*
203             * amqp_kind_for_sv(SV**)
204             * Note: We could handle more types here... but we're trying to take Perl and go to
205             * C. We don't really need to handle much more than this from what I can tell.
206             */
207 164           amqp_field_value_kind_t amqp_kind_for_sv(SV** perl_value, short force_utf8) {
208              
209 164           switch (SvTYPE( *perl_value ))
210             {
211             // Integer types (and references beyond 5.10)
212             case SVt_IV:
213             // References
214 131 100         if ( SvROK( *perl_value ) ) {
215             // Array Reference
216 10 100         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVAV ) {
217 5           return AMQP_FIELD_KIND_ARRAY;
218             }
219              
220             // Hash Reference
221 5 50         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVHV ) {
222 5           return AMQP_FIELD_KIND_TABLE;
223             }
224 0           Perl_croak(
225             aTHX_ "Unsupported Perl Reference Type: %d",
226 0           SvTYPE( SvRV( *perl_value ) )
227             );
228             }
229              
230             // Regular integers
231             // In the event that it could be unsigned
232 121 50         if ( SvUOK( *perl_value ) ) {
233 0           return AMQP_FIELD_KIND_U64;
234             }
235 121           return AMQP_FIELD_KIND_I64;
236              
237             // Numeric type
238             case SVt_NV:
239 1           return AMQP_FIELD_KIND_F64;
240              
241             // String (handle types which are upgraded to handle IV/UV/NV as well as PV)
242             case SVt_PVIV:
243 0 0         if ( SvI64OK( *perl_value ) ) {
244 0           return AMQP_FIELD_KIND_I64;
245             }
246 0 0         if ( SvU64OK( *perl_value ) ) {
247 0           return AMQP_FIELD_KIND_U64;
248             }
249             // It could be a PV or an IV/UV!
250 0 0         if ( SvIOK( *perl_value ) ) {
251 0 0         if ( SvUOK( *perl_value ) ) {
252 0           return AMQP_FIELD_KIND_U64;
253             }
254 0           return AMQP_FIELD_KIND_I64;
255             }
256              
257             case SVt_PVNV:
258             // It could be a PV or an NV
259 0 0         if ( SvNOK( *perl_value ) ) {
260 0           return AMQP_FIELD_KIND_F64;
261             }
262              
263             case SVt_PV:
264             // UTF-8?
265 29 100         if ( force_utf8 || SvUTF8( *perl_value ) ) {
    100          
266 5           return AMQP_FIELD_KIND_UTF8;
267             }
268 24           return AMQP_FIELD_KIND_BYTES;
269              
270             case SVt_PVMG:
271 3 100         if ( SvPOK( *perl_value ) || SvPOKp( *perl_value ) ) {
    50          
272 1 50         if ( force_utf8 || SvUTF8( *perl_value ) ) {
    50          
273 0           return AMQP_FIELD_KIND_UTF8;
274             }
275 1           return AMQP_FIELD_KIND_BYTES;
276             }
277 2 100         if ( SvIOK( *perl_value ) || SvIOKp( *perl_value ) ) {
    50          
278 1 50         if ( SvUOK( *perl_value ) ) {
279 0           return AMQP_FIELD_KIND_U64;
280             }
281 1           return AMQP_FIELD_KIND_I64;
282             }
283 1 50         if ( SvNOK( *perl_value ) || SvNOKp( *perl_value ) ) {
    0          
284 1           return AMQP_FIELD_KIND_F64;
285             }
286              
287             default:
288 0 0         if ( SvROK( *perl_value ) ) {
289             // Array Reference
290 0 0         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVAV ) {
291 0           return AMQP_FIELD_KIND_ARRAY;
292             }
293              
294             // Hash Reference
295 0 0         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVHV ) {
296 0           return AMQP_FIELD_KIND_TABLE;
297             }
298 0           Perl_croak(
299             aTHX_ "Unsupported Perl Reference Type: %d",
300 0           SvTYPE( SvRV( *perl_value ) )
301             );
302             }
303              
304 0 0         Perl_croak(
305             aTHX_ "Unsupported scalar type detected >%s<(%d)",
306 0           SvPV_nolen(*perl_value),
307 0           SvTYPE( *perl_value )
308             );
309             }
310              
311             /* If we're still here... wtf */
312             Perl_croak( aTHX_ "The wheels have fallen off. Please call for help." );
313             }
314              
315             /* Parallels amqp_read_message */
316 25           static amqp_rpc_reply_t read_message(amqp_connection_state_t state, amqp_channel_t channel, SV **props_sv_ptr, SV **body_sv_ptr) {
317             HV *props_hv;
318             SV *body_sv;
319             amqp_rpc_reply_t ret;
320             int res;
321             amqp_frame_t frame;
322 25           int is_utf8_body = 1; /* The body is UTF-8 by default */
323              
324 25           memset(&ret, 0, sizeof(amqp_rpc_reply_t));
325              
326 25           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
327 25 50         if (AMQP_STATUS_OK != res) {
328 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
329 0           ret.library_error = res;
330 0           goto error_out1;
331             }
332              
333 25 50         if (AMQP_FRAME_HEADER != frame.frame_type) {
334 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
    0          
335 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
336 0           AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
337              
338 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
339 0           ret.reply = frame.payload.method;
340              
341             } else {
342 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
343 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
344              
345 0           amqp_put_back_frame(state, &frame);
346             }
347              
348 0           goto error_out1;
349             }
350              
351             {
352             amqp_basic_properties_t *p;
353              
354 25           props_hv = newHV();
355              
356 25           p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
357 25 100         if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
358 4           hv_stores(props_hv, "content_type", newSVpvn(p->content_type.bytes, p->content_type.len));
359             }
360 25 100         if (p->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) {
361 7           hv_stores(props_hv, "content_encoding", newSVpvn(p->content_encoding.bytes, p->content_encoding.len));
362              
363             /*
364             * Since we could have UTF-8 in our content-encoding, and most people seem like they
365             * treat this like the default, we're looking for the presence of content-encoding but
366             * the absence of a case-insensitive "UTF-8".
367             */
368 7 50         if (
369 7           strnlen(p->content_encoding.bytes, p->content_encoding.len) > 0
370 7 50         &&
371 7           (strncasecmp(p->content_encoding.bytes, "UTF-8", p->content_encoding.len) != 0)
372             ) {
373 7           is_utf8_body = 0;
374             }
375             }
376 25 100         if (p->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
377 4           hv_stores(props_hv, "correlation_id", newSVpvn(p->correlation_id.bytes, p->correlation_id.len));
378             }
379 25 100         if (p->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
380 4           hv_stores(props_hv, "reply_to", newSVpvn(p->reply_to.bytes, p->reply_to.len));
381             }
382 25 100         if (p->_flags & AMQP_BASIC_EXPIRATION_FLAG) {
383 4           hv_stores(props_hv, "expiration", newSVpvn(p->expiration.bytes, p->expiration.len));
384             }
385 25 100         if (p->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) {
386 4           hv_stores(props_hv, "message_id", newSVpvn(p->message_id.bytes, p->message_id.len));
387             }
388 25 100         if (p->_flags & AMQP_BASIC_TYPE_FLAG) {
389 4           hv_stores(props_hv, "type", newSVpvn(p->type.bytes, p->type.len));
390             }
391 25 100         if (p->_flags & AMQP_BASIC_USER_ID_FLAG) {
392 4           hv_stores(props_hv, "user_id", newSVpvn(p->user_id.bytes, p->user_id.len));
393             }
394 25 100         if (p->_flags & AMQP_BASIC_APP_ID_FLAG) {
395 4           hv_stores(props_hv, "app_id", newSVpvn(p->app_id.bytes, p->app_id.len));
396             }
397 25 100         if (p->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
398 4           hv_stores(props_hv, "delivery_mode", newSViv(p->delivery_mode));
399             }
400 25 100         if (p->_flags & AMQP_BASIC_PRIORITY_FLAG) {
401 4           hv_stores(props_hv, "priority", newSViv(p->priority));
402             }
403 25 100         if (p->_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
404 4           hv_stores(props_hv, "timestamp", newSViv(p->timestamp));
405             }
406 25 100         if (p->_flags & AMQP_BASIC_HEADERS_FLAG) {
407             int i;
408 9           HV *headers = newHV();
409 9           hv_stores(props_hv, "headers", newRV_noinc(MUTABLE_SV(headers)));
410              
411             __DEBUG__( dump_table( p->headers ) );
412              
413 40 100         for( i=0; i < p->headers.num_entries; ++i ) {
414 31           amqp_table_entry_t *header_entry = &(p->headers.entries[i]);
415              
416             __DEBUG__(
417             fprintf(stderr,
418             "~~~ Length: %ld/%d, Key: %.*s, Kind: %c\n",
419             header_entry->key.len,
420             (int)header_entry->key.len,
421             (int)header_entry->key.len,
422             (char*)header_entry->key.bytes,
423             header_entry->value.kind
424             )
425             );
426              
427 31           switch (header_entry->value.kind) {
428             case AMQP_FIELD_KIND_BOOLEAN:
429 0           hv_store( headers,
430             header_entry->key.bytes, header_entry->key.len,
431             newSViv(header_entry->value.value.boolean),
432             0
433             );
434 0           break;
435              
436             // Integer types
437             case AMQP_FIELD_KIND_I8:
438 0           hv_store( headers,
439             header_entry->key.bytes, header_entry->key.len,
440             newSViv(header_entry->value.value.i8),
441             0
442             );
443 0           break;
444              
445             case AMQP_FIELD_KIND_I16:
446 0           hv_store( headers,
447             header_entry->key.bytes, header_entry->key.len,
448             newSViv(header_entry->value.value.i16),
449             0
450             );
451 0           break;
452              
453             case AMQP_FIELD_KIND_I32:
454 0           hv_store( headers,
455             header_entry->key.bytes, header_entry->key.len,
456             newSViv(header_entry->value.value.i32),
457             0
458             );
459 0           break;
460              
461             case AMQP_FIELD_KIND_I64:
462 14           hv_store( headers,
463             header_entry->key.bytes, header_entry->key.len,
464             newSVi64(header_entry->value.value.i64),
465             0
466             );
467 14           break;
468              
469             case AMQP_FIELD_KIND_U8:
470 0           hv_store( headers,
471             header_entry->key.bytes, header_entry->key.len,
472             newSVuv(header_entry->value.value.u8),
473             0
474             );
475 0           break;
476              
477             case AMQP_FIELD_KIND_U16:
478 0           hv_store( headers,
479             header_entry->key.bytes, header_entry->key.len,
480             newSVuv(header_entry->value.value.u16),
481             0
482             );
483 0           break;
484              
485             case AMQP_FIELD_KIND_U32:
486 0           hv_store( headers,
487             header_entry->key.bytes, header_entry->key.len,
488             newSVuv(header_entry->value.value.u32),
489             0
490             );
491 0           break;
492              
493             case AMQP_FIELD_KIND_U64:
494 0           hv_store( headers,
495             header_entry->key.bytes, header_entry->key.len,
496             newSVu64(header_entry->value.value.u64),
497             0
498             );
499 0           break;
500              
501             // Floating point precision
502             case AMQP_FIELD_KIND_F32:
503 0           hv_store( headers,
504             header_entry->key.bytes, header_entry->key.len,
505             newSVnv(header_entry->value.value.f32),
506             0
507             );
508 0           break;
509              
510             case AMQP_FIELD_KIND_F64:
511             // TODO: I don't think this is a natively supported type on all Perls.
512              
513 2           hv_store( headers,
514             header_entry->key.bytes, header_entry->key.len,
515             #ifdef USE_LONG_DOUBLE
516             /* amqp uses doubles, if perl is -Duselongdouble it messes up the precision
517             * so we always want take the max precision from a double and discard the rest
518             * because it can't be any more precise than a double */
519             newSVnv( ( rint( header_entry->value.value.f64 * DOUBLE_POW ) / DOUBLE_POW ) ),
520             #else
521             /* both of these are doubles so it's ok */
522             newSVnv( header_entry->value.value.f64 ),
523             #endif
524             0
525             );
526 2           break;
527              
528             // Handle kind UTF8 and kind BYTES
529             case AMQP_FIELD_KIND_UTF8:
530             case AMQP_FIELD_KIND_BYTES:
531 10 100         hv_store( headers,
532             header_entry->key.bytes, header_entry->key.len,
533             newSVpvn_utf8(
534             header_entry->value.value.bytes.bytes,
535             header_entry->value.value.bytes.len,
536             AMQP_FIELD_KIND_UTF8 == header_entry->value.kind
537             ),
538             0
539             );
540 10           break;
541              
542             // Handle arrays
543             case AMQP_FIELD_KIND_ARRAY:
544             __DEBUG__(
545             fprintf(stderr, "ARRAY KIND FOR KEY:>%.*s< KIND:>%c< AMQP_FIELD_KIND_ARRAY:[%c].\n",
546             (int)header_entry->key.len,
547             (char*)header_entry->key.bytes,
548             header_entry->value.kind,
549             AMQP_FIELD_KIND_ARRAY
550             )
551             );
552 3           hv_store( headers,
553             header_entry->key.bytes, header_entry->key.len,
554             mq_array_to_arrayref( &header_entry->value.value.array ),
555             0
556             );
557 3           break;
558              
559             // Handle tables (hashes when translated to Perl)
560             case AMQP_FIELD_KIND_TABLE:
561 2           hv_store( headers,
562             header_entry->key.bytes, header_entry->key.len,
563             mq_table_to_hashref( &header_entry->value.value.table ),
564             0
565             );
566 2           break;
567              
568             default:
569 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
570 0           ret.library_error = AMQP_STATUS_UNKNOWN_TYPE;
571 0           goto error_out2;
572             }
573             }
574             }
575             }
576              
577             {
578             char *body;
579 25           size_t body_target = frame.payload.properties.body_size;
580 25           size_t body_remaining = body_target;
581              
582 25           body_sv = newSV(0);
583 25           sv_grow(body_sv, body_target + 1);
584 25           SvCUR_set(body_sv, body_target);
585 25           SvPOK_on(body_sv);
586 25 100         if (is_utf8_body)
587 18           SvUTF8_on(body_sv);
588              
589 25           body = SvPVX(body_sv);
590              
591 59 100         while (body_remaining > 0) {
592             size_t fragment_len;
593              
594 34           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
595 34 50         if (AMQP_STATUS_OK != res) {
596 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
597 0           ret.library_error = res;
598 0           goto error_out3;
599             }
600              
601 34 50         if (AMQP_FRAME_BODY != frame.frame_type) {
602 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
    0          
603 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
604 0           AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
605              
606 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
607 0           ret.reply = frame.payload.method;
608             } else {
609 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
610 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
611             }
612 0           goto error_out3;
613             }
614              
615 34           fragment_len = frame.payload.body_fragment.len;
616 34 50         if (fragment_len > body_remaining) {
617 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
618 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
619 0           goto error_out3;
620             }
621              
622 34           memcpy(body, frame.payload.body_fragment.bytes, fragment_len);
623 34           body += fragment_len;
624 34           body_remaining -= fragment_len;
625             }
626              
627 25           *body = '\0';
628             }
629              
630 25           *props_sv_ptr = newRV_noinc(MUTABLE_SV(props_hv));
631 25           *body_sv_ptr = body_sv;
632 25           ret.reply_type = AMQP_RESPONSE_NORMAL;
633 25           return ret;
634              
635             error_out3:
636 0           SvREFCNT_dec(props_hv);
637             error_out2:
638 0           SvREFCNT_dec(body_sv);
639             error_out1:
640 0           *props_sv_ptr = &PL_sv_undef;
641 0           *body_sv_ptr = &PL_sv_undef;
642 25           return ret;
643             }
644              
645             /* Parallels amqp_consume_message */
646 26           static amqp_rpc_reply_t consume_message(amqp_connection_state_t state, SV **envelope_sv_ptr, struct timeval *timeout) {
647             amqp_rpc_reply_t ret;
648             HV *envelope_hv;
649             int res;
650             amqp_frame_t frame;
651             amqp_channel_t channel;
652             SV *props;
653             SV *body;
654              
655 26           memset(&ret, 0, sizeof(amqp_rpc_reply_t));
656 26           *envelope_sv_ptr = &PL_sv_undef;
657              
658 26           res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
659 26 100         if (AMQP_STATUS_OK != res) {
660 4           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
661 4           ret.library_error = res;
662 4           goto error_out1;
663             }
664              
665 22 50         if (AMQP_FRAME_METHOD != frame.frame_type ||
    100          
666 22           AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
667              
668 1 50         if (AMQP_FRAME_METHOD == frame.frame_type &&
    50          
669 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
670 0           AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
671              
672 1           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
673 1           ret.reply = frame.payload.method;
674             } else {
675 0           amqp_put_back_frame(state, &frame);
676 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
677 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
678             }
679              
680 1           goto error_out1;
681             }
682              
683 21           channel = frame.channel;
684              
685 21           envelope_hv = newHV();
686              
687             {
688 21           amqp_basic_deliver_t *d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
689 21           hv_stores(envelope_hv, "channel", newSViv(channel));
690 21           hv_stores(envelope_hv, "delivery_tag", newSVu64(d->delivery_tag));
691 21           hv_stores(envelope_hv, "redelivered", newSViv(d->redelivered));
692 21           hv_stores(envelope_hv, "exchange", newSVpvn(d->exchange.bytes, d->exchange.len));
693 21           hv_stores(envelope_hv, "consumer_tag", newSVpvn(d->consumer_tag.bytes, d->consumer_tag.len));
694 21           hv_stores(envelope_hv, "routing_key", newSVpvn(d->routing_key.bytes, d->routing_key.len));
695             }
696              
697 21           ret = read_message(state, channel, &props, &body );
698 21 50         if (AMQP_RESPONSE_NORMAL != ret.reply_type)
699 0           goto error_out2;
700              
701 21           hv_stores(envelope_hv, "props", props);
702 21           hv_stores(envelope_hv, "body", body);
703              
704 21           *envelope_sv_ptr = newRV_noinc(MUTABLE_SV(envelope_hv));
705 21           ret.reply_type = AMQP_RESPONSE_NORMAL;
706 21           return ret;
707              
708             error_out2:
709 0           SvREFCNT_dec(envelope_hv);
710             error_out1:
711 5           *envelope_sv_ptr = &PL_sv_undef;
712 26           return ret;
713             }
714              
715 5           void array_to_amqp_array(AV *perl_array, amqp_array_t *mq_array, short force_utf8) {
716 5           int idx = 0;
717             SV **value;
718              
719 5           amqp_field_value_t *new_elements = amqp_pool_alloc(
720             &temp_memory_pool,
721 5           ((av_len(perl_array)+1) * sizeof(amqp_field_value_t))
722             );
723             amqp_field_value_t *element;
724              
725 5           mq_array->entries = new_elements;
726 5           mq_array->num_entries = 0;
727              
728 119 100         for ( idx = 0; idx <= av_len(perl_array); idx += 1) {
729 114           value = av_fetch( perl_array, idx, 0 );
730              
731             // We really should never see NULL here.
732             assert(value != NULL);
733              
734             // Let's start getting the type...
735 114           element = &mq_array->entries[mq_array->num_entries];
736 114           mq_array->num_entries += 1;
737 114           element->kind = amqp_kind_for_sv(value, force_utf8);
738              
739             __DEBUG__( warn("%d KIND >%c<", __LINE__, (unsigned char)element->kind) );
740              
741 114           switch (element->kind) {
742              
743             case AMQP_FIELD_KIND_I64:
744 101           element->value.i64 = (int64_t) SvI64(*value);
745 101           break;
746              
747             case AMQP_FIELD_KIND_U64:
748 0           element->value.u64 = (uint64_t) SvU64(*value);
749 0           break;
750              
751             case AMQP_FIELD_KIND_F64:
752             // TODO: I don't think this is a native type on all Perls
753 0 0         element->value.f64 = (double) SvNV(*value);
754 0           break;
755              
756             case AMQP_FIELD_KIND_UTF8:
757             case AMQP_FIELD_KIND_BYTES:
758 11 50         element->value.bytes = amqp_cstring_bytes(SvPV_nolen(*value));
759 11           break;
760              
761             case AMQP_FIELD_KIND_ARRAY:
762 0           array_to_amqp_array(MUTABLE_AV(SvRV(*value)), &(element->value.array), force_utf8);
763 0           break;
764              
765             case AMQP_FIELD_KIND_TABLE:
766 2           hash_to_amqp_table(MUTABLE_HV(SvRV(*value)), &(element->value.table), force_utf8);
767 2           break;
768              
769             default:
770 0           Perl_croak( aTHX_ "Unsupported SvType for array index %d", idx );
771             }
772             }
773 5           }
774              
775             // Iterate over the array entries and decode them to Perl...
776 5           SV* mq_array_to_arrayref(amqp_array_t *mq_array) {
777 5           AV* perl_array = newAV();
778              
779 5           SV* perl_element = &PL_sv_undef;
780             amqp_field_value_t* mq_element;
781              
782 5           int current_entry = 0;
783              
784 119 100         for (; current_entry < mq_array->num_entries; current_entry += 1) {
785 114           mq_element = &mq_array->entries[current_entry];
786              
787             __DEBUG__( warn("%d KIND >%c<", __LINE__, mq_element->kind) );
788              
789 114           switch (mq_element->kind) {
790             // Boolean
791             case AMQP_FIELD_KIND_BOOLEAN:
792 0           perl_element = newSViv(mq_element->value.boolean);
793 0           break;
794              
795             // Signed values
796             case AMQP_FIELD_KIND_I8:
797 0           perl_element = newSViv(mq_element->value.i8);
798 0           break;
799             case AMQP_FIELD_KIND_I16:
800 0           perl_element = newSViv(mq_element->value.i16);
801 0           break;
802             case AMQP_FIELD_KIND_I32:
803 0           perl_element = newSViv(mq_element->value.i32);
804 0           break;
805             case AMQP_FIELD_KIND_I64:
806 101           perl_element = newSVi64(mq_element->value.i64);
807 101           break;
808              
809             // Unsigned values
810             case AMQP_FIELD_KIND_U8:
811 0           perl_element = newSViv(mq_element->value.u8);
812 0           break;
813             case AMQP_FIELD_KIND_U16:
814 0           perl_element = newSViv(mq_element->value.u16);
815 0           break;
816             case AMQP_FIELD_KIND_U32:
817 0           perl_element = newSVuv(mq_element->value.u32);
818 0           break;
819             case AMQP_FIELD_KIND_TIMESTAMP: /* Timestamps */
820             case AMQP_FIELD_KIND_U64:
821 0           perl_element = newSVu64(mq_element->value.u64);
822 0           break;
823              
824             // Floats
825             case AMQP_FIELD_KIND_F32:
826 0           perl_element = newSVnv(mq_element->value.f32);
827 0           break;
828             case AMQP_FIELD_KIND_F64:
829             // TODO: I don't think this is a native type on all Perls
830 0           perl_element = newSVnv(mq_element->value.f64);
831 0           break;
832              
833             // Strings and bytes
834             case AMQP_FIELD_KIND_BYTES:
835 11           perl_element = newSVpvn(
836             mq_element->value.bytes.bytes,
837             mq_element->value.bytes.len
838             );
839 11           break;
840              
841             // UTF-8 strings
842             case AMQP_FIELD_KIND_UTF8:
843 0           perl_element = newSVpvn(
844             mq_element->value.bytes.bytes,
845             mq_element->value.bytes.len
846             );
847 0           SvUTF8_on(perl_element); // It's UTF-8!
848 0           break;
849              
850             // Arrays
851             case AMQP_FIELD_KIND_ARRAY:
852 0           perl_element = mq_array_to_arrayref(&(mq_element->value.array));
853 0           break;
854              
855             // Tables
856             case AMQP_FIELD_KIND_TABLE:
857 2           perl_element = mq_table_to_hashref(&(mq_element->value.table));
858 2           break;
859              
860             // WTF
861             default:
862             // ACK!
863 0           Perl_croak(
864             aTHX_ "Unsupported Perl type >%c< at index %d",
865 0           (unsigned char)mq_element->kind,
866             current_entry
867             );
868             }
869              
870 114           av_push(perl_array, perl_element);
871             }
872              
873 5           return newRV_noinc(MUTABLE_SV(perl_array));
874             }
875              
876 9           SV* mq_table_to_hashref( amqp_table_t *mq_table ) {
877             // Iterate over the table keys and decode them to Perl...
878             int i;
879             SV *perl_element;
880 9           HV *perl_hash = newHV();
881 9           amqp_table_entry_t *hash_entry = (amqp_table_entry_t*)NULL;
882              
883 46 100         for( i=0; i < mq_table->num_entries; i += 1 ) {
884 37           hash_entry = &(mq_table->entries[i]);
885             __DEBUG__(
886             fprintf(
887             stderr,
888             "!!! Key: >%.*s< Kind: >%c<\n",
889             (int)hash_entry->key.len,
890             (char*)hash_entry->key.bytes,
891             hash_entry->value.kind
892             );
893             );
894              
895 37           switch (hash_entry->value.kind) {
896             // Boolean
897             case AMQP_FIELD_KIND_BOOLEAN:
898 11           perl_element = newSViv(hash_entry->value.value.boolean);
899 11           break;
900              
901             // Integers
902             case AMQP_FIELD_KIND_I8:
903 0           perl_element = newSViv(hash_entry->value.value.i8);
904 0           break;
905             case AMQP_FIELD_KIND_I16:
906 0           perl_element = newSViv(hash_entry->value.value.i16);
907 0           break;
908             case AMQP_FIELD_KIND_I32:
909 0           perl_element = newSViv(hash_entry->value.value.i32);
910 0           break;
911             case AMQP_FIELD_KIND_I64:
912 7           perl_element = newSVi64(hash_entry->value.value.i64);
913 7           break;
914             case AMQP_FIELD_KIND_U8:
915 0           perl_element = newSViv(hash_entry->value.value.u8);
916 0           break;
917             case AMQP_FIELD_KIND_U16:
918 0           perl_element = newSViv(hash_entry->value.value.u16);
919 0           break;
920             case AMQP_FIELD_KIND_U32:
921 0           perl_element = newSVuv(hash_entry->value.value.u32);
922 0           break;
923             case AMQP_FIELD_KIND_TIMESTAMP: /* Timestamps */
924             case AMQP_FIELD_KIND_U64:
925 0           perl_element = newSVu64(hash_entry->value.value.u64);
926 0           break;
927              
928             // Foats
929             case AMQP_FIELD_KIND_F32:
930 0           perl_element = newSVnv(hash_entry->value.value.f32);
931 0           break;
932             case AMQP_FIELD_KIND_F64:
933             // TODO: I don't think this is a native type on all Perls.
934 0           perl_element = newSVnv(hash_entry->value.value.f64);
935 0           break;
936              
937             case AMQP_FIELD_KIND_BYTES:
938 3           perl_element = newSVpvn(
939             hash_entry->value.value.bytes.bytes,
940             hash_entry->value.value.bytes.len
941             );
942 3           break;
943              
944             case AMQP_FIELD_KIND_UTF8:
945 11           perl_element = newSVpvn(
946             hash_entry->value.value.bytes.bytes,
947             hash_entry->value.value.bytes.len
948             );
949 11           SvUTF8_on(perl_element); // It's UTF-8!
950 11           break;
951              
952             case AMQP_FIELD_KIND_ARRAY:
953 2           perl_element = mq_array_to_arrayref(&(hash_entry->value.value.array));
954 2           break;
955              
956             case AMQP_FIELD_KIND_TABLE:
957 3           perl_element = mq_table_to_hashref(&(hash_entry->value.value.table));
958 3           break;
959              
960             default:
961             // ACK!
962 0           Perl_croak(
963             aTHX_ "Unsupported Perl type >%c< at index %d",
964 0           (unsigned char)hash_entry->value.kind,
965             i
966             );
967             }
968              
969             // Stash this in our hash.
970 37           hv_store(
971             perl_hash,
972             hash_entry->key.bytes, hash_entry->key.len,
973             perl_element,
974             0
975             );
976              
977             }
978              
979 9           return newRV_noinc(MUTABLE_SV(perl_hash));
980             }
981              
982 20           void hash_to_amqp_table(HV *hash, amqp_table_t *table, short force_utf8) {
983             HE *he;
984             char *key;
985             SV *value;
986             I32 retlen;
987             amqp_table_entry_t *entry;
988              
989 20 50         amqp_table_entry_t *new_entries = amqp_pool_alloc( &temp_memory_pool, HvKEYS(hash) * sizeof(amqp_table_entry_t) );
990 20           table->entries = new_entries;
991              
992 20           hv_iterinit(hash);
993 70 100         while (NULL != (he = hv_iternext(hash))) {
994 50           key = hv_iterkey(he, &retlen);
995             __DEBUG__( warn("Key: %s\n", key) );
996 50           value = hv_iterval(hash, he);
997              
998 50 50         if (SvGMAGICAL(value)) {
999 0           mg_get(value);
1000             }
1001              
1002 50           entry = &table->entries[table->num_entries];
1003 50           entry->key = amqp_cstring_bytes( key );
1004              
1005             // Reserved headers, per spec must force UTF-8 for strings.
1006             // Other headers aren't necessarily required to do so.
1007 50 50         if (
1008             // "x-*" exchanges
1009             (
1010 50           strlen(key) > 2
1011 50 50         &&
1012 50           key[0] == 'x'
1013 0 0         &&
1014 0           key[1] == '-'
1015             )
1016             ) {
1017 0           entry->value.kind = amqp_kind_for_sv( &value, 1 );
1018             }
1019             else {
1020 50           entry->value.kind = amqp_kind_for_sv( &value, force_utf8 );
1021             }
1022              
1023              
1024             __DEBUG__(
1025             warn("hash_to_amqp_table()");
1026             warn("%s", SvPV_nolen(value) );
1027             fprintf(
1028             stderr,
1029             "Key: >%.*s< Kind: >%c<\n",
1030             (int)entry->key.len,
1031             (char*)entry->key.bytes,
1032             entry->value.kind
1033             );
1034             );
1035              
1036 50           switch ( entry->value.kind ) {
1037             case AMQP_FIELD_KIND_I64:
1038 21           entry->value.value.i64 = (int64_t) SvI64( value );
1039 21           break;
1040              
1041             case AMQP_FIELD_KIND_U64:
1042 0           entry->value.value.u64 = (uint64_t) SvU64( value );
1043 0           break;
1044              
1045             case AMQP_FIELD_KIND_F64:
1046             // TODO: I don't think this is a native type on all Perls.
1047 2 50         entry->value.value.f64 = (double) SvNV( value );
1048 2           break;
1049              
1050             case AMQP_FIELD_KIND_BYTES:
1051             case AMQP_FIELD_KIND_UTF8:
1052 19 50         entry->value.value.bytes = amqp_cstring_bytes( SvPV_nolen( value )
1053             );
1054 19           break;
1055              
1056             case AMQP_FIELD_KIND_ARRAY:
1057 5           array_to_amqp_array(
1058 5           MUTABLE_AV(SvRV(value)),
1059             &(entry->value.value.array),
1060             force_utf8
1061             );
1062 5           break;
1063              
1064             case AMQP_FIELD_KIND_TABLE:
1065 3           hash_to_amqp_table(
1066 3           MUTABLE_HV(SvRV(value)),
1067             &(entry->value.value.table),
1068             force_utf8
1069             );
1070 3           break;
1071              
1072             default:
1073 0           Perl_croak( aTHX_ "amqp_kind_for_sv() returned a type I don't understand." );
1074             }
1075              
1076             // Successfully (we think) added an entry to the table.
1077 50           table->num_entries++;
1078             }
1079              
1080 20           return;
1081             }
1082              
1083 22           static amqp_rpc_reply_t basic_get(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, SV **envelope_sv_ptr, amqp_boolean_t no_ack) {
1084             amqp_rpc_reply_t ret;
1085 22           HV *envelope_hv = NULL;
1086             SV *props;
1087             SV *body;
1088              
1089 22           ret = amqp_basic_get(state, channel, queue, no_ack);
1090 22 50         if (AMQP_RESPONSE_NORMAL != ret.reply_type)
1091 0           goto error_out1;
1092              
1093 22 100         if (AMQP_BASIC_GET_OK_METHOD != ret.reply.id)
1094 18           goto success_out;
1095              
1096 4           envelope_hv = newHV();
1097              
1098             {
1099 4           amqp_basic_get_ok_t *ok = (amqp_basic_get_ok_t *) ret.reply.decoded;
1100 4           hv_stores(envelope_hv, "delivery_tag", newSVu64(ok->delivery_tag));
1101 4           hv_stores(envelope_hv, "redelivered", newSViv(ok->redelivered));
1102 4           hv_stores(envelope_hv, "exchange", newSVpvn(ok->exchange.bytes, ok->exchange.len));
1103 4           hv_stores(envelope_hv, "routing_key", newSVpvn(ok->routing_key.bytes, ok->routing_key.len));
1104 4           hv_stores(envelope_hv, "message_count", newSViv(ok->message_count));
1105             }
1106              
1107 4           ret = read_message(state, channel, &props, &body );
1108 4 50         if (AMQP_RESPONSE_NORMAL != ret.reply_type)
1109 0           goto error_out2;
1110              
1111 4           hv_stores(envelope_hv, "props", props);
1112 4           hv_stores(envelope_hv, "body", body);
1113            
1114             success_out:
1115 22 100         *envelope_sv_ptr = envelope_hv ? newRV_noinc(MUTABLE_SV(envelope_hv)) : &PL_sv_undef;
1116 22           ret.reply_type = AMQP_RESPONSE_NORMAL;
1117 22           return ret;
1118              
1119             error_out2:
1120 0           SvREFCNT_dec(envelope_hv);
1121             error_out1:
1122 0           *envelope_sv_ptr = &PL_sv_undef;
1123 22           return ret;
1124             }
1125              
1126              
1127              
1128             MODULE = Net::AMQP::RabbitMQ PACKAGE = Net::AMQP::RabbitMQ PREFIX = net_amqp_rabbitmq_
1129              
1130             REQUIRE: 1.9505
1131             PROTOTYPES: DISABLE
1132              
1133             BOOT:
1134 32 50         PERL_MATH_INT64_LOAD_OR_CROAK;
1135              
1136             int
1137             net_amqp_rabbitmq_connect(conn, hostname, options)
1138             Net::AMQP::RabbitMQ conn
1139             char *hostname
1140             HV *options
1141             PREINIT:
1142             amqp_socket_t *sock;
1143 38           char *user = "guest";
1144 38           char *password = "guest";
1145 38           char *vhost = "/";
1146 38           int port = 5672;
1147 38           int channel_max = 0;
1148 38           int frame_max = 131072;
1149 38           int heartbeat = 0;
1150 38           double timeout = -1;
1151             struct timeval to;
1152              
1153 38           int ssl = 0;
1154 38           char *ssl_cacert = NULL;
1155 38           char *ssl_cert = NULL;
1156 38           char *ssl_key = NULL;
1157 38           int ssl_verify_host = 1;
1158 38           int ssl_init = 1;
1159 38           char *sasl_method = "plain";
1160 38           amqp_sasl_method_enum sasl_type = AMQP_SASL_METHOD_PLAIN;
1161             CODE:
1162 38 50         str_from_hv(options, user);
    50          
1163 38 50         str_from_hv(options, password);
    50          
1164 38 50         str_from_hv(options, vhost);
    50          
1165 38 50         int_from_hv(options, channel_max);
    0          
1166 38 50         int_from_hv(options, frame_max);
    0          
1167 38 100         int_from_hv(options, heartbeat);
    50          
1168 38 50         int_from_hv(options, port);
    50          
1169 38 100         double_from_hv(options, timeout);
    50          
1170              
1171 38 50         int_from_hv(options, ssl);
    50          
1172 38 50         str_from_hv(options, ssl_cacert);
    50          
1173 38 50         str_from_hv(options, ssl_cert);
    0          
1174 38 50         str_from_hv(options, ssl_key);
    0          
1175 38 50         int_from_hv(options, ssl_verify_host);
    50          
1176 38 50         int_from_hv(options, ssl_init);
    50          
1177 38 50         str_from_hv(options, sasl_method);
    0          
1178              
1179 38 100         if(timeout >= 0) {
1180 1           to.tv_sec = floor(timeout);
1181 1           to.tv_usec = 1000000.0 * (timeout - floor(timeout));
1182             }
1183              
1184 38 100         if ( ssl ) {
1185             #ifndef NAR_HAVE_OPENSSL
1186             Perl_croak(aTHX_ "no ssl support, please install openssl and reinstall");
1187             #endif
1188 1           amqp_set_initialize_ssl_library( (amqp_boolean_t)ssl_init );
1189 1           sock = amqp_ssl_socket_new(conn);
1190 1 50         if ( !sock ) {
1191 0           Perl_croak(aTHX_ "error creating SSL socket");
1192             }
1193              
1194 1           amqp_ssl_socket_set_verify_hostname( sock, (amqp_boolean_t)ssl_verify_host );
1195              
1196 1 50         if ( ( ssl_cacert != NULL ) && strlen(ssl_cacert) ) {
    50          
1197 1 50         if ( amqp_ssl_socket_set_cacert(sock, ssl_cacert) ) {
1198 0           Perl_croak(aTHX_ "error setting CA certificate");
1199             }
1200             }
1201             else {
1202             // TODO
1203             // in librabbitmq > 0.7.1, amqp_ssl_socket_set_verify_peer makes this optional
1204 0           Perl_croak(aTHX_ "required arg ssl_cacert not provided");
1205             }
1206              
1207 1 50         if ( ( ssl_key != NULL ) && strlen(ssl_key) && ( ssl_cert != NULL ) && strlen(ssl_cert) ) {
    0          
    0          
    0          
1208 0 0         if ( amqp_ssl_socket_set_key( sock, ssl_cert, ssl_key ) ) {
1209 0           Perl_croak(aTHX_ "error setting client cert");
1210             }
1211             }
1212             }
1213             else {
1214 37           sock = amqp_tcp_socket_new(conn);
1215 37 50         if (!sock) {
1216 0           Perl_croak(aTHX_ "error creating TCP socket");
1217             }
1218             }
1219              
1220             //if there's data in the buffer, clear it
1221 59 100         while ( amqp_data_in_buffer(conn) ) {
1222             amqp_frame_t frame;
1223 21           amqp_simple_wait_frame( conn, &frame );
1224             }
1225              
1226             // should probably be amqp_raw_equal, but this is a minimal hack
1227 38 50         if (strcasecmp(sasl_method, "external") == 0) {
1228 0           sasl_type = AMQP_SASL_METHOD_EXTERNAL;
1229             }
1230              
1231 38 100         die_on_error(aTHX_ amqp_socket_open_noblock(sock, hostname, port, (timeout<0)?NULL:&to), conn, "opening socket");
1232 37           die_on_amqp_error(aTHX_ amqp_login(conn, vhost, channel_max, frame_max, heartbeat, sasl_type, user, password), conn, "Logging in");
1233              
1234 37           maybe_release_buffers(conn);
1235              
1236 37           RETVAL = 1;
1237             OUTPUT:
1238             RETVAL
1239              
1240             void
1241             net_amqp_rabbitmq_channel_open(conn, channel)
1242             Net::AMQP::RabbitMQ conn
1243             int channel
1244             CODE:
1245 37 100         assert_amqp_connected(conn);
    50          
1246              
1247 36           amqp_channel_open(conn, channel);
1248 36           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Opening channel");
1249              
1250             void
1251             net_amqp_rabbitmq_channel_close(conn, channel)
1252             Net::AMQP::RabbitMQ conn
1253             int channel
1254             CODE:
1255             /* If we don't have a socket, just return. */
1256 2 100         if ( ! has_valid_connection( conn ) ) {
    50          
1257 1           return;
1258             }
1259 1           die_on_amqp_error(aTHX_ amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), conn, "Closing channel");
1260              
1261             void
1262             net_amqp_rabbitmq_exchange_declare(conn, channel, exchange, options = NULL, args = NULL)
1263             Net::AMQP::RabbitMQ conn
1264             int channel
1265             char *exchange
1266             HV *options
1267             HV *args
1268             PREINIT:
1269 27           char *exchange_type = "direct";
1270 27           int passive = 0;
1271 27           int durable = 0;
1272 27           int auto_delete = 0;
1273 27           int internal = 0;
1274 27           amqp_table_t arguments = amqp_empty_table;
1275             CODE:
1276 27 100         assert_amqp_connected(conn);
    50          
1277              
1278 26 50         if(options) {
1279 26 50         str_from_hv(options, exchange_type);
    50          
1280 26 100         int_from_hv(options, passive);
    50          
1281 26 100         int_from_hv(options, durable);
    50          
1282 26 50         int_from_hv(options, auto_delete);
    50          
1283 26 100         int_from_hv(options, internal);
    50          
1284             }
1285 26 100         if(args)
1286             {
1287 1           hash_to_amqp_table(args, &arguments, 1);
1288             }
1289 26           amqp_exchange_declare(
1290             conn,
1291             channel,
1292             amqp_cstring_bytes(exchange),
1293             amqp_cstring_bytes(exchange_type),
1294             passive,
1295             (amqp_boolean_t)durable,
1296             (amqp_boolean_t)auto_delete,
1297             (amqp_boolean_t)internal,
1298             arguments
1299             );
1300 26           maybe_release_buffers(conn);
1301 26           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Declaring exchange");
1302              
1303             void
1304             net_amqp_rabbitmq_exchange_delete(conn, channel, exchange, options = NULL)
1305             Net::AMQP::RabbitMQ conn
1306             int channel
1307             char *exchange
1308             HV *options
1309             PREINIT:
1310 28           int if_unused = 1;
1311             CODE:
1312 28 100         assert_amqp_connected(conn);
    50          
1313              
1314 27 50         if(options) {
1315 27 50         int_from_hv(options, if_unused);
    50          
1316             }
1317 27           amqp_exchange_delete(conn, channel, amqp_cstring_bytes(exchange), if_unused);
1318 27           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Deleting exchange");
1319              
1320             void net_amqp_rabbitmq_exchange_bind(conn, channel, destination, source, routing_key, args = NULL)
1321             Net::AMQP::RabbitMQ conn
1322             int channel
1323             char *destination
1324             char *source
1325             char *routing_key
1326             HV *args
1327             PREINIT:
1328 3           amqp_exchange_bind_ok_t *reply = (amqp_exchange_bind_ok_t*)NULL;
1329 3           amqp_table_t arguments = amqp_empty_table;
1330             CODE:
1331             // We must be connected
1332 3 50         assert_amqp_connected(conn);
    50          
1333              
1334             // Parameter validation
1335 3 50         if( ( source == NULL || 0 == strlen(source) )
    100          
1336 2 50         ||
1337 2 100         ( destination == NULL || 0 == strlen(destination) )
1338             )
1339             {
1340 2           Perl_croak(aTHX_ "source and destination must both be specified");
1341             }
1342              
1343             // Pull in arguments if we have any
1344 1 50         if(args)
1345             {
1346 1           hash_to_amqp_table(args, &arguments, 1);
1347             }
1348              
1349 1           reply = amqp_exchange_bind(
1350             conn,
1351             channel,
1352             amqp_cstring_bytes(destination),
1353             amqp_cstring_bytes(source),
1354             amqp_cstring_bytes(routing_key),
1355             arguments
1356             );
1357 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Binding Exchange");
1358              
1359             void net_amqp_rabbitmq_exchange_unbind(conn, channel, destination, source, routing_key, args = NULL)
1360             Net::AMQP::RabbitMQ conn
1361             int channel
1362             char *destination
1363             char *source
1364             char *routing_key
1365             HV *args
1366             PREINIT:
1367 3           amqp_exchange_unbind_ok_t *reply = (amqp_exchange_unbind_ok_t*)NULL;
1368 3           amqp_table_t arguments = amqp_empty_table;
1369             CODE:
1370             // We must be connected
1371 3 50         assert_amqp_connected(conn);
    50          
1372              
1373             // Parameter validation
1374 3 50         if( ( source == NULL || 0 == strlen(source) )
    100          
1375 2 50         ||
1376 2 100         ( destination == NULL || 0 == strlen(destination) )
1377             )
1378             {
1379 2           Perl_croak(aTHX_ "source and destination must both be specified");
1380             }
1381              
1382             // Pull in arguments if we have any
1383 1 50         if(args)
1384             {
1385 1           hash_to_amqp_table(args, &arguments, 1);
1386             }
1387              
1388 1           reply = amqp_exchange_unbind(
1389             conn,
1390             channel,
1391             amqp_cstring_bytes(destination),
1392             amqp_cstring_bytes(source),
1393             amqp_cstring_bytes(routing_key),
1394             arguments
1395             );
1396 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Unbinding Exchange");
1397              
1398             void net_amqp_rabbitmq_queue_delete(conn, channel, queuename, options = NULL)
1399             Net::AMQP::RabbitMQ conn
1400             int channel
1401             char *queuename
1402             HV *options
1403             PREINIT:
1404 21           int if_unused = 1;
1405 21           int if_empty = 1;
1406 21           amqp_queue_delete_ok_t *reply = (amqp_queue_delete_ok_t*)NULL;
1407             CODE:
1408 21 100         assert_amqp_connected(conn);
    50          
1409              
1410 20 50         if(options) {
1411 20 50         int_from_hv(options, if_unused);
    50          
1412 20 50         int_from_hv(options, if_empty);
    50          
1413             }
1414 20           reply = amqp_queue_delete(
1415             conn,
1416             channel,
1417             amqp_cstring_bytes(queuename),
1418             if_unused,
1419             if_empty
1420             );
1421 20 50         if (reply == NULL) {
1422 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Deleting queue");
1423             }
1424 20 50         XPUSHs(sv_2mortal(newSVuv(reply->message_count)));
1425              
1426             void
1427             net_amqp_rabbitmq_queue_declare(conn, channel, queuename, options = NULL, args = NULL)
1428             Net::AMQP::RabbitMQ conn
1429             int channel
1430             char *queuename
1431             HV *options
1432             HV *args
1433             PREINIT:
1434 21           int passive = 0;
1435 21           int durable = 0;
1436 21           int exclusive = 0;
1437 21           int auto_delete = 1;
1438 21           amqp_table_t arguments = amqp_empty_table;
1439 21           amqp_bytes_t queuename_b = amqp_empty_bytes;
1440 21           amqp_queue_declare_ok_t *r = (amqp_queue_declare_ok_t*)NULL;
1441             PPCODE:
1442 21 100         assert_amqp_connected(conn);
    50          
1443              
1444 20 50         if(queuename && strcmp(queuename, "")) queuename_b = amqp_cstring_bytes(queuename);
    100          
1445 20 50         if(options) {
1446 20 100         int_from_hv(options, passive);
    50          
1447 20 100         int_from_hv(options, durable);
    50          
1448 20 100         int_from_hv(options, exclusive);
    50          
1449 20 50         int_from_hv(options, auto_delete);
    50          
1450             }
1451 20 50         if(args)
1452             {
1453 0           hash_to_amqp_table(args, &arguments, 1);
1454             }
1455 20           r = amqp_queue_declare(conn, channel, queuename_b, passive,
1456             durable, exclusive, auto_delete,
1457             arguments);
1458 20           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Declaring queue");
1459 19 50         XPUSHs(sv_2mortal(newSVpvn(r->queue.bytes, r->queue.len)));
1460 19 50         if(GIMME_V == G_ARRAY) {
    100          
1461 1 50         XPUSHs(sv_2mortal(newSVuv(r->message_count)));
1462 1 50         XPUSHs(sv_2mortal(newSVuv(r->consumer_count)));
1463             }
1464              
1465             void
1466             net_amqp_rabbitmq_queue_bind(conn, channel, queuename, exchange, bindingkey, args = NULL)
1467             Net::AMQP::RabbitMQ conn
1468             int channel
1469             char *queuename
1470             char *exchange
1471             char *bindingkey
1472             HV *args
1473             PREINIT:
1474 21           amqp_table_t arguments = amqp_empty_table;
1475             CODE:
1476 21 100         assert_amqp_connected(conn);
    50          
1477              
1478 18 50         if(queuename == NULL
1479 18 50         ||
1480             exchange == NULL
1481 18 50         ||
1482 18           0 == strlen(queuename)
1483 18 50         ||
1484 18           0 == strlen(exchange)
1485             )
1486             {
1487 0           Perl_croak(aTHX_ "queuename and exchange must both be specified");
1488             }
1489              
1490 18 100         if(args)
1491 2           hash_to_amqp_table(args, &arguments, 0);
1492 18           amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename),
1493             amqp_cstring_bytes(exchange),
1494             amqp_cstring_bytes(bindingkey),
1495             arguments);
1496 18           maybe_release_buffers(conn);
1497 18           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Binding queue");
1498              
1499             void
1500             net_amqp_rabbitmq_queue_unbind(conn, channel, queuename, exchange, bindingkey, args = NULL)
1501             Net::AMQP::RabbitMQ conn
1502             int channel
1503             char *queuename
1504             char *exchange
1505             char *bindingkey
1506             HV *args
1507             PREINIT:
1508 22           amqp_table_t arguments = amqp_empty_table;
1509             CODE:
1510 22 100         assert_amqp_connected(conn);
    50          
1511              
1512 21 50         if(queuename == NULL || exchange == NULL)
    50          
1513             {
1514 0           Perl_croak(aTHX_ "queuename and exchange must both be specified");
1515             }
1516              
1517 21 100         if(args)
1518             {
1519 1           hash_to_amqp_table(args, &arguments, 0);
1520             }
1521 21           amqp_queue_unbind(conn, channel, amqp_cstring_bytes(queuename),
1522             amqp_cstring_bytes(exchange),
1523             amqp_cstring_bytes(bindingkey),
1524             arguments);
1525 21           maybe_release_buffers(conn);
1526 21           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Unbinding queue");
1527              
1528             SV *
1529             net_amqp_rabbitmq_consume(conn, channel, queuename, options = NULL)
1530             Net::AMQP::RabbitMQ conn
1531             int channel
1532             char *queuename
1533             HV *options
1534             PREINIT:
1535             amqp_basic_consume_ok_t *r;
1536 17           char *consumer_tag = NULL;
1537 17           int no_local = 0;
1538 17           int no_ack = 1;
1539 17           int exclusive = 0;
1540             CODE:
1541 17 100         assert_amqp_connected(conn);
    50          
1542              
1543 16 50         if(options) {
1544 16 50         str_from_hv(options, consumer_tag);
    50          
1545 16 50         int_from_hv(options, no_local);
    50          
1546 16 50         int_from_hv(options, no_ack);
    50          
1547 16 50         int_from_hv(options, exclusive);
    50          
1548             }
1549 16 50         r = amqp_basic_consume(conn, channel, amqp_cstring_bytes(queuename),
1550             consumer_tag ? amqp_cstring_bytes(consumer_tag) : amqp_empty_bytes,
1551             no_local, no_ack, exclusive, amqp_empty_table);
1552 16           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Consume queue");
1553 16           RETVAL = newSVpvn(r->consumer_tag.bytes, r->consumer_tag.len);
1554             OUTPUT:
1555             RETVAL
1556              
1557             int
1558             net_amqp_rabbitmq_cancel(conn, channel, consumer_tag)
1559             Net::AMQP::RabbitMQ conn
1560             int channel
1561             char *consumer_tag
1562             PREINIT:
1563             amqp_basic_cancel_ok_t *r;
1564             CODE:
1565 4 100         assert_amqp_connected(conn);
    100          
1566              
1567 2           r = amqp_basic_cancel(conn, channel, amqp_cstring_bytes(consumer_tag));
1568 2           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "cancel");
1569              
1570 1 50         if ( r == NULL ) {
1571 0           RETVAL = 0;
1572             }
1573             else {
1574 1 50         if(strlen(consumer_tag) == r->consumer_tag.len && 0 == strcmp(consumer_tag, (char *)r->consumer_tag.bytes)) {
    50          
1575 0           RETVAL = 1;
1576             } else {
1577 1           RETVAL = 0;
1578             }
1579             }
1580             OUTPUT:
1581             RETVAL
1582              
1583             SV *
1584             net_amqp_rabbitmq_recv(conn, timeout = 0)
1585             Net::AMQP::RabbitMQ conn
1586             int timeout
1587             PREINIT:
1588             SV *envelope;
1589             amqp_rpc_reply_t ret;
1590             struct timeval timeout_tv;
1591             CODE:
1592 27 100         assert_amqp_connected(conn);
    50          
1593              
1594 26 100         if (timeout > 0) {
1595 6           timeout_tv.tv_sec = timeout / 1000;
1596 6           timeout_tv.tv_usec = (timeout % 1000) * 1000;
1597             }
1598              
1599             // Set the waiting time to 0
1600 26 100         if (timeout == -1) {
1601 2           timeout_tv.tv_sec = 0;
1602 2           timeout_tv.tv_usec = 0;
1603             }
1604              
1605 26           maybe_release_buffers(conn);
1606 26 100         ret = consume_message(conn, &RETVAL, timeout ? &timeout_tv : NULL);
1607 26 100         if (AMQP_RESPONSE_LIBRARY_EXCEPTION != ret.reply_type || AMQP_STATUS_TIMEOUT != ret.library_error)
    50          
1608 22           die_on_amqp_error(aTHX_ ret, conn, "recv");
1609              
1610             OUTPUT:
1611             RETVAL
1612              
1613             void
1614             net_amqp_rabbitmq_ack(conn, channel, delivery_tag, multiple = 0)
1615             Net::AMQP::RabbitMQ conn
1616             int channel
1617             uint64_t delivery_tag
1618             int multiple
1619             CODE:
1620 3 100         assert_amqp_connected(conn);
    50          
1621              
1622 2           die_on_error(aTHX_ amqp_basic_ack(conn, channel, delivery_tag, multiple), conn,
1623             "ack");
1624              
1625             void
1626             net_amqp_rabbitmq_nack(conn, channel, delivery_tag, multiple = 0, requeue = 0)
1627             Net::AMQP::RabbitMQ conn
1628             int channel
1629             uint64_t delivery_tag
1630             int multiple
1631             int requeue
1632             CODE:
1633 2 100         assert_amqp_connected(conn);
    50          
1634              
1635 1           die_on_error(
1636             aTHX_ amqp_basic_nack(
1637             conn,
1638             channel,
1639             delivery_tag,
1640             (amqp_boolean_t)multiple,
1641             (amqp_boolean_t)requeue
1642             ),
1643             conn,
1644             "nack"
1645             );
1646              
1647             void
1648             net_amqp_rabbitmq_reject(conn, channel, delivery_tag, requeue = 0)
1649             Net::AMQP::RabbitMQ conn
1650             int channel
1651             uint64_t delivery_tag
1652             int requeue
1653             CODE:
1654 2 100         assert_amqp_connected(conn);
    50          
1655              
1656 1           die_on_error(
1657             aTHX_ amqp_basic_reject(
1658             conn,
1659             channel,
1660             delivery_tag,
1661             requeue
1662             ),
1663             conn,
1664             "reject"
1665             );
1666              
1667              
1668             void
1669             net_amqp_rabbitmq_purge(conn, channel, queuename)
1670             Net::AMQP::RabbitMQ conn
1671             int channel
1672             char *queuename
1673             CODE:
1674 20 100         assert_amqp_connected(conn);
    50          
1675              
1676 19           amqp_queue_purge(conn, channel, amqp_cstring_bytes(queuename));
1677 19           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Purging queue");
1678              
1679             void
1680             net_amqp_rabbitmq__publish(conn, channel, routing_key, body, options = NULL, props = NULL)
1681             Net::AMQP::RabbitMQ conn
1682             int channel
1683             HV *options;
1684             char *routing_key
1685             SV *body
1686             HV *props
1687             PREINIT:
1688             SV **v;
1689 30           char *exchange = "amq.direct";
1690 30           amqp_boolean_t mandatory = 0;
1691 30           amqp_boolean_t immediate = 0;
1692             int rv;
1693 30           amqp_bytes_t exchange_b = { 0 };
1694             amqp_bytes_t routing_key_b;
1695             amqp_bytes_t body_b;
1696             struct amqp_basic_properties_t_ properties;
1697             STRLEN len;
1698 30           int force_utf8_in_header_strings = 0;
1699             CODE:
1700 30 100         assert_amqp_connected(conn);
    50          
1701              
1702 29           routing_key_b = amqp_cstring_bytes(routing_key);
1703 29 50         body_b.bytes = SvPV(body, len);
1704 29           body_b.len = len;
1705 29 50         if(options) {
1706 29 50         if(NULL != (v = hv_fetchs(options, "mandatory", 0))) {
1707 0 0         mandatory = SvIV(*v) ? 1 : 0;
1708             }
1709 29 50         if(NULL != (v = hv_fetchs(options, "immediate", 0))) {
1710 0 0         immediate = SvIV(*v) ? 1 : 0;
1711             }
1712 29 50         if(NULL != (v = hv_fetchs(options, "exchange", 0))) {
1713 29 50         exchange_b = amqp_cstring_bytes(SvPV_nolen(*v));
1714             }
1715              
1716             // This is an internal option, only for determining if we want to force utf8
1717 29 100         int_from_hv(options, force_utf8_in_header_strings);
    50          
1718             }
1719 29           properties.headers = amqp_empty_table;
1720 29           properties._flags = 0;
1721 29 50         if (props) {
1722 29 100         if (NULL != (v = hv_fetchs(props, "content_type", 0))) {
1723 5 50         properties.content_type = amqp_cstring_bytes(SvPV_nolen(*v));
1724 5           properties._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
1725             }
1726 29 100         if (NULL != (v = hv_fetchs(props, "content_encoding", 0))) {
1727 8 50         properties.content_encoding = amqp_cstring_bytes(SvPV_nolen(*v));
1728 8           properties._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
1729             }
1730 29 100         if (NULL != (v = hv_fetchs(props, "correlation_id", 0))) {
1731 5 50         properties.correlation_id = amqp_cstring_bytes(SvPV_nolen(*v));
1732 5           properties._flags |= AMQP_BASIC_CORRELATION_ID_FLAG;
1733             }
1734 29 100         if (NULL != (v = hv_fetchs(props, "reply_to", 0))) {
1735 5 50         properties.reply_to = amqp_cstring_bytes(SvPV_nolen(*v));
1736 5           properties._flags |= AMQP_BASIC_REPLY_TO_FLAG;
1737             }
1738 29 100         if (NULL != (v = hv_fetchs(props, "expiration", 0))) {
1739 5 50         properties.expiration = amqp_cstring_bytes(SvPV_nolen(*v));
1740 5           properties._flags |= AMQP_BASIC_EXPIRATION_FLAG;
1741             }
1742 29 100         if (NULL != (v = hv_fetchs(props, "message_id", 0))) {
1743 5 50         properties.message_id = amqp_cstring_bytes(SvPV_nolen(*v));
1744 5           properties._flags |= AMQP_BASIC_MESSAGE_ID_FLAG;
1745             }
1746 29 100         if (NULL != (v = hv_fetchs(props, "type", 0))) {
1747 5 50         properties.type = amqp_cstring_bytes(SvPV_nolen(*v));
1748 5           properties._flags |= AMQP_BASIC_TYPE_FLAG;
1749             }
1750 29 100         if (NULL != (v = hv_fetchs(props, "user_id", 0))) {
1751 5 50         properties.user_id = amqp_cstring_bytes(SvPV_nolen(*v));
1752 5           properties._flags |= AMQP_BASIC_USER_ID_FLAG;
1753             }
1754 29 100         if (NULL != (v = hv_fetchs(props, "app_id", 0))) {
1755 5 50         properties.app_id = amqp_cstring_bytes(SvPV_nolen(*v));
1756 5           properties._flags |= AMQP_BASIC_APP_ID_FLAG;
1757             }
1758 29 100         if (NULL != (v = hv_fetchs(props, "delivery_mode", 0))) {
1759 5 50         properties.delivery_mode = (uint8_t) SvIV(*v);
1760 5           properties._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
1761             }
1762 29 100         if (NULL != (v = hv_fetchs(props, "priority", 0))) {
1763 5 50         properties.priority = (uint8_t) SvIV(*v);
1764 5           properties._flags |= AMQP_BASIC_PRIORITY_FLAG;
1765             }
1766 29 100         if (NULL != (v = hv_fetchs(props, "timestamp", 0))) {
1767 5           properties.timestamp = (uint64_t) SvI64(*v);
1768 5           properties._flags |= AMQP_BASIC_TIMESTAMP_FLAG;
1769             }
1770 29 100         if (NULL != (v = hv_fetchs(props, "headers", 0)) && SvOK(*v)) {
    100          
    50          
    50          
1771 9           hash_to_amqp_table(MUTABLE_HV(SvRV(*v)), &properties.headers, force_utf8_in_header_strings);
1772 9           properties._flags |= AMQP_BASIC_HEADERS_FLAG;
1773             }
1774             }
1775             __DEBUG__( warn("PUBLISHING HEADERS..."); dump_table( properties.headers ) );
1776 29           rv = amqp_basic_publish(conn, channel, exchange_b, routing_key_b, mandatory, immediate, &properties, body_b);
1777 29           maybe_release_buffers(conn);
1778              
1779             /* If the connection failed, blast the file descriptor! */
1780 29 50         if ( rv == AMQP_STATUS_CONNECTION_CLOSED || rv == AMQP_STATUS_SOCKET_ERROR ) {
    100          
1781 1           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_FORCE );
1782 1           Perl_croak(aTHX_ "Publish failed because AMQP socket connection was closed.");
1783             }
1784              
1785             /* Otherwise, just croak */
1786 28 50         if ( rv != AMQP_STATUS_OK ) {
1787 0           Perl_croak( aTHX_ "Publish failed, %s\n", amqp_error_string2(rv));
1788             }
1789              
1790             SV *
1791             net_amqp_rabbitmq_get(conn, channel, queuename, options = NULL)
1792             Net::AMQP::RabbitMQ conn
1793             int channel
1794             char *queuename
1795             HV *options
1796             PREINIT:
1797 23           int no_ack = 1;
1798             CODE:
1799 23 100         assert_amqp_connected(conn);
    50          
1800              
1801 22 100         if (options)
1802 5 50         int_from_hv(options, no_ack);
    50          
1803              
1804 22           maybe_release_buffers(conn);
1805 22 50         die_on_amqp_error(aTHX_ basic_get(conn, channel, queuename ? amqp_cstring_bytes(queuename) : amqp_empty_bytes, &RETVAL, no_ack), conn, "basic_get");
1806              
1807             OUTPUT:
1808             RETVAL
1809              
1810             int
1811             net_amqp_rabbitmq_get_channel_max(conn)
1812             Net::AMQP::RabbitMQ conn
1813             CODE:
1814 1           RETVAL = amqp_get_channel_max(conn);
1815             OUTPUT:
1816             RETVAL
1817              
1818             SV*
1819             net_amqp_rabbitmq_get_sockfd(conn)
1820             Net::AMQP::RabbitMQ conn
1821             CODE:
1822             /**
1823             * this is the warning from librabbitmq-c. you have been warned.
1824             *
1825             * \warning Use the socket returned from this function carefully, incorrect use
1826             * of the socket outside of the library will lead to undefined behavior.
1827             * Additionally rabbitmq-c may use the socket differently version-to-version,
1828             * what may work in one version, may break in the next version. Be sure to
1829             * throughly test any applications that use the socket returned by this
1830             * function especially when using a newer version of rabbitmq-c
1831             *
1832             */
1833 2 100         if ( has_valid_connection( conn ) ) {
    50          
1834 1           RETVAL = newSViv( amqp_get_sockfd(conn) );
1835             }
1836             else {
1837             // We don't have a connection, we're still here.
1838 1           RETVAL = &PL_sv_undef;
1839             }
1840             OUTPUT:
1841             RETVAL
1842              
1843             SV*
1844             net_amqp_rabbitmq_is_connected(conn)
1845             Net::AMQP::RabbitMQ conn
1846             CODE:
1847 87 100         if ( has_valid_connection( conn ) ) {
    100          
1848 77           RETVAL = newSViv(1);
1849             }
1850             else {
1851             // We don't have a connection, we're still here.
1852 10           RETVAL = &PL_sv_undef;
1853             }
1854             OUTPUT:
1855             RETVAL
1856              
1857             void
1858             net_amqp_rabbitmq_disconnect(conn)
1859             Net::AMQP::RabbitMQ conn
1860             PREINIT:
1861             int sockfd;
1862             CODE:
1863 5 100         if ( amqp_get_socket(conn) != NULL ) {
1864 4           amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
1865 4           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_NONE );
1866             }
1867              
1868             Net::AMQP::RabbitMQ
1869             net_amqp_rabbitmq__new(clazz)
1870             char *clazz
1871             CODE:
1872 30           RETVAL = amqp_new_connection();
1873             OUTPUT:
1874             RETVAL
1875              
1876             void
1877             net_amqp_rabbitmq__destroy_connection_close(conn)
1878             Net::AMQP::RabbitMQ conn
1879             CODE:
1880 29 100         if ( amqp_get_socket(conn) != NULL ) {
1881 27           amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
1882             }
1883              
1884             void
1885             net_amqp_rabbitmq__destroy_cleanup(conn)
1886             Net::AMQP::RabbitMQ conn
1887             CODE:
1888 30           empty_amqp_pool( &temp_memory_pool );
1889 30           amqp_destroy_connection(conn);
1890              
1891             void
1892             net_amqp_rabbitmq_heartbeat(conn)
1893             Net::AMQP::RabbitMQ conn
1894             PREINIT:
1895             amqp_frame_t f;
1896             CODE:
1897 10           f.frame_type = AMQP_FRAME_HEARTBEAT;
1898 10           f.channel = 0;
1899 10           amqp_send_frame(conn, &f);
1900              
1901             void
1902             net_amqp_rabbitmq_tx_select(conn, channel, args = NULL)
1903             Net::AMQP::RabbitMQ conn
1904             int channel
1905             HV *args
1906             CODE:
1907 1           amqp_tx_select(conn, channel);
1908 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Selecting transaction");
1909              
1910             void
1911             net_amqp_rabbitmq_tx_commit(conn, channel, args = NULL)
1912             Net::AMQP::RabbitMQ conn
1913             int channel
1914             HV *args
1915             CODE:
1916 1           amqp_tx_commit(conn, channel);
1917 1           maybe_release_buffers(conn);
1918 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Commiting transaction");
1919              
1920             void
1921             net_amqp_rabbitmq_tx_rollback(conn, channel, args = NULL)
1922             Net::AMQP::RabbitMQ conn
1923             int channel
1924             HV *args
1925             CODE:
1926 1           amqp_tx_rollback(conn, channel);
1927 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Rolling Back transaction");
1928              
1929             void
1930             net_amqp_rabbitmq_basic_qos(conn, channel, args = NULL)
1931             Net::AMQP::RabbitMQ conn
1932             int channel
1933             HV *args
1934             PREINIT:
1935             SV **v;
1936 1           uint32_t prefetch_size = 0;
1937 1           uint16_t prefetch_count = 0;
1938 1           amqp_boolean_t global = 0;
1939             CODE:
1940 1 50         if(args) {
1941 1 50         if(NULL != (v = hv_fetchs(args, "prefetch_size", 0))) prefetch_size = SvIV(*v);
    0          
1942 1 50         if(NULL != (v = hv_fetchs(args, "prefetch_count", 0))) prefetch_count = SvIV(*v);
    50          
1943 1 50         if(NULL != (v = hv_fetchs(args, "global", 0))) global = SvIV(*v) ? 1 : 0;
    0          
1944             }
1945 1           amqp_basic_qos(conn, channel,
1946             prefetch_size, prefetch_count, global);
1947 1           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Basic QoS");
1948              
1949             SV* net_amqp_rabbitmq_get_server_properties(conn)
1950             Net::AMQP::RabbitMQ conn
1951             PREINIT:
1952             amqp_table_t* server_properties;
1953             CODE:
1954 2 100         assert_amqp_connected(conn);
    50          
1955 1           server_properties = amqp_get_server_properties(conn);
1956 1 50         if ( server_properties )
1957             {
1958 1           RETVAL = mq_table_to_hashref(server_properties);
1959             }
1960             else
1961             {
1962 0           RETVAL = &PL_sv_undef;
1963             }
1964             OUTPUT:
1965             RETVAL
1966              
1967             SV* net_amqp_rabbitmq_get_client_properties(conn)
1968             Net::AMQP::RabbitMQ conn
1969             PREINIT:
1970             amqp_table_t* client_properties;
1971             CODE:
1972 2 100         assert_amqp_connected(conn);
    50          
1973 1           client_properties = amqp_get_client_properties(conn);
1974 1 50         if ( client_properties )
1975             {
1976 1           RETVAL = mq_table_to_hashref(client_properties);
1977             }
1978             else
1979             {
1980 0           RETVAL = &PL_sv_undef;
1981             }
1982             OUTPUT:
1983             RETVAL
1984              
1985             SV* net_amqp_rabbitmq_has_ssl()
1986             CODE:
1987             #ifdef NAR_HAVE_OPENSSL
1988 1           RETVAL = &PL_sv_yes;
1989             #else
1990             RETVAL = &PL_sv_no;
1991             #endif
1992             OUTPUT:
1993             RETVAL