File Coverage

blib/lib/Sprocket/Base.pm
Criterion Covered Total %
statement 145 248 58.4
branch 46 116 39.6
condition 13 30 43.3
subroutine 21 36 58.3
pod 1 23 4.3
total 226 453 49.8


line stmt bran cond sub pod time code
1             package Sprocket::Base;
2              
3 5     5   25 use strict;
  5         10  
  5         240  
4 5     5   25 use warnings;
  5         6  
  5         159  
5              
6 5     5   24 use Carp qw( croak );
  5         9  
  5         287  
7 5     5   25 use Sprocket qw( Common Connection Session AIO );
  5         7  
  5         223  
8 5     5   25 use POE;
  5         9  
  5         26  
9              
10 5     5   1102 use Class::Accessor::Fast;
  5         10  
  5         37  
11 5     5   125 use base qw(Class::Accessor::Fast);
  5         10  
  5         927  
12              
13             our $VERSION = $Sprocket::VERSION;
14              
15             our $sprocket_aio;
16             our $sprocket;
17             our $basic_logger = 'Sprocket::Logger::Basic';
18              
19             __PACKAGE__->mk_accessors( qw(
20             name
21             uuid
22             _uuid
23             shutting_down
24             opts
25             is_forked
26             is_child
27             connections
28             _logger
29             session_id
30             ) );
31              
32             BEGIN {
33 5     5   321 eval "use BSD::Resource";
  5     5   2147  
  0         0  
  0         0  
34 5 50       238 eval 'sub HAS_BSD_RESOURCE() { '.( $@ ? 0 : 1 ).' }';
35              
36             # can't use $basic_logger here
37 5     5   273 eval "use Sprocket::Logger::Basic";
  5         3008  
  5         62  
  5         89  
38 5 50       212 eval 'sub HAS_BASIC_LOGGER() { '.( $@ ? 0 : 1 ).' }';
39            
40 5         50 $sprocket->register_hook( [qw(
41             sprocket.connection.create
42             sprocket.connection.destroy
43             sprocket.plugin.add
44             sprocket.plugin.remove
45             )] );
46             }
47              
48              
49             # events sent to process_plugins
50             sub EVENT_NAME() { 0 }
51             sub SERVER() { 1 }
52             sub CONNECTION() { 2 }
53              
54             our @base_states = qw(
55             _start
56             _default
57             signals
58             shutdown
59             begin_soft_shutdown
60             _log
61             events_received
62             events_ready
63             exception
64             process_plugins
65             sig_child
66             time_out_check
67             cleanup
68             call_in_ses_context
69             );
70              
71             sub spawn {
72 10     10 0 68 my ( $class, $self, @states ) = @_;
73            
74             # a special session that uses a connection hash
75 10         274 Sprocket::Session->create(
76             # options => { trace => 1 },
77             object_states => [
78             $self => [ @base_states, @states ]
79             ],
80             );
81              
82 10         1579 return $self;
83             }
84              
85             sub new {
86 10     10 1 22 my $class = shift;
87 10 50       55 croak "$class requires an even number of parameters" if @_ % 2;
88 10         41 my %opts = &adjust_params;
89            
90 10         54 my $uuid = new_uuid();
91            
92 10 50 33     553215 $opts{alias} = "sprocket/$uuid" unless( defined( $opts{alias} ) and length( $opts{alias} ) );
93 10 50       54 $opts{name} = "sprocket/$uuid" unless( defined( $opts{name} ) );
94 10 50       47 $opts{time_out} = defined( $opts{time_out} ) ? $opts{time_out} : 30;
95 10 50       38 $opts{log_level} = 4 unless( defined( $opts{log_level} ) );
96            
97 10         29 my $logger = delete $opts{logger};
98 10 50 33     63 if ( defined( $logger ) && not UNIVERSAL::can( $logger, 'put' ) ) {
99 0         0 warn "invalid logger: $logger (no put method), falling back to $basic_logger";
100 0         0 undef $logger;
101             }
102            
103 10 50       35 unless ( defined $logger ) {
104 10         20 if ( !HAS_BASIC_LOGGER ) {
105             warn "$basic_logger is unavailable. Logging disabled!";
106             undef $logger;
107             } else {
108 10         237 $logger = "$basic_logger"->new(
109             parent_alias => $opts{alias},
110             log_level => $opts{log_level},
111             );
112             }
113             }
114            
115 10   33     226 my $self = bless( {
116             name => $opts{name},
117             opts => \%opts,
118             heaps => {},
119             connections => 0,
120             plugins => {},
121             plugin_pri => [],
122             time_out_check => 10, # time_out checker
123             type => delete $opts{_type},
124             uuid => $uuid,
125             is_forked => 0,
126             _logger => $logger,
127             }, ref $class || $class );
128              
129 10         123 $self->{_uuid} = gen_uuid( $self );
130              
131 10 50       246 $self->check_params if ( $self->can( 'check_params' ) );
132              
133 10 50       44 if ( $opts{max_connections} ) {
134 0         0 if ( HAS_BSD_RESOURCE ) {
135             my $ret = setrlimit( RLIMIT_NOFILE, $opts{max_connections}, $opts{max_connections} );
136             unless ( defined $ret && $ret ) {
137             if ( $> == 0 ) {
138             $self->_log(v => 1, msg => 'Unable to set max connections limit');
139             } else {
140             $self->_log(v => 1, msg => 'Need to be root to increase max connections');
141             }
142             }
143             } else {
144 0         0 $self->_log(v => 1, msg => 'Need BSD::Resource installed to increase max connections');
145             }
146             }
147              
148 10         67 $sprocket->add_component( $self );
149            
150 10         113 return $self;
151             }
152              
153             sub _start {
154 10     10   40 my ( $self, $kernel, $session ) = @_[ OBJECT, KERNEL, SESSION ];
155              
156 10         58 $self->session_id( $session->ID );
157            
158 10 50       189 $session->option( @{$self->{opts}->{session_options}} )
  0         0  
159             if ( $self->{opts}->{session_options} );
160            
161 10 50       109 $kernel->alias_set( $self->{opts}->{alias} )
162             if ( $self->{opts}->{alias} );
163              
164              
165 10 50       483 if ( $self->{opts}->{plugins} ) {
166 10         19 foreach my $t ( @{ $self->opts->{plugins} } ) {
  10         161  
167             # convert CamelCase to camel_case
168 10         91 $t = adjust_params($t);
169 10   50     162 $self->add_plugin(
170             $t->{plugin},
171             $t->{priority} || 0
172             );
173             }
174             }
175            
176 10 50       58 if ( my $ev = delete $self->opts->{event_manager} ) {
177 0         0 eval "use $ev->{module}";
178 0 0       0 if ( $@ ) {
179 0         0 $self->_log(v => 1, msg => "Error loading $ev->{module} : $@");
180 0         0 $self->shutdown_all;
181 0         0 return;
182             }
183 0 0 0     0 $ev->{options} = []
184             unless ( $ev->{options} && ref( $ev->{options} ) eq 'ARRAY' );
185            
186 0         0 $self->{event_manager} = "$ev->{module}"->new(
187 0         0 @{$ev->{options}},
188             parent_id => $self->session_id
189             );
190             }
191              
192 10 50       158 $self->{aio} = defined( $sprocket_aio ) ? 1 : 0;
193              
194 10 50       120 $self->{time_out_id} = $kernel->alarm_set( time_out_check => time() + $self->{time_out_check} )
195             if ( $self->{time_out_check} );
196              
197             # TODO recheck and document
198 10 50       1031 $kernel->sig( DIE => 'exception' )
199             if ( $self->{opts}->{use_exception_handler} );
200              
201 10 50       35 $kernel->sig( TSTP => 'signals' )
202             unless( $self->opts->{no_tstp} );
203 10         603 $kernel->sig( INT => 'signals' );
204              
205 10         404 $kernel->call( $session => '_startup' );
206            
207 10         107 return;
208             }
209              
210             sub _default {
211 3     3   7 my ( $self, $con, $cmd ) = @_[ OBJECT, HEAP, ARG0 ];
212            
213 3 50       10 return if ( $cmd =~ m/^_(child|parent)/ );
214              
215 3 50       32 return $self->process_plugins( [ $cmd, $self, $con, @_[ ARG1 .. $#_ ] ] )
216             if ( UNIVERSAL::can( $con, 'ID' ) );
217            
218 0         0 $self->_log(v => 1, msg => "_default called, no handler for event $cmd"
219             ." [$con] (the connection for this event may be gone)");
220            
221 0         0 return;
222             }
223              
224             sub signals {
225 0     0 0 0 my ( $self, $signal_name ) = @_[ OBJECT, ARG0 ];
226              
227 0         0 $self->_log(v => 1, msg => "Client caught SIG$signal_name");
228              
229 0 0       0 if ( $signal_name eq 'INT' ) {
    0          
230             # TODO do something here
231             # to stop ctrl-c / INT
232             #$_[KERNEL]->sig_handled();
233             } elsif ( $signal_name eq 'TSTP' ) {
234 0         0 local $SIG{TSTP} = 'DEFAULT';
235 0         0 kill( TSTP => $$ );
236 0         0 $_[ KERNEL ]->sig_handled();
237             }
238              
239 0         0 return 0;
240             }
241              
242             sub sig_child {
243 0     0 0 0 $_[KERNEL]->sig_handled();
244             }
245              
246             sub new_connection {
247 8     8 0 19 my $self = shift;
248            
249 8         31 my $con = Sprocket::Connection->new(
250             parent_id => $self->session_id,
251             @_
252             );
253            
254             # TODO ugh, move this stuff out of here
255 8 50       26 $con->event_manager( $self->{event_manager}->{alias} )
256             if ( $self->{event_manager} );
257              
258 8         36 $self->{heaps}->{ $con->ID } = $con;
259              
260 8         49 my $len = $self->connections( scalar( keys %{$self->{heaps}} ) );
  8         63  
261              
262 8         83 $sprocket->broadcast( 'sprocket.connection.create', {
263             source => $self,
264             target => $con,
265             } );
266            
267 8         32 return $con;
268             }
269              
270             # gets a connection obj from any component
271             sub get_connection {
272 0     0 0 0 my ( $self, $id, $norec ) = @_;
273            
274 0 0       0 if ( my $con = $self->{heaps}->{ $id } ) {
275 0         0 return $con;
276             }
277            
278 0 0       0 return undef if ( $norec );
279              
280 0         0 return $sprocket->get_connection( $id );
281             }
282              
283             sub _log {
284 24 50   24   212 my ( $self, %o ) = ref $_[ KERNEL ] ? @_[ OBJECT, ARG0 .. $#_ ] : @_;
285 24 50       164 return unless defined $self->_logger;
286 24         175 $self->_logger->put( $self, \%o );
287             }
288              
289             sub cleanup {
290 8     8 0 18 my ( $self, $con_id ) = @_[ OBJECT, ARG0 ];
291              
292 8 50       38 if ( my $con = $self->{heaps}->{ $con_id } ) {
293 8 100       37 $self->process_plugins( [ $self->{type}.'_disconnected', $self, $con, 0 ] )
294             unless ( defined $con->error );
295 8         63 $self->cleanup_connection( $con );
296             }
297             }
298              
299             sub cleanup_connection {
300 15     15 0 21 my ( $self, $con ) = @_;
301              
302 15 50       38 return unless( $con );
303            
304 15         85 $sprocket->broadcast( 'sprocket.connection.destroy', {
305             source => $self,
306             target => $con,
307             } );
308            
309 15         59 delete $self->{heaps}->{ $con->ID };
310            
311 15         59 $self->connections( scalar( keys %{$self->{heaps}} ) );
  15         56  
312              
313 15 100 66     106 $self->shutdown()
314             if ( $self->shutting_down && $self->connections <= 0 );
315            
316 15         104 return;
317             }
318              
319             sub shutdown_all {
320 0     0 0 0 shift;
321 0         0 $sprocket->shutdown_all( @_ );
322             }
323              
324             sub shutdown {
325 28 100 66 28 0 2780 unless ( $_[KERNEL] && ref $_[KERNEL] ) {
326 14         54 return $poe_kernel->call( shift->session_id => shutdown => @_ );
327             }
328            
329 14         32 my ( $self, $kernel, $type ) = @_[ OBJECT, KERNEL, ARG0 ];
330              
331 14 100       1105 if ( lc( $type ) eq 'soft' ) {
332 2         10 $self->shutting_down( $type );
333 2         16 $kernel->call( $_[SESSION] => 'begin_soft_shutdown' );
334 2         14 return;
335             }
336              
337 12         54 foreach ( values %{$self->{heaps}} ) {
  12         48  
338 7         37 $_->close( 1 ); # force
339 7         367 $self->cleanup_connection( $_ );
340             }
341 12         37 $self->{heaps} = {};
342              
343             # XXX proper?
344 12         55 $kernel->sig( INT => undef );
345 12         470 $kernel->sig( TSTP => undef );
346 12         391 $kernel->alarm_remove_all();
347 12 50       914 $kernel->alias_remove( $self->{opts}->{alias} )
348             if ( $self->{opts}->{alias} );
349              
350             # XXX remove plugins one by one?
351 12         349 delete @{$self}{qw( wheel sf )};
  12         59  
352              
353             # if this is the last component, sprocket will shutdown aio
354 12         1047 $sprocket->remove_component( $self );
355              
356 12         45 return;
357             }
358              
359             sub begin_soft_shutdown {
360 0     0 0 0 my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
361            
362 0         0 $self->_log(v => 1, msg => $self->{name}." subclass didn't define a begin_soft_shutdown event. shutting down hard!");
363              
364 0         0 $self->shutdown();
365              
366 0         0 return;
367             }
368              
369             sub events_received {
370 0     0 0 0 my $self = $_[ OBJECT ];
371 0         0 $self->process_plugins( [ 'events_received', $self, @_[ HEAP, ARG0 .. $#_ ] ] );
372             }
373              
374             sub events_ready {
375 0     0 0 0 my $self = $_[ OBJECT ];
376 0         0 $self->process_plugins( [ 'events_ready', $self, @_[ HEAP, ARG0 .. $#_ ] ] );
377             }
378              
379             sub exception {
380 0     0 0 0 my ( $kernel, $self, $con, $sig, $error ) = @_[ KERNEL, OBJECT, HEAP, ARG0, ARG1 ];
381              
382             # TODO check exceptions with new POE
383 0         0 $self->_log(v => 1, l => 1, msg => "plugin exception handled: ($sig) : "
384 0         0 .join(' | ',map { $_.':'.$error->{$_} } keys %$error ) );
385            
386 0 0       0 $con->close( 1 ) if ( UNIVERSAL::can( $con, 'close' ) );
387 0         0 $kernel->sig_handled();
388             }
389              
390             sub time_out_check {
391 0     0 0 0 my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
392              
393 0         0 my $time = time();
394 0         0 $self->{time_out_id} = $kernel->alarm_set( time_out_check => $time + $self->{time_out_check} );
395              
396 0         0 foreach my $con ( values %{$self->{heaps}} ) {
  0         0  
397 0 0       0 next unless ( $con );
398 0 0       0 if ( my $timeout = $con->time_out ) {
399 0 0       0 $self->process_plugins( [ $self->{type}.'_time_out', $self, $con, $time ] )
400             if ( ( $con->active_time + $timeout ) < $time );
401             }
402             }
403             }
404              
405             sub add_plugin {
406 10     10 0 21 my $self = shift;
407            
408 10         30 my $t = $self->{plugins};
409            
410 10         21 my ( $plugin, $pri ) = @_;
411 10         16 my $uuid;
412            
413 10 50       89 if ( $plugin->can( 'uuid' ) ) {
414 10         48 $uuid = $plugin->uuid;
415             } else {
416 0         0 warn "WARNING, plugin $plugin doesn't have a uuid,"
417             ."contact the author and have them read the Sprocket::Plugin docs";
418 0         0 $uuid = "bad-plugin-$plugin";
419             }
420            
421 10 50       86 warn "WARNING : Overwriting existing plugin '$uuid' (You have two plugins with the same id!!)"
422             if ( exists( $t->{ $uuid } ) );
423              
424 10   50     62 $pri ||= 0;
425              
426 10         35 my $found = 0;
427 10         41 foreach ( values %$t ) {
428 0 0       0 $found++ if ( $_->{priority} == $pri );
429             }
430            
431 10 50       34 warn "WARNING: You have defined more than one plugin with the same"
432             ." priority, was this intended? plugin: $plugin uuid: $uuid pri: $pri"
433             if ( $found );
434              
435 10         59 $t->{ $uuid } = {
436             plugin => $plugin,
437             priority => $pri,
438             };
439            
440 10         40 $plugin->parent_id( $self->session_id );
441              
442 10         182 $sprocket->broadcast( 'sprocket.plugin.add', {
443             source => $self,
444             target => $plugin,
445             } );
446            
447 10         80 $plugin->handle_event( plugin_start_aio => $self => $pri );
448 10         31 $plugin->handle_event( add_plugin => $self => $pri );
449            
450             # recalc plugin order
451 10         32 @{ $self->{plugin_pri} } = sort {
  0         0  
452 10         39 $t->{ $a }->{priority} <=> $t->{ $b }->{priority}
453             } keys %$t;
454              
455 10         42 return 1;
456             }
457              
458             sub remove_plugin {
459 0     0 0 0 my $self = shift;
460 0         0 my $uuid = shift;
461            
462             # TODO remove by name or obj
463            
464 0         0 my $t = $self->{plugins};
465            
466 0         0 my $plugin = delete $t->{ $uuid };
467 0 0       0 return 0 unless ( $plugin );
468            
469 0         0 $sprocket->broadcast( 'sprocket.plugin.remove', {
470             source => $self,
471             target => $plugin,
472             } );
473            
474 0         0 $plugin->{plugin}->handle_event( remove_plugin => $plugin->{priority} );
475            
476             # recalc plugin_pri
477 0         0 @{ $self->{plugin_pri} } = sort {
  0         0  
478 0         0 $t->{ $a }->{priority} <=> $t->{ $b }->{priority}
479             } keys %$t;
480            
481 0         0 return 1;
482             }
483              
484             sub process_plugins {
485 41 50   41 0 158 my ( $self, $args, $i ) = $_[ KERNEL ] ? @_[ OBJECT, ARG0, ARG1 ] : @_;
486              
487 41 50       48 return unless ( @{ $self->{plugin_pri} } );
  41         112  
488            
489 41         55 my $con = $args->[ CONNECTION ];
490 41 100       239 $con->state( $args->[ EVENT_NAME ] )
491             if ( UNIVERSAL::can( $con, 'state' ) );
492            
493 41 100 100     358 if ( UNIVERSAL::can( $con, 'plugin' ) && ( my $t = $con->plugin ) ) {
494 24         236 return $self->{plugins}->{ $t }->{plugin}->handle_event( @$args );
495             } else {
496 17   50     160 $i ||= 0;
497 17 50       17 if ( $#{ $self->{plugin_pri} } >= $i ) {
  17         61  
498             return if ( $self->{plugins}->{
499 17 50       95 $self->{plugin_pri}->[ $i ]
500             }->{plugin}->handle_event( @$args ) );
501             }
502 0           $i++;
503             # avoid a post
504 0 0         return if ( $#{ $self->{plugin_pri} } < $i );
  0            
505             }
506            
507             # XXX call?
508             #$poe_kernel->call( $self->session_id => process_plugins => $args => $i );
509 0           $poe_kernel->yield( process_plugins => $args => $i );
510             }
511              
512             sub get_plugin {
513 0     0 0   my ( $self, $uuid ) = @_;
514              
515 0 0         return $self->{plugins}->{ $uuid }->{plugin}
516             if ( exists( $self->{plugins}->{ $uuid } ) );
517              
518             # fall back to finding the plugin globally
519 0           return $sprocket->get_plugin( $uuid );
520             }
521              
522             sub resolve_plugin_uuid {
523 0     0 0   my ( $self, $name ) = @_;
524              
525 0           my $plugin = grep { $name eq $_->{plugin}->name } values %{ $self->{plugins} };
  0            
  0            
526              
527 0 0         return $plugin ? $plugin->uuid : undef;
528             }
529              
530             sub forward_plugin_by_uuid {
531 0     0 0   my $self = shift;
532 0           my $uuid = shift;
533              
534 0 0         unless( exists ( $self->{plugins}->{ $uuid } ) ) {
535 0           $self->_log( v => 4, msg => 'plugin not loaded! plugin uuid: '.$uuid );
536 0           return 0;
537             }
538            
539             # XXX
540 0           my $con = $self->{heap};
541 0           $con->plugin( $uuid );
542              
543 0           return $self->process_plugins( [ $con->state, $self, $con, @_ ] );
544             }
545            
546             sub forward_plugin {
547 0     0 0   my $self = shift;
548 0           my $name = shift;
549              
550 0           my ($plugin) = grep { $name eq $_->{plugin}->name } values %{ $self->{plugins} };
  0            
  0            
551            
552 0 0         unless( $plugin ) {
553 0           $self->_log( v => 4, msg => 'plugin not loaded! plugin: '.$name );
554 0           return 0;
555             }
556              
557             # XXX
558 0           my $con = $self->{heap};
559 0           $con->plugin( $plugin->{plugin}->uuid );
560              
561 0           return $self->process_plugins( [ $con->state, $self, $con, @_ ] );
562             }
563              
564             # helper used by Sprocket::Connection
565             sub call_in_ses_context {
566             # must call in this in our session's context
567 0 0 0 0 0   unless ( $_[KERNEL] && ref $_[KERNEL] ) {
568 0           return $poe_kernel->call( shift->session_id => @_ );
569             }
570            
571 0           my $event = $_[ ARG0 ];
572 0           return $_[ KERNEL ]->$event( @_[ ARG1 .. $#_ ] );
573             }
574              
575             1;