File Coverage

blib/lib/POE/Component/Client/opentick/Socket.pm
Criterion Covered Total %
statement 182 245 74.2
branch 34 66 51.5
condition 15 32 46.8
subroutine 43 61 70.4
pod 12 12 100.0
total 286 416 68.7


line stmt bran cond sub pod time code
1             package POE::Component::Client::opentick::Socket;
2             #
3             # opentick.com POE client
4             #
5             # Socket handling
6             #
7             # infi/2008
8             #
9             # $Id: Socket.pm 56 2009-01-08 16:51:14Z infidel $
10             #
11             # Full POD documentation after __END__
12             #
13              
14 2     2   12 use strict;
  2         5  
  2         88  
15 2     2   11 use warnings;
  2         3  
  2         71  
16 2     2   10 use Carp qw( croak );
  2         4  
  2         138  
17 2     2   12 use Data::Dumper;
  2         4  
  2         271  
18 2     2   15 use Socket;
  2         4  
  2         1879  
19 2         18 use POE qw( Wheel::SocketFactory Wheel::ReadWrite
20 2     2   12 Driver::SysRW Filter::Stream );
  2         5  
21              
22             # Ours
23 2     2   1344 use POE::Component::Client::opentick::Constants;
  2         5  
  2         511  
24 2     2   13 use POE::Component::Client::opentick::Util;
  2         5  
  2         197  
25 2     2   12 use POE::Component::Client::opentick::Output;
  2         2  
  2         130  
26 2     2   12 use POE::Component::Client::opentick::Error;
  2         5  
  2         115  
27              
28             ###
29             ### Variables
30             ###
31              
32 2     2   10 use vars qw( $VERSION $TRUE $FALSE $KEEP $DELETE $poe_kernel );
  2         5  
  2         8400  
33              
34             ($VERSION) = q$Revision: 56 $ =~ /(\d+)/;
35             *TRUE = \1;
36             *FALSE = \0;
37             *KEEP = \0;
38             *DELETE = \1;
39              
40             # Arguments are for this object.
41             my %valid_args = (
42             alias => $KEEP,
43             debug => $KEEP,
44             servers => $DELETE,
45             port => $DELETE,
46             realtime => $DELETE,
47             conntimeout => $DELETE,
48             autoreconnect => $DELETE,
49             reconninterval => $DELETE,
50             reconnretries => $DELETE,
51             bindaddress => $DELETE,
52             bindport => $DELETE,
53             );
54              
55             ########################################################################
56             ### Public methods ###
57             ########################################################################
58              
59             sub new
60             {
61 1     1 1 5 my( $class, @args ) = @_;
62 1 50       5 croak( "$class requires an even number of parameters" ) if( @args & 1 );
63              
64 1         5 my $self = {
65             alias => OTDefault( 'alias' ),
66             debug => $FALSE,
67             servers => undef,
68             myserver => undef,
69             port => undef,
70             state => OTConstant( 'OT_STATUS_INACTIVE' ),
71             realtime => OTDefault( 'realtime' ),
72             redirected => $FALSE, # were we redirected?
73             # For reconnection logic
74             conntimeout => OTDefault( 'conntimeout' ),
75             autoreconnect => OTDefault( 'autoreconnect' ),
76             reconninterval => OTDefault( 'reconninterval' ),
77             reconnretries => OTDefault( 'reconnretries' ),
78             reconncount => 0,
79             bindaddress => undef,
80             bindport => undef,
81             # 'socket' => undef,
82             socket_buffer => [], # outgoing socket FIFO
83             # Statistical parameters
84             packets_sent => 0,
85             packets_recv => 0,
86             bytes_sent => 0,
87             bytes_recv => 0,
88             connect_time => 0,
89             };
90              
91 1         6 bless( $self, $class );
92              
93 1         5 $self->initialize( @args );
94              
95 1         5 return( $self );
96             }
97              
98             # Initialize this object instance
99             sub initialize
100             {
101 1     1 1 5 my( $self, %args ) = @_;
102              
103             # Store things. Things that make us go.
104             # We're a leaf node; go ahead and delete.
105 1         6 for( keys( %args ) )
106             {
107 7 100       29 $self->{lc $_} = delete( $args{ $_ } )
108             if( exists( $valid_args{lc $_} ) );
109             }
110              
111 1         8 $self->{servers} = $self->_get_server_list( $self->{servers} );
112 1         6 $self->{port} = $self->_get_port( $self->{port} );
113              
114 1         3 return;
115             }
116              
117             # High level manual disconnect method
118             # NOTE: HYBRID POE EVENT HANDLER/METHOD
119             sub disconnect
120             {
121 2     2 1 49 my( $self ) = @_;
122              
123 2         22 $self->_pause_autoreconnect();
124 2         9 $self->_reset_reconn_count();
125              
126             # Step through and back out for each step.
127 2         7 my $state = $self->_get_state();
128 2 100       8 if( $state >= OTConstant( 'OT_STATUS_LOGGED_IN' ) )
129             {
130 1         5 $poe_kernel->call( $self->{alias}, 'logout' );
131             }
132             else
133             {
134 1 50       2 if( $state >= OTConstant( 'OT_STATUS_CONNECTED' ) )
135             {
136             # Disconnect. This should do it.
137 1         9 delete( $self->{socket} );
138             }
139 1 50       336 if( $state >= OTConstant( 'OT_STATUS_CONNECTING' ) )
140             {
141             # Cancel connection and clean up
142 1         2 delete( $self->{SocketFactory} );
143 1         2 $self->{myserver} = undef;
144             }
145 1         3 $self->_set_state( OTConstant( 'OT_STATUS_INACTIVE' ) );
146             # $self->_set_redirected_flag( $FALSE );
147             }
148              
149              
150 2         10 return;
151             }
152              
153             # High level reconnect method
154             sub reconnect
155             {
156 0     0 1 0 my( $self ) = @_;
157              
158 0         0 $poe_kernel->yield( 'disconnect' );
159 0         0 $poe_kernel->yield( 'connect' );
160              
161 0         0 return;
162             }
163              
164             # High level redirect METHOD (only)
165             # Server can send host redirect response; we must comply.
166             # High priority, so we call it synchronously.
167             sub redirect
168             {
169 1     1 1 3 my( $self, $host, $port ) = @_;
170              
171 1         6 $self->_set_redirected_flag( $TRUE );
172 1         4 $poe_kernel->call( $poe_kernel->get_active_session(), 'disconnect' );
173 1         8 $poe_kernel->call( $poe_kernel->get_active_session(),
174             'connect', $host, $port );
175              
176 1         6 return;
177             }
178              
179             ### Statistical accessors
180              
181             sub get_packets_recv
182             {
183 0     0 1 0 my( $self ) = @_;
184              
185 0         0 return( $self->{packets_recv} );
186             }
187              
188             sub get_packets_sent
189             {
190 0     0 1 0 my( $self ) = @_;
191              
192 0         0 return( $self->{packets_sent} );
193             }
194              
195             sub get_bytes_recv
196             {
197 0     0 1 0 my( $self ) = @_;
198              
199 0         0 return( $self->{bytes_recv} );
200             }
201              
202             sub get_bytes_sent
203             {
204 0     0 1 0 my( $self ) = @_;
205              
206 0         0 return( $self->{bytes_sent} );
207             }
208              
209             sub get_connect_time
210             {
211 0     0 1 0 my( $self ) = @_;
212              
213 0 0       0 return( $self->{connect_time} ? time - $self->{connect_time} : 0 );
214             }
215              
216             ########################################################################
217             ### POE event handlers ###
218             ########################################################################
219              
220             # Public event handlers
221              
222             # quick event handler to marshal args over to redirect method
223             sub _redirect
224             {
225 0     0   0 my( $self, $host, $port ) = @_[OBJECT,ARG0,ARG1];
226              
227 0         0 $self->redirect( $host, $port );
228              
229 0         0 return;
230             }
231              
232             # Connect to the OT server
233             sub connect
234             {
235 2     2 1 341 my( $self, $kernel, $host, $port ) = @_[OBJECT,KERNEL,ARG0,ARG1];
236              
237 2 50       7 if( $self->_get_state() == OTConstant( 'OT_STATUS_INACTIVE' ) )
238             {
239 2         8 $self->_reset_autoreconnect();
240              
241 2   66     12 $self->{myserver} = $host || $self->_get_server();
242 2         13 O_NOTICE( "Connecting to " . $self->{myserver} . "..." );
243              
244 2   66     25 my $wheel = POE::Wheel::SocketFactory->new(
245             SocketDomain => AF_INET,
246             SocketType => SOCK_STREAM,
247             SocketProtocol => 'tcp',
248             BindAddress => $self->{bindaddress},
249             BindPort => $self->{bindport},
250             Reuse => $TRUE,
251             RemoteAddress => $self->{myserver},
252             RemotePort => $port || $self->_get_port(),
253             SuccessEvent => '_ot_sock_connected',
254             FailureEvent => '_ot_sock_connfail',
255             );
256 2         1477 $self->{SocketFactory} = $wheel;
257              
258 2         7 $self->_set_state( OTConstant( 'OT_STATUS_CONNECTING' ) );
259              
260 2 50       8 if( $self->_get_conn_timeout() )
261             {
262 2 100       10 $kernel->alarm_remove( delete( $self->{timeout_id} ) )
263             if( $self->{timeout_id} );
264 2         74 $self->{timeout_id}
265             = $kernel->alarm_set( '_ot_sock_conntimeout',
266             time + $self->_get_conn_timeout() );
267             }
268             }
269              
270 2         102 return;
271             }
272              
273             ### Connection initiation handling
274              
275             # Successfully connected!
276             sub _ot_sock_connected
277             {
278 2     2   4021 my( $self, $kernel, $socket ) = @_[OBJECT, KERNEL, ARG0];
279              
280 2         25 my ($port, $addr) = sockaddr_in( getpeername( $socket ) );
281              
282 2         242 O_NOTICE( sprintf( "Connected to %s [%s]:%s.",
283             scalar( gethostbyaddr( $addr, AF_INET ) ),
284             inet_ntoa( $addr ), $port ) );
285              
286             # We don't need no steenkeen factory anymore.
287 2         18 delete( $self->{SocketFactory} );
288              
289             # Leave the alarm removal until opentick.pm:_logged_in().
290             # $kernel->alarm_remove( delete( $self->{timeout_id} ) )
291             # if( $self->{timeout_id} );
292              
293             # Create the socket handler.
294 2         52 $self->{'socket'} = POE::Wheel::ReadWrite->new(
295             Handle => $socket,
296             Driver => POE::Driver::SysRW->new(),
297             Filter => POE::Filter::Stream->new(),
298             InputEvent => '_ot_sock_receive_packet',
299             ErrorEvent => '_ot_sock_error',
300             );
301              
302             # Set the state variables
303 2         566 $self->_reset_object();
304 2         7 $self->_set_connect_time( time );
305              
306             # Send login command
307 2         8 $self->_set_state( OTConstant( 'OT_STATUS_CONNECTED' ) );
308 2         12 $kernel->call( $kernel->get_active_session(),
309             '_ot_proto_issue_command',
310             OTConstant( 'OT_LOGIN' ) );
311              
312             # Flush queue, if we have queued up commands
313 2         23 $self->_flush_queue();
314              
315 2         6 return;
316             }
317              
318             # Connection failed for whatever reason.
319             sub _ot_sock_connfail
320             {
321 0     0   0 my( $self, $kernel, $op, $err_code, $err_str, $wheel )
322             = @_[OBJECT, KERNEL, ARG0..ARG3];
323              
324 0         0 O_DEBUG( "Connection failed. $op() returned $err_code: $err_str" );
325 0         0 delete( $self->{'socket'} );
326              
327 0         0 retry_connect( @_ );
328             }
329              
330             # Connection timed out.
331             sub _ot_sock_conntimeout
332             {
333 0     0   0 my( $self, $kernel ) = @_[OBJECT, KERNEL];
334              
335 0         0 O_DEBUG( "Connection timed out." );
336 0         0 delete( $self->{'socket'} );
337              
338 0         0 retry_connect( @_ );
339             }
340              
341             # Retry a connection ReconnRetries times, or give up.
342             sub retry_connect
343             {
344 1     1 1 3 my( $self, $kernel ) = @_[OBJECT, KERNEL];
345              
346             # Fix our states
347 1         4 $self->_set_state( OTConstant( 'OT_STATUS_INACTIVE' ) );
348 1 50       7 $kernel->alarm_remove( delete( $self->{timeout_id} ) )
349             if( exists( $self->{timeout_id} ) );
350              
351             # Retry
352 1 50       5 if( $self->_get_autoreconnect() )
353             {
354 0 0 0     0 if( $self->_inc_reconn_count() < $self->_get_reconn_retries() or
355             $self->_get_reconn_retries() == 0 )
356             {
357 0         0 my $timeout = $self->_get_reconn_interval();
358 0         0 O_DEBUG( "Retrying connection in $timeout seconds..." );
359 0         0 $kernel->delay( 'connect', $timeout );
360             }
361             else
362             {
363 0         0 delete( $self->{SocketFactory} );
364 0         0 $kernel->yield( '_reconn_giveup', @_[ARG0..$#_] );
365             }
366             }
367              
368 1         3 return;
369             }
370              
371             # A socket error has occurred.
372             sub _ot_sock_error
373             {
374 1     1   1935714 my( $self, $kernel, $op, $err_code, $err_str, $wheel )
375             = @_[OBJECT,KERNEL,ARG0..ARG3];
376              
377 1         11 O_DEBUG( "Socket disconnected: $op() returned $err_code: $err_str" );
378              
379             # Socket disconnected
380 1 50 33     11 if( $op eq 'read' && $err_code == 0 )
381             {
382             # Stop heartbeats immediately and synchronously.
383 1         6 $kernel->yield( '_ot_proto_heartbeat_stop' );
384              
385 1         86 $self->_reset_object();
386 1         6 delete( $self->{'socket'} );
387              
388 1         216 retry_connect( @_ );
389             }
390              
391 1         4 return;
392             }
393              
394             ### Live connection handling
395              
396             # Got a packet!
397             sub _ot_sock_receive_packet
398             {
399 3     3   59721 my( $self, $kernel, $packet ) = @_[OBJECT, KERNEL, ARG0];
400              
401 3         30 O_DEBUG( "_ot_sock_receive_packet( " . length( $packet ) . " )" );
402              
403             # Tell the protocol handler we got a packet
404 3         18 $kernel->yield( '_ot_proto_process_response', $packet );
405              
406 3         281 $self->_update_stats_recv( length( $packet ) );
407              
408 3         12 return;
409             }
410              
411             # Send a packet!
412             sub _ot_sock_send_packet
413             {
414 4     4   174 my( $self, $packet ) = @_[OBJECT, ARG0];
415              
416             # Put the packet on the wire, or enqueue
417 4         24 my $buffered = $self->_put_or_enqueue( $packet );
418              
419             # Update the stats if appropriate
420 4 50       23 $self->_update_stats_sent( length( $packet ) ) unless( $buffered );
421              
422 4 50       32 O_DEBUG( sprintf "_ot_sock_send_packet( %d ): %s",
423             length( $packet ),
424             $buffered ? "buffered" : "sent" );
425              
426 4 50       31 return( $buffered ? $TRUE : $FALSE );
427             }
428              
429             ########################################################################
430             ### Private methods ###
431             ########################################################################
432              
433             # Return the correct port for initialization based on user preferences
434             sub _get_port
435             {
436 2     2   5 my( $self, $user_port ) = @_;
437              
438 2 0 66     19 my $port = ( defined( $user_port ) && $user_port =~ /^\d+/ )
    50          
    100          
439             ? $user_port
440             : $self->{port}
441             ? $self->{port}
442             : $self->{realtime}
443             ? OTDefault( 'port_realtime' )
444             : OTDefault( 'port_delayed' );
445              
446 2         19 return( $port );
447             }
448              
449             # Return the server list for initialization based on user preferences
450             sub _get_server_list
451             {
452 1     1   3 my( $self, $user_list ) = @_;
453              
454 1 0 33     20 my $servers = ( defined( $user_list ) && ref( $user_list ) eq 'ARRAY' )
    0          
    50          
455             ? $user_list
456             : $self->{servers}
457             ? $self->{servers}
458             : $self->{realtime}
459             ? OTDefault( 'servers_realtime' )
460             : OTDefault( 'servers_delayed' );
461              
462 1         5 return( $servers );
463             }
464              
465             sub _set_servers
466             {
467 0     0   0 my( $self, $user_list ) = @_;
468              
469 0         0 $self->{servers} = $user_list;
470              
471 0         0 return;
472             }
473              
474             sub _set_port
475             {
476 0     0   0 my( $self, $port ) = @_;
477              
478 0         0 return( $self->{port} = $port );
479             }
480              
481             # Get one of the servers from our server list round-robin
482             { # CLOSURE
483             my $server_num = 0;
484             sub _get_server
485             {
486 1     1   2 my( $self ) = @_;
487              
488 1         4 my $server = $self->{servers}->[ $server_num++ ];
489 1 50       3 $server_num = 0 if $server_num > $#{ $self->{servers} };
  1         5  
490              
491 1         6 return( $server );
492             }
493             } # /CLOSURE
494              
495             ### Accessor methods
496              
497             # The USER variable setting the number of retries to attempt
498             sub _get_reconn_retries
499             {
500 0     0   0 my( $self ) = @_;
501              
502 0         0 return( $self->{reconnretries} );
503             }
504              
505             sub _get_state
506             {
507 8     8   17 my( $self ) = @_;
508              
509 8         47 return( $self->{state} );
510             }
511              
512             sub _set_state
513             {
514 7     7   16 my( $self, $state ) = @_;
515              
516 7 50 33     25 throw( O_ERROR( 'Tried to set invalid state: ' . $state ) )
517             if( $state < OTConstant( 'OT_STATUS_INACTIVE') ||
518             $state > OTConstant( 'OT_STATUS_LOGGED_IN' ) );
519              
520 7         27 $poe_kernel->yield( '_notify_of_event',
521             OTEvent( 'OT_STATUS_CHANGED' ),
522             [ $self->{alias} ],
523             $state );
524              
525 7         464 return( $self->{state} = $state );
526             }
527              
528             sub _set_redirected_flag
529             {
530 1     1   2 my( $self, $value ) = @_;
531              
532 1 50       8 $self->{redirected} = defined( $value )
    50          
533             ? $value ? $TRUE : $FALSE
534             : $TRUE;
535             }
536              
537             sub _is_redirected
538             {
539 0     0   0 my( $self ) = @_;
540              
541 0         0 return( $self->{redirected} );
542             }
543              
544             # The ACTUAL count of retry attempts
545             sub _reset_reconn_count
546             {
547 6     6   14 my( $self ) = @_;
548              
549 6         16 return( $self->{reconncount} = 0 );
550             }
551              
552             # The ACTUAL count of retry attempts
553             sub _inc_reconn_count
554             {
555 0     0   0 my( $self ) = @_;
556              
557 0         0 return( ++$self->{reconncount} );
558             }
559              
560             # The ACTUAL count of retry attempts
561             sub _get_reconn_count
562             {
563 0     0   0 my( $self ) = @_;
564              
565 0         0 return( $self->{reconncount} );
566             }
567              
568             sub _get_autoreconnect
569             {
570 3     3   7 my( $self ) = @_;
571              
572 3         12 return( $self->{autoreconnect} );
573             }
574              
575             sub _set_autoreconnect
576             {
577 3     3   5 my( $self, $value ) = @_;
578              
579 3 100       1012 $self->{autoreconnect} = $value ? $TRUE : $FALSE;
580              
581 3         5 return;
582             }
583              
584             sub _get_conn_timeout
585             {
586 4     4   5 my( $self ) = @_;
587              
588 4         16 return( $self->{conntimeout} );
589             }
590              
591             sub _get_reconn_interval
592             {
593 0     0   0 my( $self ) = @_;
594              
595 0         0 return( $self->{reconninterval} );
596             }
597              
598             # Put or enqueue user-requested sent packets to FIFO
599             sub _put_or_enqueue
600             {
601 4     4   8 my( $self, $packet ) = @_;
602              
603 4         5 my $buffered;
604 4 50       15 if( $self->{'socket'} )
605             {
606 4         19 $buffered = $self->{'socket'}->put( $packet );
607             }
608             else
609             {
610 0         0 push( @{ $self->{socket_buffer} }, $packet );
  0         0  
611 0         0 $buffered = $TRUE;
612             }
613              
614 4         291 return( $buffered );
615             }
616              
617             # Flush queue of user-requested sent packets, when connected.
618             sub _flush_queue
619             {
620 2     2   7 my( $self ) = @_;
621              
622 2 50       9 return undef unless( $self->{'socket'} );
623              
624 2         4 my $count;
625 2 50       3 if( $count = @{ $self->{socket_buffer} } )
  2         9  
626             {
627 0         0 $self->{'socket'}->put( @{ $self->{socket_buffer} } );
  0         0  
628 0         0 $self->_update_stats_sent(
629 0         0 length( join( '', @{ $self->{socket_buffer} } ) )
630             );
631 0         0 $self->{socket_buffer} = []; # clear buffer
632 0         0 O_DEBUG( $count . " buffered packets sent." );
633             }
634              
635 2         5 return( $count );
636             }
637              
638             # Pause the autoreconnect state; save the current value
639             sub _pause_autoreconnect
640             {
641 2     2   4 my( $self ) = @_;
642              
643             # make idempotent
644 2 50       9 return if( $self->{autoreconnbak} );
645              
646 2         79 $self->{autoreconnbak} = $self->_get_autoreconnect();
647 2         8 $self->_set_autoreconnect( $FALSE );
648              
649 2         3 return;
650             }
651              
652             # Restore the autoreconnect state
653             sub _reset_autoreconnect
654             {
655 2     2   4 my( $self ) = @_;
656              
657 2 100       8 if( exists( $self->{autoreconnbak} ) )
658             {
659 1         3 $self->_set_autoreconnect( $self->{autoreconnbak} );
660 1         2 delete( $self->{autoreconnbak} );
661             }
662              
663 2         3 return;
664             }
665              
666             ### Statistics logging
667              
668             sub _update_stats_recv
669             {
670 3     3   8 my( $self, $bytes ) = @_;
671              
672 3         15 $self->_inc_bytes_recv( $bytes );
673 3         15 $self->_inc_packets_recv();
674              
675 3         8 return;
676             }
677              
678             sub _update_stats_sent
679             {
680 4     4   7 my( $self, $bytes ) = @_;
681              
682 4         13 $self->_inc_bytes_sent( $bytes );
683 4         14 $self->_inc_packets_sent();
684              
685 4         7 return;
686             }
687              
688             sub _reset_stats_recv
689             {
690 0     0   0 my( $self ) = @_;
691              
692 0         0 $self->{packets_recv} = 0;
693 0         0 $self->{bytes_recv} = 0;
694              
695 0         0 return;
696             }
697              
698             sub _reset_stats_sent
699             {
700 0     0   0 my( $self ) = @_;
701              
702 0         0 $self->{packets_sent} = 0;
703 0         0 $self->{bytes_sent} = 0;
704              
705 0         0 return;
706             }
707              
708             sub _inc_bytes_recv
709             {
710 3     3   7 my( $self, $num ) = @_;
711              
712 3   50     18 return( $self->{bytes_recv} += $num || 1 );
713             }
714              
715             sub _inc_bytes_sent
716             {
717 4     4   7 my( $self, $num ) = @_;
718              
719 4   50     15 return( $self->{bytes_sent} += $num || 1 );
720             }
721              
722             sub _inc_packets_recv
723             {
724 3     3   9 my( $self, $num ) = @_;
725              
726 3   50     25 return( $self->{packets_recv} += $num || 1 );
727             }
728              
729             sub _inc_packets_sent
730             {
731 4     4   8 my( $self, $num ) = @_;
732              
733 4   50     20 return( $self->{packets_sent} += $num || 1 );
734             }
735              
736             sub _set_connect_time
737             {
738 6     6   12 my( $self, $value ) = @_;
739              
740 6   66     46 return( $self->{connect_time} = $value || time );
741             }
742              
743             sub _reset_object
744             {
745 4     4   11 my( $self ) = $_[OBJECT];
746 4 50       18 return if( $self->_get_state() == OTConstant( 'OT_STATUS_INACTIVE' ) );
747            
748             # $self->_set_state( OTConstant( 'OT_STATUS_INACTIVE' ) );
749 4         20 $self->_set_connect_time( 0 );
750 4         13 $self->{myserver} = undef;
751             # $self->_set_redirected_flag( $FALSE );
752 4         17 $self->_reset_reconn_count();
753             # $self->_reset_stats_sent();
754             # $self->_reset_stats_recv();
755              
756 4         7 return;
757             }
758              
759             1;
760              
761             __END__