File Coverage

blib/lib/Net/Stomp.pm
Criterion Covered Total %
statement 272 308 88.3
branch 88 116 75.8
condition 27 42 64.2
subroutine 32 34 94.1
pod 13 13 100.0
total 432 513 84.2


line stmt bran cond sub pod time code
1             package Net::Stomp;
2 11     11   319980 use strict;
  11         122  
  11         538  
3 11     11   50 use warnings;
  11         18  
  11         448  
4 11     11   48 use IO::Select;
  11         17  
  11         172  
5 11     11   3822 use Net::Stomp::Frame;
  11         28  
  11         65  
6 11     11   349 use Carp qw(longmess);
  11         21  
  11         496  
7 11     11   56 use base 'Class::Accessor::Fast';
  11         18  
  11         611  
8 11     11   56 use Log::Any;
  11         17  
  11         65  
9             our $VERSION = '0.60';
10              
11             __PACKAGE__->mk_accessors( qw(
12             current_host failover hostname hosts port select serial session_id socket ssl
13             ssl_options socket_options subscriptions _connect_headers bufsize
14             reconnect_on_fork logger connect_delay
15             reconnect_attempts initial_reconnect_attempts timeout receipt_timeout
16             ) );
17              
18             sub _logconfess {
19 2     2   10 my ($self,@etc) = @_;
20 2         183 my $m = longmess(@etc);
21 2         806 $self->logger->fatal($m);
22 2         146 die $m;
23             }
24             sub _logdie {
25 34     34   679 my ($self,@etc) = @_;
26 34         476 $self->logger->fatal(@etc);
27 34         1786 die "@etc";
28             }
29              
30             sub new {
31 28     28 1 123782 my $class = shift;
32 28         177 my $self = $class->SUPER::new(@_);
33              
34 28 50       855 $self->bufsize(8192) unless $self->bufsize;
35 28 50       1134 $self->connect_delay(5) unless defined $self->connect_delay;
36 28 100       566 $self->reconnect_on_fork(1) unless defined $self->reconnect_on_fork;
37 28 100       967 $self->reconnect_attempts(0) unless defined $self->reconnect_attempts;
38 28 100       1019 $self->initial_reconnect_attempts(1) unless defined $self->initial_reconnect_attempts;
39 28 50       948 $self->socket_options({}) unless defined $self->socket_options;
40              
41 28 50       1022 $self->logger(Log::Any->get_logger) unless $self->logger;
42              
43 28         3561 $self->{_framebuf} = "";
44              
45             # We are not subscribed to anything at the start
46 28         457 $self->subscriptions( {} );
47              
48 28         250 $self->select( IO::Select->new );
49 28         635 my @hosts = ();
50              
51             # failover://tcp://primary:61616
52             # failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
53              
54 28 100       426 if ($self->failover) {
    100          
55 6         95 my ($uris, $opts) = $self->failover =~ m{^failover:(?://)? \(? (.*?) \)? (?: \? (.*?) ) ?$}ix;
56              
57 6 100       78 $self->_logconfess("Unable to parse failover uri: " . $self->failover)
58             unless $uris;
59              
60 5         18 foreach my $host (split(/,/,$uris)) {
61 7 100       32 $host =~ m{^\w+://([a-zA-Z0-9\-./]+):([0-9]+)$} || $self->_logconfess("Unable to parse failover component: '$host'");
62 6         13 my ($hostname, $port) = ($1, $2);
63              
64 6         19 push(@hosts, {hostname => $hostname, port => $port});
65             }
66             } elsif ($self->hosts) {
67             ## @hosts is used inside the while loop later to decide whether we have
68             ## cycled through all setup hosts.
69 19         449 @hosts = @{$self->hosts};
  19         273  
70             }
71 26 100       496 $self->hosts(\@hosts) if @hosts;
72              
73 26         179 $self->_get_connection_retrying(1);
74              
75 22         80 return $self;
76             }
77              
78             sub _get_connection_retrying {
79 36     36   73 my ($self,$initial) = @_;
80              
81 36         59 my $tries=0;
82 36         64 while(not eval { $self->_get_connection; 1 }) {
  60         2328  
  31         98  
83 29         69 my $err = $@;$err =~ s{\n\z}{}sm;
  29         127  
84 29         43 ++$tries;
85 29 100       72 if($self->_should_stop_trying($initial,$tries)) {
86             # We've cycled enough. Die now.
87 5         20 $self->_logdie("Failed to connect: $err; giving up");
88             }
89 24         454 $self->logger->warn("Failed to connect: $err; retrying");
90 24         1249 sleep($self->connect_delay);
91             }
92             }
93              
94             sub _should_stop_trying {
95 29     29   53 my ($self,$initial,$tries) = @_;
96              
97 29 100       503 my $max_tries = $initial
98             ? $self->initial_reconnect_attempts
99             : $self->reconnect_attempts;
100              
101 29 100       157 return unless $max_tries > 0; # 0 means forever
102              
103 17 100       230 if (defined $self->hosts) {
104 14         57 $max_tries *= @{$self->hosts}; # try at least once per host
  14         180  
105             }
106 17         85 return $tries >= $max_tries;
107             }
108              
109             my $socket_class;
110             sub _get_connection {
111 52     52   90 my $self = shift;
112 52 100       912 if (my $hosts = $self->hosts) {
113 48 100 100     867 if (defined $self->current_host && ($self->current_host < $#{$hosts} ) ) {
  33         669  
114 13         213 $self->current_host($self->current_host+1);
115             } else {
116 35         586 $self->current_host(0);
117             }
118 48         869 my $h = $hosts->[$self->current_host];
119 48         829 $self->hostname($h->{hostname});
120 48         902 $self->port($h->{port});
121 48         941 $self->ssl($h->{ssl});
122 48   100     1071 $self->ssl_options($h->{ssl_options} || {});
123             }
124 52         391 my $socket = $self->_get_socket;
125 52 100       868 $self->_logdie("Error connecting to " . $self->hostname . ':' . $self->port . ": $!")
126             unless $socket;
127              
128 23         345 $self->select->remove($self->socket);
129              
130 23         818 $self->select->add($socket);
131 23         463 $self->socket($socket);
132 23         154 $self->{_pid} = $$;
133             }
134              
135             sub _get_socket {
136 0     0   0 my ($self) = @_;
137 0         0 my $socket;
138              
139 0         0 my $timeout = $self->timeout;
140 0 0       0 $timeout = 5 unless defined $timeout;
141              
142             my %sockopts = (
143             Timeout => $timeout,
144 0         0 %{ $self->socket_options },
  0         0  
145             PeerAddr => $self->hostname,
146             PeerPort => $self->port,
147             Proto => 'tcp',
148             );
149 0         0 my $keep_alive = delete $sockopts{keep_alive};
150              
151 0 0       0 if ( $self->ssl ) {
152 0         0 eval { require IO::Socket::SSL };
  0         0  
153 0 0       0 $self->_logdie(
154             "You should install the IO::Socket::SSL module for SSL support in Net::Stomp"
155             ) if $@;
156 0 0       0 %sockopts = ( %sockopts, %{ $self->ssl_options || {} } );
  0         0  
157 0         0 $self->logger->trace('opening IO::Socket::SSL',\%sockopts);
158 0         0 $socket = IO::Socket::SSL->new(%sockopts);
159             } else {
160             $socket_class ||= eval { require IO::Socket::IP; IO::Socket::IP->VERSION('0.20'); "IO::Socket::IP" }
161 0   0     0 || do { require IO::Socket::INET; "IO::Socket::INET" };
      0        
162 0         0 $self->logger->trace("opening $socket_class",\%sockopts);
163 0         0 $socket = $socket_class->new(%sockopts);
164 0 0       0 binmode($socket) if $socket;
165             }
166 0 0       0 if ($keep_alive) {
167 0         0 require Socket;
168 0 0       0 if (Socket->can('SO_KEEPALIVE')) {
169 0         0 $socket->setsockopt(Socket::SOL_SOCKET(),Socket::SO_KEEPALIVE(),1);
170             }
171             else {
172 0         0 $self->logger->warn(q{TCP keep-alive was requested, but the Socket module does not export the SO_KEEPALIVE constant, so we couldn't enable it});
173             }
174             }
175              
176 0         0 return $socket;
177             }
178              
179             sub connect {
180 11     11 1 1159 my ( $self, $conf ) = @_;
181              
182 11         161 $self->logger->trace('connecting');
183 11         534 my $frame = Net::Stomp::Frame->new(
184             { command => 'CONNECT', headers => $conf } );
185 11         47 $self->send_frame($frame);
186 11         30 $frame = $self->receive_frame;
187              
188 11 50 33     178 if ($frame && $frame->command eq 'CONNECTED') {
189 11         211 $self->logger->trace('connected');
190             # Setting initial values for session id, as given from
191             # the stomp server
192 11         540 $self->session_id( $frame->headers->{session} );
193 11         393 $self->_connect_headers( $conf );
194             }
195             else {
196 0         0 $self->logger->warn('failed to connect',{ %{$frame} });
  0         0  
197             }
198              
199 11         83 return $frame;
200             }
201              
202             sub _close_socket {
203 14     14   27 my ($self) = @_;
204 14 50       218 return unless $self->socket;
205 14         272 $self->logger->trace('closing socket');
206 14         688 $self->socket->close;
207 14         291 $self->select->remove($self->socket);
208             }
209              
210             sub disconnect {
211 1     1 1 15826 my $self = shift;
212 1         33 $self->logger->trace('disconnecting');
213 1         30 my $frame = Net::Stomp::Frame->new( { command => 'DISCONNECT' } );
214 1         7 $self->send_frame($frame);
215 1         4 $self->_close_socket;
216 1         24 return 1;
217             }
218              
219             sub _reconnect {
220 10     10   17 my $self = shift;
221 10         36 $self->_close_socket;
222              
223 10         330 $self->logger->warn("reconnecting");
224 10         413 $self->_get_connection_retrying(0);
225             # Both ->connect and ->subscribe can call _reconnect. It *should*
226             # work out fine in the end, worst scenario we send a few subscribe
227             # frame more than once
228 9         148 $self->connect( $self->_connect_headers );
229 9         15 for my $sub(keys %{$self->subscriptions}) {
  9         138  
230 8         162 $self->subscribe($self->subscriptions->{$sub});
231             }
232             }
233              
234             sub can_read {
235 0     0 1 0 my ( $self, $conf ) = @_;
236              
237             # If there is any data left in the framebuffer that we haven't read, return
238             # 'true'. But we don't want to spin endlessly, so only return true the
239             # first time. (Anything touching the _framebuf should update this flag when
240             # it does something.
241 0 0 0     0 if ( $self->{_framebuf_changed} && length $self->{_framebuf} ) {
242 0         0 $self->{_framebuf_changed} = 0;
243 0         0 return 1;
244             }
245              
246 0   0     0 $conf ||= {};
247 0 0       0 my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
248 0   0     0 return $self->select->can_read($timeout) || 0;
249             }
250              
251             sub send {
252 19     19 1 50907 my ( $self, $conf ) = @_;
253 19         73 $conf = { %$conf };
254 19         381 $self->logger->trace('sending',$conf);
255 19         2355 my $body = $conf->{body};
256 19         42 delete $conf->{body};
257 19         145 my $frame = Net::Stomp::Frame->new(
258             { command => 'SEND', headers => $conf, body => $body } );
259 19         85 $self->send_frame($frame);
260 18         80 return 1;
261             }
262              
263             sub send_with_receipt {
264 9     9 1 27737 my ( $self, $conf ) = @_;
265 9         41 $conf = { %$conf };
266              
267             # send the message
268 9         34 my $receipt_id = $self->_get_next_transaction;
269 9         199 $self->logger->debug('sending with receipt',{ receipt => $receipt_id });
270 9         382 $conf->{receipt} = $receipt_id;
271 9 100       117 my $receipt_timeout = exists $conf->{timeout} ? delete $conf->{timeout} : $self->receipt_timeout;
272 9         51 $self->send($conf);
273              
274 9         157 $self->logger->trace('waiting for receipt',$conf);
275             # check the receipt
276 9 100       364 my $receipt_frame = $self->receive_frame({
277             ( defined $receipt_timeout ?
278             ( timeout => $receipt_timeout )
279             : () ),
280             });
281              
282 9 100       51 if (@_ > 2) {
283 4         9 $_[2] = $receipt_frame;
284             }
285              
286 9 100 100     121 if ( $receipt_frame
      100        
287             && $receipt_frame->command eq 'RECEIPT'
288             && $receipt_frame->headers->{'receipt-id'} eq $receipt_id )
289             {
290 2         82 $self->logger->debug('got good receipt',{ %{$receipt_frame} });
  2         18  
291 2         24 return 1;
292             } else {
293 7 100       166 $self->logger->debug('got bad receipt',{ %{$receipt_frame || {} } });
  7         67  
294 7         97 return 0;
295             }
296             }
297              
298             sub send_transactional {
299 4     4 1 51360 my ( $self, $conf ) = @_;
300              
301 4         27 $conf = { %$conf };
302             # begin the transaction
303 4         22 my $transaction_id = $self->_get_next_transaction;
304 4         111 $self->logger->debug('starting transaction',{ transaction => $transaction_id });
305 4         171 my $begin_frame
306             = Net::Stomp::Frame->new(
307             { command => 'BEGIN', headers => { transaction => $transaction_id } }
308             );
309 4         22 $self->send_frame($begin_frame);
310              
311 4         18 $conf->{transaction} = $transaction_id;
312 4         9 my $receipt_frame;
313 4         14 my $ret = $self->send_with_receipt($conf,$receipt_frame);
314              
315 4 50       13 if (@_ > 2) {
316 0         0 $_[2] = $receipt_frame;
317             }
318              
319 4 100       12 if ( $ret ) {
320             # success, commit the transaction
321 1         17 $self->logger->debug('committing transaction',{ transaction => $transaction_id });
322 1         16 my $frame_commit = Net::Stomp::Frame->new(
323             { command => 'COMMIT',
324             headers => { transaction => $transaction_id }
325             }
326             );
327 1         4 $self->send_frame($frame_commit);
328             } else {
329 3         55 $self->logger->debug('rolling back transaction',{ transaction => $transaction_id });
330             # some failure, abort transaction
331 3         48 my $frame_abort = Net::Stomp::Frame->new(
332             { command => 'ABORT',
333             headers => { transaction => $transaction_id }
334             }
335             );
336 3         10 $self->send_frame($frame_abort);
337             }
338 4         21 return $ret;
339             }
340              
341             sub _sub_key {
342 13     13   26 my ($conf) = @_;
343              
344 13 100       33 if ($conf->{id}) { return "id-".$conf->{id} }
  2         9  
345             return "dest-".$conf->{destination}
346 11         33 }
347              
348             sub subscribe {
349 11     11 1 10209 my ( $self, $conf ) = @_;
350 11         203 $self->logger->trace('subscribing',$conf);
351 11         1896 my $frame = Net::Stomp::Frame->new(
352             { command => 'SUBSCRIBE', headers => $conf } );
353 11         38 $self->send_frame($frame);
354 11         165 my $subs = $self->subscriptions;
355 11         62 $subs->{_sub_key($conf)} = $conf;
356 11         35 return 1;
357             }
358              
359             sub unsubscribe {
360 2     2 1 9375 my ( $self, $conf ) = @_;
361 2         49 $self->logger->trace('unsubscribing',$conf);
362 2         38 my $frame = Net::Stomp::Frame->new(
363             { command => 'UNSUBSCRIBE', headers => $conf } );
364 2         9 $self->send_frame($frame);
365 2         36 my $subs = $self->subscriptions;
366 2         10 delete $subs->{_sub_key($conf)};
367 2         6 return 1;
368             }
369              
370             sub ack {
371 2     2 1 18535 my ( $self, $conf ) = @_;
372 2         7 $conf = { %$conf };
373 2         46 my $id = $conf->{frame}->headers->{'message-id'};
374 2         11 delete $conf->{frame};
375 2         33 $self->logger->trace('acking',{ 'message-id' => $id, %$conf });
376 2         48 my $frame = Net::Stomp::Frame->new(
377             { command => 'ACK', headers => { 'message-id' => $id, %$conf } } );
378 2         11 $self->send_frame($frame);
379 2         9 return 1;
380             }
381              
382             sub nack {
383 2     2 1 5552 my ( $self, $conf ) = @_;
384 2         7 $conf = { %$conf };
385 2         43 my $id = $conf->{frame}->headers->{'message-id'};
386 2         37 $self->logger->trace('nacking',{ 'message-id' => $id, %$conf });
387 2         32 delete $conf->{frame};
388 2         11 my $frame = Net::Stomp::Frame->new(
389             { command => 'NACK', headers => { 'message-id' => $id, %$conf } } );
390 2         8 $self->send_frame($frame);
391 2         8 return 1;
392             }
393              
394             sub send_frame {
395 48     48 1 98 my ( $self, $frame ) = @_;
396             # see if we're connected before we try to syswrite()
397 48 100       119 if (not $self->_connected) {
398 5         24 $self->_reconnect;
399 4 50       15 if (not $self->_connected) {
400 0         0 $self->_logdie(q{wasn't connected; couldn't _reconnect()});
401             }
402             }
403             # keep writing until we finish, or get an error
404 47         124 my $to_write = my $frame_string = $frame->as_string;
405 47         70 my $written;
406 47         103 while (length($to_write)) {
407 58         857 local $SIG{PIPE}='IGNORE'; # just in case writing to a closed
408             # socket kills us
409 58         1130 $written = $self->socket->syswrite($to_write);
410 58 100       566 last unless defined $written;
411 57         790 substr($to_write,0,$written,'');
412             }
413 47 100       140 if (not defined $written) {
414 1         20 $self->logger->warn("error writing frame <<$frame_string>>: $!");
415             }
416 47 100 100     213 unless (defined $written && $self->_connected) {
417 2         9 $self->_reconnect;
418 2         6 $self->send_frame($frame);
419             }
420 47         116 return;
421             }
422              
423             sub _read_data {
424 138     138   223 my ($self, $timeout) = @_;
425              
426 138 100       2301 return unless $self->select->can_read($timeout);
427             my $len = $self->socket->sysread($self->{_framebuf},
428             $self->bufsize,
429 106   100     2676 length($self->{_framebuf} || ''));
430              
431 106 100 100     3077 if (defined $len && $len>0) {
432 103         168 $self->{_framebuf_changed} = 1;
433             }
434             else {
435 3 100       10 if (!defined $len) {
436 2         30 $self->logger->warn("error reading frame: $!");
437             }
438             # EOF or error detected - connection is gone. We have to reset
439             # the framebuf in case we had a partial frame in there that
440             # will never arrive.
441 3         107 $self->_close_socket;
442 3         62 $self->{_framebuf} = "";
443 3         6 delete $self->{_command};
444 3         3 delete $self->{_headers};
445             }
446 106         287 return $len;
447             }
448              
449             sub _read_headers {
450 143     143   209 my ($self) = @_;
451              
452 143 100       298 return 1 if $self->{_headers};
453 133 100       435 if ($self->{_framebuf} =~ s/^\n*([^\n].*?)\n\n//s) {
454 31         56 $self->{_framebuf_changed} = 1;
455 31         68 my $raw_headers = $1;
456 31 50       134 if ($raw_headers =~ s/^(.+)\n//) {
457 31         85 $self->{_command} = $1;
458             }
459 31         114 foreach my $line (split(/\n/, $raw_headers)) {
460 36         153 my ($key, $value) = split(/\s*:\s*/, $line, 2);
461             $self->{_headers}{$key} = $value
462 36 50       156 unless defined $self->{_headers}{$key};
463             }
464 31         105 return 1;
465             }
466 102         203 return 0;
467             }
468              
469             sub _read_body {
470 67     67   103 my ($self) = @_;
471              
472 67         104 my $h = $self->{_headers};
473 67 100       223 if ($h->{'content-length'}) {
    100          
474 37 100       234 if (length($self->{_framebuf}) > $h->{'content-length'}) {
475 5         10 $self->{_framebuf_changed} = 1;
476             my $body = substr($self->{_framebuf},
477             0,
478 5         15 $h->{'content-length'},
479             '' );
480              
481             # Trim the trailer off the frame.
482 5         19 $self->{_framebuf} =~ s/^.*?\000\n*//s;
483             return Net::Stomp::Frame->new({
484             command => delete $self->{_command},
485             headers => delete $self->{_headers},
486 5         41 body => $body
487             });
488             }
489             } elsif ($self->{_framebuf} =~ s/^(.*?)\000\n*//s) {
490             # No content-length header.
491              
492 26         89 my $body = $1;
493 26         45 $self->{_framebuf_changed} = 1;
494             return Net::Stomp::Frame->new({
495             command => delete $self->{_command},
496             headers => delete $self->{_headers},
497 26         151 body => $body });
498             }
499              
500 36         66 return 0;
501             }
502              
503             # this method is to stop the pointless warnings being thrown when trying to
504             # call peername() on a closed socket, i.e.
505             # getpeername() on closed socket GEN125 at
506             # /opt/xt/xt-perl/lib/5.12.3/x86_64-linux/IO/Socket.pm line 258.
507             #
508             # solution taken from:
509             # http://objectmix.com/perl/80545-warning-getpeername.html
510             sub _connected {
511 164     164   243 my $self = shift;
512              
513 164 100 100     539 return if $self->{_pid} != $$ and $self->reconnect_on_fork;
514              
515 163         200 my $connected;
516             {
517 163         184 local $^W = 0;
  163         444  
518 163         2647 $connected = $self->socket->connected;
519             }
520 163         1263 return $connected;
521             }
522              
523             sub receive_frame {
524 66     66 1 38818 my ($self, $conf) = @_;
525              
526 66         1276 $self->logger->trace('waiting to receive frame',$conf);
527 66 100       2339 my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
528              
529 66 100       348 unless ($self->_connected) {
530 3         11 $self->_reconnect;
531             }
532              
533 66         95 my $done = 0;
534 66         165 while ( not $done = $self->_read_headers ) {
535 102 100       244 return undef unless $self->_read_data($timeout);
536             }
537 41         109 while ( not $done = $self->_read_body ) {
538 36 100       64 return undef unless $self->_read_data($timeout);
539             }
540              
541 31         105 return $done;
542             }
543              
544             sub _get_next_transaction {
545 13     13   22 my $self = shift;
546 13   100     308 my $serial = $self->serial || 0;
547 13         101 $serial++;
548 13         213 $self->serial($serial);
549              
550 13   100     270 return ($self->session_id||'nosession') . '-' . $serial;
551             }
552              
553             1;
554              
555             __END__