File Coverage

blib/lib/XAS/Lib/Net/POE/Client.pm
Criterion Covered Total %
statement 24 164 14.6
branch 0 18 0.0
condition 0 2 0.0
subroutine 8 27 29.6
pod 11 11 100.0
total 43 222 19.3


line stmt bran cond sub pod time code
1             package XAS::Lib::Net::POE::Client;
2              
3             our $VERSION = '0.02';
4              
5 1     1   1242 use POE;
  1         1  
  1         8  
6 1     1   272 use Try::Tiny;
  1         2  
  1         41  
7 1     1   4 use Socket ':all';
  1         1  
  1         934  
8 1     1   5 use Errno ':POSIX';
  1         1  
  1         252  
9 1     1   4 use POE::Filter::Line;
  1         1  
  1         16  
10 1     1   3 use POE::Wheel::ReadWrite;
  1         1  
  1         17  
11 1     1   3 use POE::Wheel::SocketFactory;
  1         1  
  1         65  
12              
13             use XAS::Class
14 1         14 debug => 0,
15             version => $VERSION,
16             base => 'XAS::Lib::POE::Service',
17             mixin => 'XAS::Lib::Mixins::Keepalive',
18             accessors => 'wheel host port listener socket',
19             mutators => 'input_paused',
20             utils => 'dotid',
21             vars => {
22             PARAMS => {
23             -port => 1,
24             -retry_reconnect => { optional => 1, default => 1 },
25             -tcp_keepalive => { optional => 1, default => 0 },
26             -filter => { optional => 1, default => undef },
27             -alias => { optional => 1, default => 'client' },
28             -eol => { optional => 1, default => "\015\012" },
29             -host => { optional => 1, default => 'localhost'},
30             }
31             }
32 1     1   3 ;
  1         1  
33              
34             our @ERRORS = (0, EPIPE, ETIMEDOUT, ECONNRESET, ECONNREFUSED, ENETUNREACH, ENETDOWN, ENETRESET);
35             our @RECONNECTIONS = (60, 120, 240, 480, 960, 1920, 3840);
36              
37             #use Data::Dumper;
38              
39             # ----------------------------------------------------------------------
40             # Public Events
41             # ----------------------------------------------------------------------
42              
43             # ---------------------------------------------------------------------
44             # Public Methods
45             # ---------------------------------------------------------------------
46              
47             sub session_initialize {
48 0     0 1   my $self = shift;
49              
50 0           my $alias = $self->alias;
51              
52 0           $self->log->debug("$alias: entering session_initialize()");
53              
54             # private events
55              
56 0           $self->log->debug("$alias: doing private events");
57              
58             # private events
59              
60 0           $poe_kernel->state('server_error', $self, '_server_error');
61 0           $poe_kernel->state('server_pause', $self, '_server_pause');
62 0           $poe_kernel->state('server_resume', $self, '_server_resume');
63 0           $poe_kernel->state('server_message', $self, '_server_message');
64 0           $poe_kernel->state('server_connect', $self, '_server_connect');
65 0           $poe_kernel->state('server_connected', $self, '_server_connected');
66 0           $poe_kernel->state('server_reconnect', $self, '_server_reconnect');
67 0           $poe_kernel->state('server_connection_failed', $self, '_server_connection_failed');
68              
69             # public events
70              
71 0           $self->log->debug("$alias: doing public events");
72              
73 0           $poe_kernel->state('read_data', $self);
74 0           $poe_kernel->state('write_data', $self);
75 0           $poe_kernel->state('connection_up', $self);
76 0           $poe_kernel->state('connection_down', $self);
77 0           $poe_kernel->state('handle_connection', $self);
78              
79             # walk the chain
80              
81 0           $self->SUPER::session_initialize();
82              
83 0           $self->log->debug("$alias: leaving session_initialize()");
84              
85             }
86              
87             sub session_startup {
88 0     0 1   my $self = shift;
89              
90 0           my $alias = $self->alias;
91              
92 0           $self->log->debug("$alias: entering session_startup");
93              
94 0           $poe_kernel->post($alias, 'server_connect');
95              
96             # walk the chain
97              
98 0           $self->SUPER::session_startup();
99              
100 0           $self->log->debug("$alias: leaving session_startup");
101              
102             }
103              
104             sub session_shutdown {
105 0     0 1   my $self = shift;
106            
107 0           my $alias = $self->alias;
108              
109 0           $self->log->debug("$alias: entering session_shutdown");
110              
111 0           $self->{'socket'} = undef;
112 0           $self->{'wheel'} = undef;
113 0           $self->{'listener'} = undef;
114              
115             # walk the chain
116              
117 0           $self->SUPER::session_shutdown();
118              
119 0           $self->log->debug("$alias: leaving session_shutdown");
120            
121             }
122              
123             sub session_pause {
124 0     0 1   my ($self) = $_[OBJECT];
125              
126 0           my $alias = $self->alias;
127              
128 0           $self->log->debug("$alias: entering session_pause");
129              
130 0           $poe_kernel->call($alias, 'connection_down');
131              
132             # walk the chain
133              
134 0           $self->SUPER::session_pause();
135              
136 0           $self->log->debug("$alias: leaving session_pause");
137              
138             }
139              
140             sub session_resume {
141 0     0 1   my ($self) = $_[OBJECT];
142              
143 0           my $alias = $self->alias;
144              
145 0           $self->log->debug("$alias: entering session_resume");
146              
147 0           $poe_kernel->call($alias, 'connection_up');
148              
149             # walk the chain
150              
151 0           $self->SUPER::session_resume();
152              
153 0           $self->log->debug("$alias: leaving session_resume");
154              
155             }
156              
157             # ---------------------------------------------------------------------
158             # Public Events
159             # ---------------------------------------------------------------------
160              
161             sub handle_connection {
162 0     0 1   my ($self) = $_[OBJECT];
163              
164             }
165              
166             sub connection_down {
167 0     0 1   my ($self) = $_[OBJECT];
168              
169             }
170              
171             sub connection_up {
172 0     0 1   my ($self) = $_[OBJECT];
173              
174             }
175              
176             sub read_data {
177 0     0 1   my ($self, $data) = @_[OBJECT, ARG0];
178              
179 0           my $alias = $self->alias;
180              
181 0           $poe_kernel->post($alias, 'write_data', $data);
182              
183             }
184              
185             sub write_data {
186 0     0 1   my ($self, $data) = @_[OBJECT, ARG0];
187              
188 0           my @packet;
189 0           my $alias = $self->alias;
190              
191 0 0         if (my $wheel = $self->wheel) {
192              
193 0           push(@packet, $data);
194 0           $wheel->put(@packet);
195              
196             } else {
197              
198 0           $self->throw_msg(
199             dotid($self->class) . '.write_data.nowheel',
200             'net_server_nowheel',
201             $alias
202             );
203              
204             }
205              
206             }
207              
208             # ---------------------------------------------------------------------
209             # Private Events
210             # ---------------------------------------------------------------------
211              
212             sub _server_message {
213 0     0     my ($self, $data, $wheel_id) = @_[OBJECT, ARG0, ARG1];
214              
215 0           my $alias = $self->alias;
216              
217 0           $self->log->debug("$alias: _server_message()");
218              
219 0           $poe_kernel->post($alias, 'read_data', $data);
220              
221             }
222              
223             sub _server_connected {
224 0     0     my ($self, $socket, $peeraddr, $peerport, $wheel_id) = @_[OBJECT,ARG0..ARG3];
225              
226 0           my $alias = $self->alias;
227              
228 0           $self->log->debug("$alias: _server_connected()");
229              
230 0           my $wheel = POE::Wheel::ReadWrite->new(
231             Handle => $socket,
232             Filter => $self->filter,
233             InputEvent => 'server_message',
234             ErrorEvent => 'server_error',
235             );
236              
237 0           my $host = gethostbyaddr($peeraddr, AF_INET);
238              
239 0           $self->{'host'} = $host;
240 0           $self->{'port'} = $peerport;
241 0           $self->{'wheel'} = $wheel;
242 0           $self->{'socket'} = $socket;
243 0           $self->{'attempts'} = 0;
244              
245 0           $poe_kernel->post($alias, 'handle_connection');
246              
247             }
248              
249             sub _server_connect {
250 0     0     my ($self) = $_[OBJECT];
251              
252 0           my $alias = $self->alias;
253              
254 0           $self->log->debug("$alias: _server_connect()");
255              
256 0           $self->{'listner'} = POE::Wheel::SocketFactory->new(
257             RemoteAddress => $self->host,
258             RemotePort => $self->port,
259             SocketType => SOCK_STREAM,
260             SocketDomain => AF_INET,
261             Reuse => 'no',
262             SocketProtocol => 'tcp',
263             SuccessEvent => 'server_connected',
264             FailureEvent => 'server_connection_failed',
265             );
266              
267             }
268              
269             sub _server_connection_failed {
270 0     0     my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT,ARG0..ARG3];
271              
272 0           my $alias = $self->alias;
273              
274 0           $self->log->debug("$alias: _server_connection_failed()");
275 0           $self->log->error_msg('net_server_connection_failed', $alias, $operation, $errnum, $errstr);
276              
277 0           delete $self->{'socket'};
278 0           delete $self->{'listner'};
279 0           delete $self->{'wheel'};
280              
281 0           foreach my $error (@ERRORS) {
282              
283 0 0         if ($errnum == $error) {
284              
285 0           $poe_kernel->post($alias, 'server_reconnect');
286 0           last;
287              
288             }
289              
290             }
291              
292             }
293              
294             sub _server_error {
295 0     0     my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT,ARG0..ARG3];
296              
297 0           my $alias = $self->alias;
298              
299 0           $self->log->debug("$alias: _server_error()");
300 0           $self->log->error_msg('net_server_error', $alias, $operation, $errnum, $errstr);
301              
302 0           delete $self->{'socket'};
303 0           delete $self->{'listner'};
304 0           delete $self->{'wheel'};
305              
306 0           $poe_kernel->post($alias, 'connection_down');
307              
308 0           foreach my $error (@ERRORS) {
309              
310 0 0         if ($errnum == $error) {
311              
312 0           $poe_kernel->post($alias, 'server_reconnect');
313 0           last;
314              
315             }
316              
317             }
318              
319             }
320              
321             sub _server_reconnect {
322 0     0     my ($self) = $_[OBJECT];
323              
324 0           my $retry;
325 0           my $alias = $self->alias;
326              
327 0           $self->log->warn_msg('net_server_reconnect', $alias, $self->{'attempts'}, $self->{'count'});
328              
329 0 0         if ($self->{'attempts'} < $self->{'count'}) {
330              
331 0           my $delay = $RECONNECTIONS[$self->{'attempts'}];
332 0           $self->log->warn_msg('net_server_attempts', $alias, $self->{'attempts'}, $delay);
333 0           $self->{'attempts'} += 1;
334 0           $poe_kernel->delay('server_connect', $delay);
335              
336             } else {
337              
338 0   0       $retry = $self->retry_reconnect || 0;
339              
340 0 0         if ($retry) {
341              
342 0           $self->log->warn_msg('net_server_recycle', $alias);
343 0           $self->{'attempts'} = 0;
344 0           $poe_kernel->post($alias, 'server_connect');
345              
346             } else {
347              
348 0           $self->log->warn_msg('net_server_shutdown', $alias);
349 0           $poe_kernel->post($alias, 'session_shutdown');
350              
351             }
352              
353             }
354              
355             }
356              
357             sub _server_pause {
358 0     0     my ($self) = $_[OBJECT];
359              
360 0           my $alias = $self->alias;
361              
362 0           $self->log->debug("$alias: _server_pause()");
363              
364 0 0         if (my $wheel = $self->wheel) {
365              
366 0           $wheel->pause_input();
367 0           $self->input_paused(1);
368              
369 0           $self->log->debug("$alias: _server_resume() - input paused");
370              
371             }
372              
373             }
374              
375             sub _server_resume {
376 0     0     my ($self) = $_[OBJECT];
377              
378 0           my $alias = $self->alias;
379              
380 0           $self->log->debug("$alias: _server_resume()");
381              
382 0 0         if ($self->input_paused) {
383              
384 0 0         if (my $wheel = $self->wheel) {
385              
386 0           $wheel->resume_input();
387 0           $self->input_paused(0);
388              
389 0           $self->log->debug("$alias: _server_resume() - input resumed");
390            
391             }
392              
393             }
394              
395             }
396              
397             # ---------------------------------------------------------------------
398             # Private Methods
399             # ---------------------------------------------------------------------
400              
401             sub init {
402 0     0 1   my $class = shift;
403              
404 0           my $self = $class->SUPER::init(@_);
405              
406 0           $self->{'attempts'} = 0;
407 0           $self->{'input_paused'} = 0;
408 0           $self->{'count'} = scalar(@RECONNECTIONS);
409              
410 0 0         unless (defined($self->{'filter'})) {
411              
412 0           $self->{'filter'} = POE::Filter::Line->new(
413             InputLiteral => $self->eol,
414             OutputLiteral => $self->eol,
415             );
416              
417             }
418              
419 0           return $self;
420              
421             }
422              
423             1;
424              
425             __END__
426              
427             =head1 NAME
428              
429             XAS::Lib::Net::POE::Client - An asynchronous network client based on POE
430              
431             =head1 SYNOPSIS
432              
433             This module is a class used to create network clients.
434              
435             package Client;
436              
437             use POE;
438             use XAS::Class
439             version => '1.0',
440             base => 'XAS::Lib::Net::POE::Client'
441             ;
442              
443             sub handle_connection {
444             my ($self) = $_[OBJECT];
445              
446             my $packet = "hello!";
447              
448             $poe_kernel->yield('write_data', $packet);
449              
450             }
451              
452             =head1 DESCRIPTION
453              
454             This module handles the nitty-gritty details of setting up the communications
455             channel to a server. You will need to sub-class this module with your own for
456             it to be useful.
457              
458             An attempt to maintain that channel will be made when/if that server should
459             happen to disappear off the network. There is nothing more unpleasant then
460             having to go around to dozens of servers and restarting processes.
461              
462             The following methods are responding to POE events and use the POE argument
463             passing conventions.
464              
465             =head1 METHODS
466              
467             =head2 new
468              
469             This method initializes the class and starts a session to handle the
470             communications channel. It takes the following parameters:
471              
472             =over 4
473              
474             =item B<-alias>
475              
476             The session alias, defaults to 'client'.
477              
478             =item B<-host>
479              
480             The servers host name, defaults to 'localhost'.
481              
482             =item B<-port>
483              
484             The servers port number.
485              
486             =item B<-retry_count>
487              
488             Wither to attempt reconnections after they run out. Defaults to true.
489              
490             =item B<-tcp_keepalive>
491              
492             For those pesky firewalls, defaults to false.
493              
494             =back
495              
496             =head2 read_data(OBJECT, ARG0)
497              
498             This event is triggered when data is received for the server. It accepts
499             these parameters:
500              
501             =over 4
502              
503             =item B<OBJECT>
504              
505             The current class object.
506              
507             =item B<ARG0>
508              
509             The data that has been read.
510              
511             =back
512              
513             =head2 write_data(OBJECT, ARG0)
514              
515             You use this event to send data to the server. It accepts
516             these parameters:
517              
518             =over 4
519              
520             =item B<OBJECT>
521              
522             The current class object.
523              
524             =item B<ARGO>
525              
526             The data to write out.
527              
528             =back
529              
530             =head2 handle_connection(OBJECT)
531              
532             This event is triggered upon initial connection to the server. It accepts
533             these parameters:
534              
535             =over 4
536              
537             =item B<OBJECT>
538              
539             The current class object.
540              
541             =back
542              
543             =head2 connection_down(OBJECT)
544              
545             This event is triggered to allow you to be notified if the connection to
546             the server is currently down. It accepts
547             these parameters:
548              
549             =over 4
550              
551             =item B<OBJECT>
552              
553             The current class object.
554              
555             =back
556              
557             =head2 connection_up(OBJECT)
558              
559             This event is triggered to allow you to be notified when the connection
560             to the server is restored. It accepts
561             these parameters:
562              
563             =over 4
564              
565             =item B<OBJECT>
566              
567             The current class object.
568              
569             =back
570              
571             =head1 VARIABLES
572              
573             The following class variables are available if you want to adjust them.
574              
575             =over 4
576              
577             =item B<ERRORS>
578              
579             An array of POSIX error codes.
580              
581             =item B<RECONNECTIONS>
582              
583             An array of seconds to wait for the next reconnect attempt.
584              
585             =back
586              
587             =head1 SEE ALSO
588              
589             =over 4
590              
591             =item L<XAS::Lib::Net::Client|XAS::Lib::Net::Client>
592              
593             =item L<XAS::Lib::Net::Server|XAS::Lib::Net::Server>
594              
595             =item L<XAS|XAS>
596              
597             =back
598              
599             =head1 AUTHOR
600              
601             Kevin L. Esteb, E<lt>kevin@kesteb.usE<gt>
602              
603             =head1 COPYRIGHT AND LICENSE
604              
605             Copyright (c) 2012-2015 Kevin L. Esteb
606              
607             This is free software; you can redistribute it and/or modify it under
608             the terms of the Artistic License 2.0. For details, see the full text
609             of the license at http://www.perlfoundation.org/artistic_license_2_0.
610              
611             =cut