File Coverage

blib/lib/Sprocket/Connection.pm
Criterion Covered Total %
statement 86 185 46.4
branch 17 40 42.5
condition 5 9 55.5
subroutine 21 42 50.0
pod 23 31 74.1
total 152 307 49.5


line stmt bran cond sub pod time code
1             package Sprocket::Connection;
2              
3 5     5   28 use POE qw( Wheel::SocketFactory Wheel::ReadWrite );
  5         7  
  5         29  
4 5     5   106968 use Sprocket;
  5         16  
  5         68  
5 5     5   29 use Class::Accessor::Fast;
  5         12  
  5         55  
6 5     5   140 use Time::HiRes qw( time );
  5         12  
  5         45  
7 5     5   1074 use base qw( Class::Accessor::Fast );
  5         11  
  5         429  
8              
9 5     5   34 use Scalar::Util qw( weaken );
  5         11  
  5         622  
10              
11             use overload '""' => sub {
12 142     142   166 my $self = shift;
13 142         313 my $id = $self->ID();
14 142 100       975 return $id ? __PACKAGE__.'/'.$id : $self;
15 5     5   34 };
  5         20  
  5         75  
16              
17             __PACKAGE__->mk_accessors( qw(
18             sf
19             wheel
20             connected
21             close_on_flush
22             error
23             plugin
24             active_time
25             create_time
26             parent_id
27             event_manager
28             fused
29             peer_ip
30             peer_ips
31             peer_port
32             peer_addr
33             peer_hostname
34             local_ip
35             local_port
36             state
37             time_out
38             time_out_id
39             ID
40             ssl
41             x
42             socket
43             ) );
44              
45             sub new {
46 8     8 1 99 my $class = shift;
47 8         30 my $time = time();
48              
49 8   33     257 my $self = bless({
50             sf => undef,
51             wheel => undef,
52             connected => 0,
53             close_on_flush => 0,
54             plugin => undef,
55             active_time => $time,
56             create_time => $time,
57             parent_id => undef,
58             event_manager => 'eventman', # XXX
59             fused => undef,
60             peer_ip => undef,
61             peer_port => undef,
62             state => undef,
63             channels => {},
64             alarms => {},
65             clid => undef,
66             destroy_events => {},
67             peer_ips => [],
68             socket => undef,
69             error => undef,
70             time_out_id => undef, # use for client connections
71             ssl => undef,
72             x => {},
73             @_
74             }, ref $class || $class );
75              
76             # generate the connection ID
77 8         479 $self->ID( ( "$self" =~ m/\(0x([^\)]+)\)/o )[ 0 ] );
78              
79             # XXX keep this?
80 8 50 33     63 if ( $self->{peer_ip} && !@{$self->{peer_ips}} ) {
  8         37  
81 8         44 push( @{$self->{peer_ips}}, $self->{peer_ip} );
  8         23  
82             }
83              
84 8         25 return $self;
85             }
86              
87             sub event {
88 47     47 1 443 return $_[ 0 ]->ID.'/'.$_[ 1 ];
89             }
90              
91             sub socket_factory {
92 4     4 0 24 my $self = shift;
93              
94 4         47 $self->sf(
95             POE::Wheel::SocketFactory->new( @_ )
96             );
97              
98 4         2875 return;
99             }
100              
101             sub wheel_readwrite {
102 8     8 0 44 my $self = shift;
103              
104 8         61 $self->wheel(
105             POE::Wheel::ReadWrite->new( @_ )
106             );
107              
108 8         2315 return;
109             }
110              
111             sub filter {
112 12     12 1 289 my $self = shift;
113              
114 12 50       31 $self->wheel->set_filter( @_ ) if ( @_ );
115              
116 12         34 return $self->wheel->get_input_filter;
117             }
118              
119             sub filter_in {
120 0     0 1 0 my $self = shift;
121              
122 0 0       0 $self->wheel->set_input_filter( @_ ) if ( @_ );
123              
124 0         0 return $self->wheel->get_input_filter;
125             }
126              
127             sub filter_out {
128 0     0 1 0 my $self = shift;
129              
130 0 0       0 $self->wheel->set_output_filter( @_ ) if ( @_ );
131              
132 0         0 return $self->wheel->get_output_filter;
133             }
134              
135             *write = *send;
136              
137             sub send {
138 13     13 1 8296 my $self = shift;
139              
140 13 50       47 unless ( $self->connected ) {
141 0         0 $self->_log( v => 1, msg => "cannot send data. not connected, or disconnecting after flush" );
142 0         0 return;
143             }
144              
145 13 50       99 if ( my $wheel = $self->wheel ) {
146 13         81 $self->active();
147 13         105 return $wheel->put(@_);
148             } else {
149             # XXX does this happen
150 0 0       0 $self->_log( v => 1, msg => "cannot send data. where did my wheel go?! !EMAIL XANTUS! ".
151             ( $self->error ? $self->error : '' ) );
152             }
153             }
154              
155             sub set_time_out {
156 1     1 0 2 my $self = shift;
157              
158 1         3 $self->active();
159            
160 1         7 $self->time_out( shift );
161             }
162              
163             sub alarm_set {
164 0     0 1 0 my $self = shift;
165 0         0 my $event = $self->event( shift );
166            
167 0         0 $self->active();
168              
169 0         0 my $id = $poe_kernel->call( $self->parent_id => call_in_ses_context => alarm_set => $event => @_ );
170 0         0 $self->{alarms}->{ $id } = $event;
171              
172 0         0 return $id;
173             }
174              
175             sub alarm_adjust {
176 0     0 1 0 my $self = shift;
177            
178 0         0 $self->active();
179              
180 0         0 $poe_kernel->call( $self->parent_id => call_in_ses_context => alarm_adjust => @_ );
181             }
182              
183             sub alarm_remove {
184 0     0 1 0 my $self = shift;
185 0         0 my $id = shift;
186              
187 0         0 $self->active();
188            
189             # XXX exists
190 0         0 delete $self->{alarms}->{ $id };
191 0         0 $poe_kernel->call( $self->parent_id => call_in_ses_context => alarm_remove => $id => @_ );
192             }
193              
194             sub alarm_remove_all {
195 0     0 1 0 my $self = shift;
196            
197 0         0 $self->active();
198              
199 0         0 foreach ( keys %{$self->{alarms}} ) {
  0         0  
200 0         0 $self->_log( v => 4, "removed alarm $_ for client" );
201 0         0 $poe_kernel->call( $self->parent_id => call_in_ses_context => alarm_remove => $_ );
202             }
203              
204 0         0 return;
205             }
206              
207             sub delay_set {
208 0     0 1 0 my $self = shift;
209            
210 0         0 $self->active();
211              
212 0         0 $poe_kernel->call( $self->parent_id => call_in_ses_context => delay_set => $self->event( shift ) => @_ );
213             }
214              
215             sub delay_adjust {
216 0     0 1 0 my $self = shift;
217            
218 0         0 $self->active();
219              
220 0         0 $poe_kernel->call( $self->parent_id => call_in_ses_context => delay_adjust => @_ );
221             }
222              
223             sub yield {
224 0     0 1 0 my $self = shift;
225            
226 0         0 $self->active();
227              
228 0         0 $poe_kernel->post( $self->parent_id => $self->event( shift ) => @_ );
229             }
230              
231             sub call {
232 0     0 1 0 my $self = shift;
233            
234 0         0 $self->active();
235              
236 0         0 $poe_kernel->call( $self->parent_id => $self->event( shift ) => @_ );
237             }
238              
239             sub post {
240 0     0 1 0 my $self = shift;
241            
242 0         0 $self->active();
243            
244             # XXX instead?
245             #poe_kernel->call( $self->parent_id => call_in_ses_context => post => @_ );
246 0         0 $poe_kernel->post( @_ );
247             }
248              
249             sub fuse {
250 0     0 0 0 my ( $self, $con ) = @_;
251              
252 0         0 $self->active();
253            
254 0         0 $self->fused( $con );
255 0         0 weaken( $self->{fused} );
256              
257 0         0 $con->fused( $self );
258 0         0 weaken( $con->{fused} );
259              
260             # TODO some code to fuse the socket or other method
261 0         0 return;
262             }
263              
264              
265             sub accept {
266 8     8 1 14 my $self = shift;
267            
268 8         21 $self->active();
269              
270 8         72 $self->connected( 1 );
271              
272 8         82 $poe_kernel->call( $self->parent_id => $self->event( 'accept' ) => @_ );
273             }
274              
275             sub reject {
276 0     0 1 0 my $self = shift;
277            
278 0         0 $self->connected( 0 );
279            
280 0         0 $poe_kernel->call( $self->parent_id => $self->event( 'reject' ) => @_ );
281             }
282              
283             sub close {
284 18     18 1 2645 my ( $self, $force ) = @_;
285              
286             # XXX
287 18         84 $self->active();
288              
289 18 100       125 if ( my $wheel = $self->wheel ) {
290 12         100 my $out = $wheel->get_driver_out_octets;
291              
292 12 100 100     92 if ( !$force && $out ) {
293 1         6 $self->close_on_flush( 1 );
294 1         7 return;
295             } else {
296 11         38 $wheel->shutdown_input();
297 11         1018 $wheel->shutdown_output();
298             }
299             }
300 17 100       1190 $self->wheel( undef ) if ( $force );
301              
302 17         110 $self->time_out( undef );
303            
304             # kill the socket factory if any
305 17         105 $self->sf( undef );
306            
307             # socket is only here during the accept phase
308 17 50       122 if ( my $socket = $self->socket ) {
309 0         0 eval {
310 0         0 close( $socket );
311             };
312             }
313              
314             # fused sockets closes its peer
315 17 50       200 if ( my $con = $self->fused() ) {
316             # avoid a loop by removing the fusion first
317 0         0 $con->fused( undef );
318 0         0 $self->fused( undef );
319             # then close
320 0         0 $con->close( $force );
321             }
322              
323 17 100       107 if ( $self->connected ) {
324 8         53 $self->connected( 0 );
325 8         47 $poe_kernel->call( $self->parent_id => cleanup => $self->ID );
326             }
327              
328 17         153 return;
329             }
330              
331             sub reconnect {
332 0     0 1 0 my $self = shift;
333            
334 0         0 $self->active();
335            
336 0         0 $poe_kernel->call( $self->parent_id => $self->event( 'reconnect' ) => @_ );
337             }
338              
339             sub get_driver_out_octets {
340 0     0 1 0 my $self = shift;
341              
342 0 0       0 if ( my $wheel = $self->wheel ) {
343 0         0 $self->active();
344 0         0 return $wheel->get_driver_out_octets();
345             }
346              
347 0         0 return 0;
348             }
349              
350             sub active {
351 43     43 1 190 shift->active_time( time() );
352             }
353              
354             sub callback {
355 1     1 1 65 my ($self, $event, @etc) = @_;
356            
357 1         3 $self->active();
358              
359 1         6 return $sprocket->callback( $self->parent_id => $self->event( $event ) => @etc );
360             }
361              
362             sub postback {
363 2     2 1 529 my ($self, $event, @etc) = @_;
364              
365 2         6 $self->active();
366              
367 2         13 return $sprocket->postback( $self->parent_id => $self->event( $event ) => @etc );
368             }
369              
370             sub _log {
371 0     0   0 my $self = shift;
372              
373 0         0 $poe_kernel->call( $self->parent_id => _log => ( l => 1, @_ ) );
374             }
375              
376             # XXX not used yet
377             sub aio_sendfile {
378 0     0 0 0 my ( $self, $in_fh, $in_offset, $length, $callback ) = @_;
379              
380 0         0 $self->active();
381            
382 0         0 $self->wheel->pause_output();
383 0         0 $self->wheel->pause_input();
384              
385             return aio_sendfile( $self->socket, $in_fh, $in_offset, $length, sub {
386 0     0   0 $self->wheel->resume_output();
387 0         0 $self->wheel->resume_input();
388 0         0 return $callback->( @_ );
389 0         0 } );
390             }
391              
392             # Danga::Socket type compat
393             # ------------------------
394             # Do not document
395              
396 0     0 0 0 sub tcp_cork {
397             # XXX is this the same as watch_read(0)?
398             }
399              
400             sub watch_write {
401 0     0 0 0 my ( $self, $watch ) = @_;
402              
403 0         0 $self->active();
404              
405 0 0       0 if ( my $wheel = $self->wheel ) {
406 0 0       0 if ( $watch ) {
407 0         0 $wheel->resume_output();
408             } else {
409 0         0 $wheel->pause_output();
410             }
411             } # XXX else
412              
413 0         0 return;
414             }
415              
416             sub watch_read {
417 0     0 0 0 my ( $self, $watch ) = @_;
418              
419 0         0 $self->active();
420              
421 0 0       0 if ( my $wheel = $self->wheel ) {
422 0 0       0 if ( $watch ) {
423 0         0 $wheel->resume_input();
424             } else {
425 0         0 $wheel->pause_input();
426             }
427             } # XXX else
428              
429 0         0 return;
430             }
431              
432             # ------------------------
433              
434             sub DESTROY {
435 8     8   14 my $self = shift;
436              
437             # XXX this will change
438 8 50       12 if ( keys %{$self->{destroy_events}} ) {
  8         35  
439 0         0 foreach my $type ( keys %{$self->{destroy_events}} ) {
  0         0  
440 0         0 $poe_kernel->post( @{$self->{destroy_events}->{$type}} );
  0         0  
441             }
442             }
443              
444             # remove alarms for this connection
445 8         12 foreach ( keys %{$self->{alarms}} ) {
  8         28  
446 0         0 $self->_log( v => 4, "removed alarm $_ for client" );
447 0         0 $poe_kernel->alarm_remove( $_ );
448             }
449            
450 8         81 return;
451             }
452              
453             1;
454              
455             __END__