File Coverage

blib/lib/XAS/Lib/Net/POE/Client.pm
Criterion Covered Total %
statement 24 146 16.4
branch 0 12 0.0
condition 0 2 0.0
subroutine 8 25 32.0
pod 11 11 100.0
total 43 196 21.9


line stmt bran cond sub pod time code
1             package XAS::Lib::Net::POE::Client;
2              
3             our $VERSION = '0.02';
4              
5 1     1   1101 use POE;
  1         1  
  1         7  
6 1     1   243 use Try::Tiny;
  1         2  
  1         41  
7 1     1   5 use Socket ':all';
  1         2  
  1         846  
8 1     1   6 use Errno ':POSIX';
  1         2  
  1         238  
9 1     1   6 use POE::Filter::Line;
  1         1  
  1         21  
10 1     1   3 use POE::Wheel::ReadWrite;
  1         1  
  1         17  
11 1     1   3 use POE::Wheel::SocketFactory;
  1         2  
  1         63  
12              
13             use XAS::Class
14 1         14 debug => 0,
15             version => $VERSION,
16             base => 'XAS::Lib::POE::Service',
17             mixin => 'XAS::Lib::Mixins::Keepalive',
18             accessors => 'wheel host port listener socket',
19             utils => 'dotid',
20             vars => {
21             PARAMS => {
22             -port => 1,
23             -retry_reconnect => { optional => 1, default => 1 },
24             -tcp_keepalive => { optional => 1, default => 0 },
25             -filter => { optional => 1, default => undef },
26             -alias => { optional => 1, default => 'client' },
27             -eol => { optional => 1, default => "\012\015" },
28             -host => { optional => 1, default => 'localhost'},
29             }
30             }
31 1     1   3 ;
  1         65  
32              
33             our @ERRORS = (0, EPIPE, ETIMEDOUT, ECONNRESET, ECONNREFUSED, ENETUNREACH, ENETDOWN, ENETRESET);
34             our @RECONNECTIONS = (60, 120, 240, 480, 960, 1920, 3840);
35              
36             #use Data::Dumper;
37              
38             # ----------------------------------------------------------------------
39             # Public Events
40             # ----------------------------------------------------------------------
41              
42             # ---------------------------------------------------------------------
43             # Public Methods
44             # ---------------------------------------------------------------------
45              
46             sub session_initialize {
47 0     0 1   my $self = shift;
48              
49 0           my $alias = $self->alias;
50              
51 0           $self->log->debug("$alias: entering session_initialize()");
52              
53             # private events
54              
55 0           $self->log->debug("$alias: doing private events");
56              
57             # private events
58              
59 0           $poe_kernel->state('server_error', $self, '_server_error');
60 0           $poe_kernel->state('server_message', $self, '_server_message');
61 0           $poe_kernel->state('server_connect', $self, '_server_connect');
62 0           $poe_kernel->state('server_connected', $self, '_server_connected');
63 0           $poe_kernel->state('server_reconnect', $self, '_server_reconnect');
64 0           $poe_kernel->state('server_connection_failed', $self, '_server_connection_failed');
65              
66             # public events
67              
68 0           $self->log->debug("$alias: doing public events");
69              
70 0           $poe_kernel->state('read_data', $self);
71 0           $poe_kernel->state('write_data', $self);
72 0           $poe_kernel->state('connection_up', $self);
73 0           $poe_kernel->state('connection_down', $self);
74 0           $poe_kernel->state('handle_connection', $self);
75              
76             # walk the chain
77              
78 0           $self->SUPER::session_initialize();
79              
80 0           $self->log->debug("$alias: leaving session_initialize()");
81              
82             }
83              
84             sub session_startup {
85 0     0 1   my $self = shift;
86              
87 0           my $alias = $self->alias;
88              
89 0           $self->log->debug("$alias: entering session_startup");
90              
91 0           $poe_kernel->post($alias, 'server_connect');
92              
93             # walk the chain
94              
95 0           $self->SUPER::session_startup();
96              
97 0           $self->log->debug("$alias: leaving session_startup");
98              
99             }
100              
101             sub session_shutdown {
102 0     0 1   my $self = shift;
103            
104 0           my $alias = $self->alias;
105              
106 0           $self->log->debug("$alias: entering session_shutdown");
107              
108 0           $self->{'socket'} = undef;
109 0           $self->{'wheel'} = undef;
110 0           $self->{'listener'} = undef;
111              
112             # walk the chain
113              
114 0           $self->SUPER::session_shutdown();
115              
116 0           $self->log->debug("$alias: leaving session_shutdown");
117            
118             }
119              
120             sub session_pause {
121 0     0 1   my ($self) = $_[OBJECT];
122              
123 0           my $alias = $self->alias;
124              
125 0           $self->log->debug("$alias: entering session_pause");
126              
127 0           $poe_kernel->call($alias, 'connection_down');
128              
129             # walk the chain
130              
131 0           $self->SUPER::session_pause();
132              
133 0           $self->log->debug("$alias: leaving session_pause");
134              
135             }
136              
137             sub session_resume {
138 0     0 1   my ($self) = $_[OBJECT];
139              
140 0           my $alias = $self->alias;
141              
142 0           $self->log->debug("$alias: entering session_resume");
143              
144 0           $poe_kernel->call($alias, 'connection_up');
145              
146             # walk the chain
147              
148 0           $self->SUPER::session_resume();
149              
150 0           $self->log->debug("$alias: leaving session_resume");
151              
152             }
153              
154             # ---------------------------------------------------------------------
155             # Public Events
156             # ---------------------------------------------------------------------
157              
158             sub handle_connection {
159 0     0 1   my ($self) = $_[OBJECT];
160              
161             }
162              
163             sub connection_down {
164 0     0 1   my ($self) = $_[OBJECT];
165              
166             }
167              
168             sub connection_up {
169 0     0 1   my ($self) = $_[OBJECT];
170              
171             }
172              
173             sub read_data {
174 0     0 1   my ($self, $data) = @_[OBJECT, ARG0];
175              
176 0           my $alias = $self->alias;
177              
178 0           $poe_kernel->post($alias, 'write_data', $data);
179              
180             }
181              
182             sub write_data {
183 0     0 1   my ($self, $data) = @_[OBJECT, ARG0];
184              
185 0           my @packet;
186 0           my $alias = $self->alias;
187              
188 0 0         if (my $wheel = $self->wheel) {
189              
190 0           push(@packet, $data);
191 0           $wheel->put(@packet);
192              
193             } else {
194              
195 0           $self->throw_msg(
196             dotid($self->class) . '.write_data.nowheel',
197             'net_server_nowheel',
198             $alias
199             );
200              
201             }
202              
203             }
204              
205             # ---------------------------------------------------------------------
206             # Private Events
207             # ---------------------------------------------------------------------
208              
209             sub _server_message {
210 0     0     my ($self, $data, $wheel_id) = @_[OBJECT, ARG0, ARG1];
211              
212 0           my $alias = $self->alias;
213              
214 0           $self->log->debug("$alias: _server_message()");
215              
216 0           $poe_kernel->post($alias, 'read_data', $data);
217              
218             }
219              
220             sub _server_connected {
221 0     0     my ($self, $socket, $peeraddr, $peerport, $wheel_id) = @_[OBJECT,ARG0..ARG3];
222              
223 0           my $alias = $self->alias;
224              
225 0           $self->log->debug("$alias: _server_connected()");
226              
227 0           my $wheel = POE::Wheel::ReadWrite->new(
228             Handle => $socket,
229             Filter => $self->filter,
230             InputEvent => 'server_message',
231             ErrorEvent => 'server_error',
232             );
233              
234 0           my $host = gethostbyaddr($peeraddr, AF_INET);
235              
236 0           $self->{'host'} = $host;
237 0           $self->{'port'} = $peerport;
238 0           $self->{'wheel'} = $wheel;
239 0           $self->{'socket'} = $socket;
240 0           $self->{'attempts'} = 0;
241              
242 0           $poe_kernel->post($alias, 'handle_connection');
243              
244             }
245              
246             sub _server_connect {
247 0     0     my ($self) = $_[OBJECT];
248              
249 0           my $alias = $self->alias;
250              
251 0           $self->log->debug("$alias: _server_connect()");
252              
253 0           $self->{'listner'} = POE::Wheel::SocketFactory->new(
254             RemoteAddress => $self->host,
255             RemotePort => $self->port,
256             SocketType => SOCK_STREAM,
257             SocketDomain => AF_INET,
258             Reuse => 'no',
259             SocketProtocol => 'tcp',
260             SuccessEvent => 'server_connected',
261             FailureEvent => 'server_connection_failed',
262             );
263              
264             }
265              
266             sub _server_connection_failed {
267 0     0     my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT,ARG0..ARG3];
268              
269 0           my $alias = $self->alias;
270              
271 0           $self->log->debug("$alias: _server_connection_failed()");
272 0           $self->log->error_msg('net_server_connection_failed', $alias, $operation, $errnum, $errstr);
273              
274 0           delete $self->{'socket'};
275 0           delete $self->{'listner'};
276 0           delete $self->{'wheel'};
277              
278 0           foreach my $error (@ERRORS) {
279              
280 0 0         if ($errnum == $error) {
281              
282 0           $poe_kernel->post($alias, 'server_reconnect');
283 0           last;
284              
285             }
286              
287             }
288              
289             }
290              
291             sub _server_error {
292 0     0     my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT,ARG0..ARG3];
293              
294 0           my $alias = $self->alias;
295              
296 0           $self->log->debug("$alias: _server_error()");
297 0           $self->log->error_msg('net_server_error', $alias, $operation, $errnum, $errstr);
298              
299 0           delete $self->{'socket'};
300 0           delete $self->{'listner'};
301 0           delete $self->{'wheel'};
302              
303 0           $poe_kernel->post($alias, 'connection_down');
304              
305 0           foreach my $error (@ERRORS) {
306              
307 0 0         if ($errnum == $error) {
308              
309 0           $poe_kernel->post($alias, 'server_reconnect');
310 0           last;
311              
312             }
313              
314             }
315              
316             }
317              
318             sub _server_reconnect {
319 0     0     my ($self) = $_[OBJECT];
320              
321 0           my $retry;
322 0           my $alias = $self->alias;
323              
324 0           $self->log->warn_msg('net_server_reconnect', $alias, $self->{'attempts'}, $self->{'count'});
325              
326 0 0         if ($self->{'attempts'} < $self->{'count'}) {
327              
328 0           my $delay = $RECONNECTIONS[$self->{'attempts'}];
329 0           $self->log->warn_msg('net_server_attempts', $alias, $self->{'attempts'}, $delay);
330 0           $self->{'attempts'} += 1;
331 0           $poe_kernel->delay('server_connect', $delay);
332              
333             } else {
334              
335 0   0       $retry = $self->retry_reconnect || 0;
336              
337 0 0         if ($retry) {
338              
339 0           $self->log->warn_msg('net_server_recycle', $alias);
340 0           $self->{'attempts'} = 0;
341 0           $poe_kernel->post($alias, 'server_connect');
342              
343             } else {
344              
345 0           $self->log->warn_msg('net_server_shutdown', $alias);
346 0           $poe_kernel->post($alias, 'session_shutdown');
347              
348             }
349              
350             }
351              
352             }
353              
354             # ---------------------------------------------------------------------
355             # Private Methods
356             # ---------------------------------------------------------------------
357              
358             sub init {
359 0     0 1   my $class = shift;
360              
361 0           my $self = $class->SUPER::init(@_);
362              
363 0           $self->{'attempts'} = 0;
364 0           $self->{'count'} = scalar(@RECONNECTIONS);
365              
366 0 0         unless (defined($self->{'filter'})) {
367              
368 0           $self->{'filter'} = POE::Filter::Line->new(
369             InputLiteral => $self->eol,
370             OutputLiteral => $self->eol,
371             );
372              
373             }
374              
375 0           return $self;
376              
377             }
378              
379             1;
380              
381             __END__