File Coverage

blib/lib/Sprocket/Client.pm
Criterion Covered Total %
statement 92 140 65.7
branch 20 42 47.6
condition 8 16 50.0
subroutine 19 23 82.6
pod 4 14 28.5
total 143 235 60.8


line stmt bran cond sub pod time code
1             package Sprocket::Client;
2              
3 4     4   6890 use strict;
  4         10  
  4         175  
4 4     4   26 use warnings;
  4         11  
  4         166  
5              
6 4         41 use POE qw(
7             Filter::Stackable
8             Filter::Stream
9             Driver::SysRW
10             Component::Client::DNS
11 4     4   25 );
  4         8  
12 4     4   870636 use Sprocket qw( Base );
  4         10  
  4         40  
13 4     4   23 use base qw( Sprocket::Base );
  4         9  
  4         322  
14 4     4   22 use Scalar::Util qw( dualvar );
  4         8  
  4         300  
15              
16             BEGIN {
17 4     4   26 $sprocket->register_hook( [qw(
18             sprocket.remote.connection.accept
19             sprocket.remote.connection.reject
20             sprocket.remote.connection.receive
21             sprocket.remote.address.resolved
22             sprocket.remote.wheel.error
23             )] );
24             }
25              
26             sub spawn {
27 4     4 1 45 my $class = shift;
28            
29 4         55 my $self = $class->SUPER::spawn(
30             $class->SUPER::new(
31             @_,
32             _type => 'remote'
33             ),
34             qw(
35             _startup
36             _stop
37              
38             connect
39             reconnect
40             remote_connect_success
41             remote_connect_timeout
42             remote_connect_error
43             remote_error
44             remote_receive
45             remote_flushed
46              
47             resolved_address
48              
49             accept
50             reject
51             )
52             );
53              
54 4         23 return $self;
55             }
56              
57             sub check_params {
58 4     4 0 16 my $self = shift;
59              
60 4   50     25 $self->{name} ||= "Client";
61              
62 4         19 return;
63             }
64              
65             sub _startup {
66 4     4   16 my ( $self, $kernel, $session ) = @_[ OBJECT, KERNEL, SESSION ];
67            
68             # XXX ok to doc
69 4 50       21 $session->option( @{$self->{opts}->{client_session_options}} )
  0         0  
70             if ( $self->{opts}->{client_session_options} );
71            
72             # XXX don't doc yet
73 4 50       20 $kernel->alias_set( $self->{opts}->{client_alias} )
74             if ( $self->{opts}->{client_alias} );
75            
76             # connect to our client list
77 4         9 foreach ( @{$self->{opts}->{client_list}} ) {
  4         13  
78 4 50       41 $self->connect( ref( $_ ) eq 'ARRAY' ? @$_ : $_ );
79             }
80              
81 4         59 return;
82             }
83              
84             sub _stop {
85 4     4   11 my $self = $_[ OBJECT ];
86 4         26 $self->_log( v => 2, msg => $self->{name}." stopped.");
87             }
88              
89             sub remote_connect_success {
90 4     4 0 13 my ( $kernel, $self, $con, $socket ) = @_[ KERNEL, OBJECT, HEAP, ARG0 ];
91            
92 4         17 $con->peer_addr( $con->peer_ip.':'.$con->peer_port );
93            
94 4         67 $self->_log( v => 3, msg => $self->{name}." connected");
95              
96 4 50       18 if ( my $tid = $con->time_out_id ) {
97 0         0 $kernel->alarm_remove( $tid );
98 0         0 $con->time_out_id( undef );
99             }
100              
101 4         45 $con->socket( $socket );
102 4         30 $self->process_plugins( [ 'remote_accept', $self, $con, $socket ] );
103              
104 4         15 return;
105             }
106              
107             sub accept {
108 4     4 0 13 my ( $self, $con, $opts ) = @_[ OBJECT, HEAP, ARG0 ];
109            
110 4 50       38 $opts = {} unless ( $opts );
111              
112 4   50     27 $opts->{block_size} ||= 2048;
113             # XXX don't document this yet, we need to be able to set
114             # the input and output filters seperately
115 4   33     74 $opts->{filter} ||= POE::Filter::Stackable->new(
116             Filters => [
117             POE::Filter::Stream->new(),
118             ]
119             );
120 4 50       132 $opts->{time_out} = $self->{opts}->{time_out}
121             unless( defined( $opts->{time_out} ) );
122              
123 4         26 my $socket = $con->socket;
124            
125 4         42 $con->wheel_readwrite(
126             Handle => $socket,
127             Driver => POE::Driver::SysRW->new( BlockSize => $opts->{block_size} ),
128             Filter => $opts->{filter},
129             InputEvent => $con->event( 'remote_receive' ),
130             ErrorEvent => $con->event( 'remote_error' ),
131             FlushedEvent => $con->event( 'remote_flushed' ),
132             );
133            
134 4         27 $sprocket->broadcast( 'sprocket.remote.connection.accept', {
135             source => $self,
136             target => $con,
137             } );
138            
139 4         17 $con->socket( undef );
140            
141 4         30 $self->process_plugins( [ 'remote_connected', $self, $con, $socket ] );
142              
143             # nothing took the connection
144 4 50       18 unless ( $con->plugin ) {
145 0         0 $self->_log( v => 2, msg => "No plugin took this connection, Dropping.");
146 0         0 $con->close();
147             }
148            
149 4         58 return;
150             }
151              
152             sub reject {
153 0     0 0 0 my ( $self, $con ) = @_[ OBJECT, HEAP ];
154            
155 0         0 $sprocket->broadcast( 'sprocket.remote.connection.reject', {
156             source => $self,
157             target => $con,
158             } );
159            
160             # XXX other?
161 0         0 $con->socket( undef );
162 0         0 $con->close( 1 );
163            
164 0         0 return;
165             }
166              
167             sub remote_connect_error {
168 0     0 1 0 my ( $kernel, $self, $con, $operation, $errnum, $errstr ) =
169             @_[ KERNEL, OBJECT, HEAP, ARG0, ARG1, ARG2 ];
170              
171 0         0 $con->error( dualvar( $errnum, "$operation - $errstr" ) );
172              
173 0         0 $self->_log( v => 2, msg => $self->{name}." : Error connecting to ".$con->peer_addr
174             ." : $operation error $errnum ($errstr)");
175              
176 0 0       0 if ( my $tid = $con->time_out_id ) {
177 0         0 $kernel->alarm_remove( $tid );
178 0         0 $con->time_out_id( undef );
179             }
180              
181             # if ( $con->connected ) {
182 0         0 $self->process_plugins( [ 'remote_disconnected', $self, $con, @_[ ARG0 .. ARG2 ] ] );
183             # } else {
184             # $self->process_plugins( [ 'remote_connect_error', $self, $con ] );
185             # }
186            
187 0         0 return;
188             }
189              
190             sub remote_connect_timeout {
191 0     0 0 0 my $self = $_[ OBJECT ];
192            
193 0         0 $self->_log( v => 2, msg => $self->{name}." : timeout while connecting");
194              
195 0         0 $self->process_plugins( [ 'remote_connect_error', $self, $_[ HEAP ] ] );
196              
197 0         0 return;
198             }
199              
200             sub remote_receive {
201 6     6 1 13 my $self = $_[ OBJECT ];
202              
203 6         44 $sprocket->broadcast( 'sprocket.remote.connection.receive', {
204             source => $self,
205             target => $_[ HEAP ],
206             data => $_[ ARG0 ],
207             } );
208            
209 6         45 $self->process_plugins( [ 'remote_receive', $self, @_[ HEAP, ARG0 ] ] );
210            
211 6         24 return;
212             }
213              
214             sub remote_error {
215 2     2 0 8 my ( $self, $con, $operation, $errnum, $errstr ) =
216             @_[ OBJECT, HEAP, ARG0, ARG1, ARG2 ];
217            
218 2         33 $con->error( dualvar( $errnum, "$operation - $errstr" ) );
219            
220 2 50       16 if ( $errnum != 0 ) {
221 0         0 $self->_log( v => 3, msg => $self->{name}." encountered $operation error $errnum: $errstr");
222             }
223            
224 2         16 $sprocket->broadcast( 'sprocket.remote.wheel.error', {
225             source => $self,
226             operation => $operation,
227             errnum => $errnum,
228             errstr => $errstr,
229             } );
230            
231 2         15 $self->process_plugins( [ 'remote_disconnected', $self, $con, 1, $operation, $errnum, $errstr ] );
232            
233 2         10 return;
234             }
235              
236             sub remote_flushed {
237 5     5 0 17 my ( $self, $con ) = @_[ OBJECT, HEAP ];
238              
239             # we'll get called again if there are octets out
240 5 50 33     19 $con->close()
241             if ( $con->close_on_flush && not $con->get_driver_out_octets() );
242            
243 5         37 return;
244             }
245              
246             sub connect {
247             # must call in this in our session's context
248 8 100 66 8 1 54 unless ( $_[KERNEL] && ref $_[KERNEL] ) {
249 4         43 return $poe_kernel->call( shift->{session_id} => connect => @_ );
250             }
251            
252 4         64 my ( $self, $kernel, $address, $port ) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
253            
254             # support an array ref
255 4 50       17 ( $address, $port ) = @$address if ( ref( $address ) eq 'ARRAY' );
256            
257 4 50       57 ( $address, $port ) = ( $address =~ /^([^:]+):(\d+)$/ )
258             unless( defined $port );
259              
260 4 50       18 return $self->_log( v => 1, msg => 'Port not defined in call to connect, IGNORED. address: '.$address )
261             unless ( defined $port );
262            
263 4         7 my $con;
264              
265             # PoCo DNS
266             # XXX ipv6?!
267 4 50       46 if ( $address !~ m/^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/ ) {
268 0         0 my $named_ses = $kernel->alias_resolve( 'named' );
269              
270             # no DNS resolver found, load one instead
271 0 0       0 unless ( $named_ses ) {
272             # could use the object here, but I don't want
273             # duplicated code, so just use the session reference
274 0         0 POE::Component::Client::DNS->spawn( Alias => 'named' );
275 0         0 $named_ses = $kernel->alias_resolve( 'named' );
276             # release ownership of this session
277             #$kernel->detach_child( $named_ses );
278             }
279              
280             # a new unconnected connection
281 0         0 $con = $self->new_connection(
282             peer_port => $port,
283             peer_hostname => $address,
284             peer_addr => "$address:$port", # temp until resolved
285             );
286              
287 0         0 $kernel->call( $named_ses => 'resolve' => {
288             host => $address,
289             context => 1,
290             event => $con->event( 'resolved_address' ),
291             });
292              
293             # we will connect after resolving the address
294 0         0 return $con;
295             } else {
296 4         50 $con = $self->new_connection(
297             peer_ip => $address,
298             peer_port => $port,
299             peer_addr => "$address:$port",
300             );
301             }
302              
303 4         33 return $self->reconnect( $con );
304             }
305              
306             sub resolved_address {
307 0     0 0 0 my ( $self, $con, $response ) = @_[ OBJECT, HEAP, ARG0 ];
308            
309 0         0 my ( $response_obj, $response_err ) = @{$response}{qw( response error )};
  0         0  
310              
311 0 0       0 unless ( defined $response_obj ) {
312 0         0 $self->_log( v => 4, msg => 'resolution of '.$con->peer_hostname.' failed: '.$response_err );
313 0         0 $self->process_plugins( [ 'remote_connect_error', $self, $con, $response_err, $response_obj ] );
314 0         0 return;
315             }
316              
317 0         0 my @addr = map { $_->rdatastr } ( $response_obj->answer );
  0         0  
318 0         0 my $peer_ip = $addr[ int rand( @addr ) ];
319            
320 0         0 $con->peer_ips( \@addr );
321            
322             # pick a random ip
323 0         0 $self->_log( v => 4, msg => 'resolved '.$con->peer_hostname.' to '.join(',',@addr).' using: '.$peer_ip );
324            
325 0         0 $con->peer_ip( $peer_ip );
326 0         0 $con->peer_addr( $peer_ip.':'.$con->peer_port );
327              
328 0         0 $sprocket->broadcast( 'sprocket.remote.address.resolved', {
329             source => $self,
330             addresses => \@addr,
331             response => $response_obj,
332             peer_ip => $peer_ip,
333             } );
334              
335 0         0 $self->reconnect( $con, 1 );
336              
337 0         0 return;
338             }
339              
340             sub reconnect {
341 8     8 0 22 my ( $self, $con, $noclose );
342 8 100 66     56 unless ( $_[KERNEL] && ref $_[KERNEL] ) {
343 4         10 ( $self, $con ) = ( shift, shift );
344 4         23 return $poe_kernel->call( $self->{session_id} => $con->event( 'reconnect' ) => @_ );
345             }
346            
347 4         11 ( $self, $con, $noclose ) = @_[ OBJECT, HEAP, ARG0 ];
348              
349             # XXX include backoff?
350              
351 4         21 $con->connected( 0 );
352            
353             # this would force fused connections to shut each other down
354             # so $noclose is passed during a reconnect call, post address resolve
355 4 50       45 $con->close( 1 ) unless ( $noclose );
356            
357             # $con->sf( undef );
358             # $con->wheel( undef );
359              
360 4 50       16 if ( $self->{opts}->{connect_time_out} ) {
361 0         0 $con->time_out_id(
362             $poe_kernel->alarm_set(
363             $con->event( 'remote_connect_timeout' ),
364             time() + $self->{opts}->{connect_time_out}
365             )
366             );
367             }
368              
369             $con->socket_factory(
370 4         19 RemoteAddress => $con->peer_ip,
371             RemotePort => $con->peer_port,
372             SuccessEvent => $con->event( 'remote_connect_success' ),
373             FailureEvent => $con->event( 'remote_connect_error' ),
374             );
375              
376 4         16 return $con;
377             }
378              
379             sub begin_soft_shutdown {
380 1     1 0 3 my $self = $_[ OBJECT ];
381            
382 1         6 $self->_log( v => 2, msg => $self->{name}." is shuting down (soft)");
383              
384 1         2 foreach ( values %{$self->{heaps}} ) {
  1         4  
385 1 50       3 next unless defined;
386 1         5 $self->process_plugins( [ 'remote_shutdown', $self, $_ ] );
387             }
388             }
389              
390             1;
391              
392             __END__