File Coverage

blib/lib/POE/Component/Client/Stomp.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package POE::Component::Client::Stomp;
2              
3 1     1   5121 use POE;
  0            
  0            
4             use Carp;
5             use Socket ':all';
6             use POE::Filter::Stomp;
7             use POE::Wheel::ReadWrite;
8             use POE::Wheel::SocketFactory;
9             use POE::Component::Client::Stomp::Utils;
10              
11             use 5.8.2;
12             use strict;
13             use warnings;
14              
15             my $TCP_KEEPCNT = 0;
16             my $TCP_KEEPIDLE = 0;
17             my $TCP_KEEPINTVL = 0;
18              
19             if ($^O eq "aix") { # from /usr/include/netinet/tcp.h
20              
21             $TCP_KEEPIDLE = 0x11;
22             $TCP_KEEPINTVL = 0x12;
23             $TCP_KEEPCNT = 0x13;
24              
25             } elsif ($^O eq "linux"){ # from /usr/include/netinet/tcp.h
26              
27             $TCP_KEEPIDLE = 4;
28             $TCP_KEEPINTVL = 5;
29             $TCP_KEEPCNT = 6;
30              
31             }
32              
33             my @errors = qw(0 32 68 73 78 79 110 104 111);
34             my @reconnections = qw(60 120 240 480 960 1920 3840);
35              
36             our $VERSION = '0.12';
37              
38             use Data::Dumper;
39              
40             # ---------------------------------------------------------------------
41             # Public Methods
42             # ---------------------------------------------------------------------
43              
44             sub spawn {
45             my $package = shift;
46              
47             croak "$package requires an even number of parameters" if @_ & 1;
48              
49             my %args = @_;
50             my $self = bless ({}, $package);
51              
52             $args{Alias} = 'stomp-client' unless defined $args{Alias} and $args{Alias};
53             $args{RetryReconnect} = 1 unless defined $args{RetryReconnect};
54             $args{EnableKeepAlive} = defined($args{EnableKeepAlive}) ? $args{EnableKeepAlive} : 0;
55             $args{RemoteAddress} = 'localhost' unless defined $args{RemoteAddress};
56             $args{RemotePort} = 61613 unless defined $args{RemotePort};
57              
58             $self->{CONFIG} = \%args;
59             $self->{count} = scalar(@reconnections);
60             $self->{stomp} = POE::Component::Client::Stomp::Utils->new();
61             $self->{attempts} = 0;
62              
63             POE::Session->create(
64             object_states => [
65             $self => {
66             _start => '_session_start',
67             _stop => '_session_stop',
68             server_connect => '_server_connect',
69             server_connected => '_server_connected',
70             server_reconnect => '_server_connect',
71             server_error => '_server_error',
72             server_message => '_server_message',
73             server_connection_failed => '_server_connection_failed',
74             session_interrupt => '_session_interrupt',
75             session_reload => '_session_reload',
76             shutdown => '_session_shutdown',
77             },
78             $self => [ qw(
79             handle_message
80             handle_receipt
81             handle_error
82             handle_connected
83             handle_connection
84             send_data
85             gather_data
86             connection_down
87             connection_up
88             ) ],
89             ],
90             );
91              
92             return $self;
93              
94             }
95              
96             sub log {
97             my ($self, $kernel, $level, @args) = @_;
98              
99             warn sprintf("%-5s - %s\n", uc($level), join(' ', @args));
100              
101             }
102              
103             sub handle_reload {
104             my ($self, $kernel, $session) = @_;
105              
106             $kernel->sig_handled();
107              
108             }
109              
110             sub handle_shutdown {
111             my ($self, $kernel, $session) = @_;
112              
113             my $params = {};
114             my $frame = $self->stomp->disconnect($params);
115              
116             $kernel->call($session, 'send_data', $frame);
117              
118             }
119              
120             # ---------------------------------------------------------------------
121             # Public Accessors
122             # ---------------------------------------------------------------------
123              
124             sub stomp {
125             my $self = shift;
126              
127             return $self->{stomp};
128              
129             }
130              
131             sub config {
132             my ($self, $arg) = @_;
133              
134             return $self->{CONFIG}->{$arg};
135              
136             }
137              
138             sub host {
139             my $self = shift;
140              
141             return $self->{Host};
142              
143             }
144              
145             sub port {
146             my $self = shift;
147              
148             return $self->{Port};
149              
150             }
151              
152             # ---------------------------------------------------------------------
153             # Public Events
154             # ---------------------------------------------------------------------
155              
156             sub handle_connection {
157             my ($kernel, $self) = @_[KERNEL, OBJECT];
158              
159             }
160              
161             sub handle_connected {
162             my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];
163              
164             }
165              
166             sub handle_message {
167             my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];
168              
169             }
170              
171             sub handle_receipt {
172             my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];
173              
174             }
175              
176             sub handle_error {
177             my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];
178              
179             }
180              
181             sub connection_down {
182             my ($kernel, $self) = @_[KERNEL, OBJECT];
183              
184             }
185              
186             sub connection_up {
187             my ($kernel, $self) = @_[KERNEL, OBJECT];
188              
189             }
190              
191             sub gather_data {
192             my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];
193              
194             }
195              
196             sub send_data {
197             my ($kernel, $self, $frame) = @_[KERNEL, OBJECT, ARG0];
198              
199             if (defined($self->{Wheel})) {
200              
201             $self->{Wheel}->put($frame);
202              
203             }
204              
205             }
206              
207             # ---------------------------------------------------------------------
208             # Private Events
209             # ---------------------------------------------------------------------
210              
211             sub _session_start {
212             my ($kernel, $self) = @_[KERNEL, OBJECT];
213              
214             my $alias = $self->config('Alias');
215              
216             $self->log($kernel, 'debug', "$alias: _session_start()");
217              
218             if ((my $rc = $kernel->alias_set($alias)) > 0) {
219              
220             croak 'unable to assign an alias to this session';
221              
222             }
223              
224             # set up signal handling.
225              
226             $kernel->sig(HUP => 'session_interrupt');
227             $kernel->sig(INT => 'session_interrupt');
228             $kernel->sig(TERM => 'session_interrupt');
229             $kernel->sig(QUIT => 'session_interrupt');
230              
231             $kernel->yield('server_connect');
232              
233             }
234              
235             sub _session_stop {
236             my ($kernel, $self) = @_[KERNEL, OBJECT];
237              
238             delete $self->{Listner};
239             delete $self->{Wheel};
240              
241             $kernel->alias_remove($self->config('Alias'));
242              
243             }
244              
245             sub _session_reload {
246             my ($kernel, $self, $session) = @_[KERNEL,OBJECT,SESSION];
247              
248             $self->handle_reload($kernel, $session);
249              
250             }
251              
252             sub _session_interrupt {
253             my ($kernel, $self, $session, $signal) = @_[KERNEL,OBJECT,SESSION,ARG0];
254              
255             my $alias = $self->config('Alias');
256              
257             $self->log($kernel, 'debug', "$alias: _session_interrupt()");
258              
259             if ($signal eq 'HUP') {
260              
261             $self->handle_reload($kernel, $session);
262              
263             } else {
264              
265             $self->handle_shutdown($kernel, $session);
266              
267             }
268              
269             }
270              
271             sub _session_shutdown {
272             my ($kernel, $self, $session) = @_[KERNEL, OBJECT, SESSION];
273              
274             my $alias = $self->config('Alias');
275              
276             $self->log($kernel, 'debug', "$alias: _session_shutdown()");
277              
278             $self->handle_shutdown($kernel, $session);
279              
280             }
281              
282             sub _server_connect {
283             my ($kernel, $self) = @_[KERNEL, OBJECT];
284              
285             my $alias = $self->config('Alias');
286              
287             $self->log($kernel, 'debug', "$alias: _server_connect()");
288              
289             $self->{Listner} = POE::Wheel::SocketFactory->new(
290             RemoteAddress => $self->config('RemoteAddress'),
291             RemotePort => $self->config('RemotePort'),
292             SocketType => SOCK_STREAM,
293             SocketDomain => AF_INET,
294             Reuse => 'no',
295             SocketProtocol => 'tcp',
296             SuccessEvent => 'server_connected',
297             FailureEvent => 'server_connection_failed',
298             );
299              
300             }
301              
302             sub _server_connected {
303             my ($kernel, $self, $socket, $peeraddr, $peerport, $wheel_id) =
304             @_[KERNEL, OBJECT, ARG0 .. ARG3];
305              
306             my $os = $^O;
307             my $alias = $self->config('Alias');
308            
309             $self->log($kernel, 'debug', "$alias: _server_connected()");
310              
311             my $wheel = POE::Wheel::ReadWrite->new(
312             Handle => $socket,
313             Filter => POE::Filter::Stomp->new(),
314             InputEvent => 'server_message',
315             ErrorEvent => 'server_error',
316             );
317              
318             if ($self->config('EnableKeepAlive')) {
319              
320             $self->log($kernel, 'debug', "$alias: keepalive activated");
321              
322             # turn keepalive on, this should send a keepalive
323             # packet once every 2 hours according to the RFC.
324              
325             setsockopt($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
326              
327             if (($os eq 'linux') or ($os eq 'aix')) {
328              
329             $self->log($kernel, 'debug', "$alias: adjusting keepalive activity");
330              
331             # adjust from system defaults, all values are in seconds.
332             # so this does the following:
333             # every 15 minutes send upto 3 packets at 5 second intervals
334             # if no reply, the connection is down.
335              
336             setsockopt($socket, IPPROTO_TCP, $TCP_KEEPIDLE, 900); # 15 minutes
337             setsockopt($socket, IPPROTO_TCP, $TCP_KEEPINTVL, 5); #
338             setsockopt($socket, IPPROTO_TCP, $TCP_KEEPCNT, 3); #
339              
340             }
341              
342             }
343              
344             my $host = gethostbyaddr($peeraddr, AF_INET);
345              
346             $self->{attempts} = 0;
347             $self->{Wheel} = $wheel;
348             $self->{Host} = $host;
349             $self->{Port} = $peerport;
350              
351             $kernel->yield('handle_connection');
352              
353             }
354              
355             sub _server_connection_failed {
356             my ($kernel, $self, $operation, $errnum, $errstr, $wheel_id) =
357             @_[KERNEL, OBJECT, ARG0 .. ARG3];
358              
359             my $alias = $self->config('Alias');
360              
361             $self->log($kernel, 'debug', "$alias: _server_connection_failed()");
362             $self->log($kernel, 'error', "$alias: operation: $operation; reason: $errnum - $errstr");
363              
364             delete $self->{Listner};
365             delete $self->{Wheel};
366              
367             foreach my $error (@errors) {
368              
369             $self->_reconnect($kernel) if ($errnum == $error);
370              
371             }
372              
373             }
374              
375             sub _server_error {
376             my ($kernel, $self, $operation, $errnum, $errstr, $wheel_id) =
377             @_[KERNEL, OBJECT, ARG0 .. ARG3];
378              
379             my $alias = $self->config('Alias');
380              
381             $self->log($kernel, 'debug', "$alias: _server_error()");
382             $self->log($kernel, 'error', "$alias: operation: $operation; reason: $errnum - $errstr");
383              
384             delete $self->{Listner};
385             delete $self->{Wheel};
386              
387             $kernel->yield('connection_down');
388              
389             foreach my $error (@errors) {
390              
391             $self->_reconnect($kernel) if ($errnum == $error);
392              
393             }
394              
395             }
396              
397             sub _server_message {
398             my ($kernel, $self, $frame, $wheel_id) = @_[KERNEL, OBJECT, ARG0, ARG1];
399              
400             my $alias = $self->config('Alias');
401              
402             $self->log($kernel, 'debug' , "$alias: _server_message()");
403              
404             if ($frame->command eq 'CONNECTED') {
405              
406             $self->log($kernel, 'debug' , "$alias: received a \"CONNECTED\" message");
407             $kernel->yield('handle_connected', $frame);
408              
409             } elsif ($frame->command eq 'MESSAGE') {
410              
411             $self->log($kernel, 'debug' , "$alias: received a \"MESSAGE\" message");
412             $kernel->yield('handle_message', $frame);
413              
414             } elsif ($frame->command eq 'RECEIPT') {
415              
416             $self->log($kernel, 'debug' , "$alias: received a \"RECEIPT\" message");
417             $kernel->yield('handle_receipt', $frame);
418              
419             } elsif ($frame->command eq 'ERROR') {
420              
421             $self->log($kernel, 'debug' , "$alias: received an \"ERROR\" message");
422             $kernel->yield('handle_error', $frame);
423              
424             } else {
425              
426             $self->log($kernel, 'warn', "$alias: unknown message type: $frame->command");
427              
428             }
429              
430             }
431              
432             # ---------------------------------------------------------------------
433             # Private Methods
434             # ---------------------------------------------------------------------
435              
436             sub _reconnect {
437             my ($self, $kernel) = @_;
438              
439             my $retry;
440             my $alias = $self->config('Alias');
441              
442             $self->log($kernel, 'debug', "$alias: attempts: $self->{attempts}, count: $self->{count}");
443              
444             if ($self->{attempts} < $self->{count}) {
445              
446             my $delay = $reconnections[$self->{attempts}];
447             $self->log($kernel, 'warn', "$alias: attempting reconnection: $self->{attempts}, waiting: $delay seconds");
448             $self->{attempts}++;
449             $kernel->delay('server_reconnect', $delay);
450              
451             } else {
452              
453             $retry = $self->config('RetryReconnect') || 0;
454              
455             if ($retry) {
456              
457             $self->log($kernel, 'warn', "$alias: cycling reconnection attempts, but not shutting down...");
458             $self->{attempts} = 0;
459             $kernel->yield('server_reconnect');
460              
461             } else {
462              
463             $self->log($kernel, 'warn', "$alias: shutting down, to many reconnection attempts");
464             $kernel->yield('shutdown');
465              
466             }
467              
468             }
469              
470             }
471              
472             1;
473              
474             __END__