File Coverage

blib/lib/POE/Component/IKC/Server.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package POE::Component::IKC::Server;
2              
3             ############################################################
4             # $Id: Server.pm 1247 2014-07-07 09:06:34Z fil $
5             # Based on refserver.perl and preforkedserver.perl
6             # Contributed by Artur Bergman
7             # Revised for 0.06 by Rocco Caputo
8             # Turned into a module by Philp Gwyn
9             #
10             # Copyright 1999-2014 Philip Gwyn. All rights reserved.
11             # This program is free software; you can redistribute it and/or modify
12             # it under the same terms as Perl itself.
13             #
14             # Contributed portions of IKC may be copyright by their respective
15             # contributors.
16              
17 6     6   96648 use strict;
  6         14  
  6         226  
18 6     6   5482 use Socket;
  6         22912  
  6         3963  
19 6     6   58 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK);
  6         9  
  6         716  
20 6     6   40 use Carp;
  6         8  
  6         327  
21 6     6   11478 use POE qw(Wheel::ListenAccept Wheel::SocketFactory);
  0            
  0            
22             use POE::Component::IKC::Channel;
23             use POE::Component::IKC::Responder;
24             use POE::Component::IKC::Util;
25             use POSIX qw(:errno_h);
26             use POSIX qw(ECHILD EAGAIN WNOHANG);
27              
28              
29             require Exporter;
30              
31             @ISA = qw(Exporter);
32             @EXPORT = qw(create_ikc_server);
33             $VERSION = '0.2402';
34              
35             sub DEBUG { 0 }
36             sub DEBUG_USR2 { 1 }
37             BEGIN {
38             # http://support.microsoft.com/support/kb/articles/Q150/5/37.asp
39             eval '*WSAEAFNOSUPPORT = sub { 10047 };';
40             if( $^O eq 'MSWin32' and not eval "EADDRINUSE" ) {
41             eval '*EADDRINUSE = sub { 10048 };';
42             }
43             }
44              
45              
46             ###############################################################################
47             #----------------------------------------------------
48             # This is just a convenient way to create servers. To be useful in
49             # multi-server situations, it probably should accept a bind address
50             # and port.
51             sub spawn
52             {
53             my($package, %params)=@_;
54             $params{package} ||= $package;
55              
56             unless($params{unix}) {
57             $params{ip}||='0.0.0.0'; # INET_ANY
58             $params{port} = 603 # POE! (almost :)
59             unless defined $params{port};
60             }
61             $params{protocol} ||= 'IKC0';
62              
63             # Make sure one is available
64             POE::Component::IKC::Responder->spawn();
65             my $session = POE::Session->create(
66             package_states => [
67             $params{package} =>
68             [qw(
69             _start _stop error _child
70             accept fork retry waste_time
71             babysit rogues shutdown
72             sig_CHLD sig_INT sig_USR2 sig_USR1 sig_TERM
73             )],
74             ],
75             args=>[\%params],
76             );
77             my $heap = $session->get_heap;
78             return $heap->{wheel_port};
79             }
80              
81             sub create_ikc_server
82             {
83             my( %params )=@_;
84             $params{package} ||= __PACKAGE__;
85             carp "create_ikc_server is DEPRECATED. Please use $params{package}->spawn instead";
86             return $params{package}->spawn( %params );
87             }
88              
89             #----------------------------------------------------
90             sub _select_define
91             {
92             my($heap, $on)=@_;
93             return unless $heap->{wheel};
94             $on||=0;
95              
96             DEBUG and
97             warn "_select_define (on=$on)";
98              
99             if($on) {
100             $heap->{wheel}->resume_accept
101             }
102             else {
103             $heap->{wheel}->pause_accept
104             }
105             return;
106             }
107              
108             #----------------------------------------------------
109             # Drop the wheel
110             sub _delete_wheel
111             {
112             my( $heap ) = @_;
113             return unless $heap->{wheel};
114             my $w = delete $heap->{wheel};
115             $w->DESTROY;
116             return;
117             }
118              
119             #----------------------------------------------------
120             #
121             sub _concurrency_up
122             {
123             my( $heap ) = @_;
124             $heap->{concur_connections}++;
125             DEBUG and
126             warn "$$: $heap->{concur_connections} concurrent connections (max $heap->{concurrency})";
127             return unless $heap->{concurrency} > 0;
128             if( $heap->{concur_connections} >= $heap->{concurrency} ) {
129             DEBUG and
130             warn "$$: Blocking more concurrency";
131             $heap->{blocked} = 1;
132             _select_define( $heap, 0 );
133             }
134             }
135              
136              
137             sub _concurrency_down
138             {
139             my( $heap ) = @_;
140             $heap->{concur_connections}--;
141             DEBUG and
142             warn "$$: $heap->{concur_connections} concurrent connections";
143             return unless $heap->{concurrency} > 0;
144             if( $heap->{concur_connections} < $heap->{concurrency}
145             and delete $heap->{blocked} ) {
146             DEBUG and
147             warn "$$: Unblocking concurrency";
148             _select_define( $heap, 1 );
149             }
150             }
151              
152             #----------------------------------------------------
153             # Delete all delays
154             sub _delete_delays
155             {
156             $poe_kernel->delay('rogues');
157             $poe_kernel->delay('waste_time');
158             $poe_kernel->delay('babysit');
159             $poe_kernel->delay( 'retry' );
160              
161             return;
162             }
163              
164              
165             #----------------------------------------------------
166             # Accept POE's standard _start event, and set up the listening socket
167             # factory.
168              
169             sub _start
170             {
171             my($heap, $params, $kernel) = @_[HEAP, ARG0, KERNEL];
172              
173             my $ret;
174              
175             # This shouldn't be necessary
176             POE::Component::IKC::Responder->spawn;
177              
178             # monitor for shutdown events.
179             # this is the best way to get IKC::Responder to tell us about the
180             # shutdown
181             $kernel->post(IKC=>'monitor', '*', {shutdown=>'shutdown'});
182              
183             my $alias='unknown';
184             my %wheel_p=(
185             Reuse => 'yes', # and allow immediate reuse of the port
186             SuccessEvent => 'accept', # generating this event on connection
187             FailureEvent => 'error' # generating this event on error
188             );
189             if($params->{unix}) {
190             $alias="unix:$params->{unix}";
191             $wheel_p{SocketDomain}=AF_UNIX;
192             $wheel_p{BindAddress}=$params->{unix};
193             $heap->{unix}=$params->{unix};
194             unlink $heap->{unix}; # blindly do this ?
195             }
196             else {
197             $alias="$params->{ip}:$params->{port}";
198             $wheel_p{BindPort} = $params->{port};
199             $wheel_p{BindAddress}= $params->{ip};
200             }
201             DEBUG && warn "$$: Server starting $alias.\n";
202              
203              
204             $heap->{name}=$params->{name};
205             $heap->{kernel_aliases}=$params->{aliases};
206             $heap->{concurrency}=$params->{concurrency} || 0;
207             $heap->{protocol}=$params->{protocol};
208             $heap->{on_error}=$params->{on_error} if $params->{on_error};
209             # create a socket factory
210             $heap->{wheel} = new POE::Wheel::SocketFactory (%wheel_p);
211             if( $heap->{wheel} and not $params->{unix} and not $params->{port} ) {
212             $heap->{wheel_port} =
213             $ret = ( sockaddr_in( $heap->{wheel}->getsockname() ) )[0];
214             $alias="$params->{ip}:$ret";
215             DEBUG &&
216             warn "$$: Server listening on $alias.\n";
217             }
218             $heap->{wheel_address}=$alias;
219              
220             $heap->{connections} = 0;
221              
222             # +GC
223             $kernel->alias_set("IKC Server $alias");
224              
225             # set up local names for kernel
226             my @names=($heap->{name});
227             if($heap->{kernel_aliases}) {
228             if(ref $heap->{kernel_aliases}) {
229             push @names, @{$heap->{kernel_aliases}};
230             } else {
231             push @names, $heap->{kernel_aliases};
232             }
233             }
234              
235             $kernel->post(IKC=>'register_local', \@names);
236              
237             # pre-load the default serialisers
238             foreach my $ft ( qw(Storable FreezeThaw POE::Component::IKC::Freezer) ) {
239             eval { local $SIG{__WARN__} = sub {1};
240             local $SIG{__DIE__} = 'DEFAULT';
241             POE::Filter::Reference->new( $ft );
242             };
243             warn "$ft: $@" if DEBUG and $@;
244             }
245              
246             return $ret unless $params->{processes};
247              
248             # Delete the SocketFactory's read select in the parent
249             # We don't ever want the parent to accept a connection
250             # Children put the state back in place after the fork
251             _select_define($heap, 0);
252              
253             $kernel->sig(CHLD => 'sig_CHLD');
254             $kernel->sig(TERM => 'sig_TERM');
255             $kernel->sig(INT => 'sig_INT');
256             DEBUG_USR2 and $kernel->sig('USR2', 'sig_USR2');
257             DEBUG_USR2 and $kernel->sig('USR1', 'sig_USR1');
258              
259             # keep track of children
260             $heap->{children} = {};
261             $heap->{'failed forks'} = 0;
262             $heap->{verbose}=$params->{verbose}||0;
263             $heap->{"max connections"}=$params->{connections}||1;
264              
265             $heap->{'is a child'} = 0; # change behavior for children
266             my $children=0;
267             foreach (2..$params->{processes}) { # fork the initial set of children
268             $kernel->yield('fork', ($_ == $params->{processes}));
269             $children++;
270             }
271              
272             $kernel->yield('waste_time', 60) unless $children;
273             if($params->{babysit}) {
274             $heap->{babysit}=$params->{babysit};
275             delete($heap->{"proctable"});
276             eval {
277             require Proc::ProcessTable;
278             $heap->{"proctable"}=new Proc::ProcessTable;
279             };
280             DEBUG and do {
281             print "Unable to load Proc::ProcessTable: $@\n" if $@;
282             };
283             $kernel->yield('babysit');
284             }
285             return $ret;
286             }
287              
288             #------------------------------------------------------------------------------
289             sub _child
290             {
291             my( $heap, $kernel, $op, $child, $ret ) =
292             @_[ HEAP, KERNEL, ARG0, ARG1, ARG2 ];
293             $ret ||= '';
294             DEBUG and
295             warn "$$: _child op=$op child=$child ret=$ret";
296             unless( $ret eq "channel-$child" ) {
297             if( $op eq 'create' ) {
298             DEBUG and
299             warn "$$: Detatching child session $child";
300             $kernel->detach_child( $child );
301             }
302             return;
303             }
304             if( $op eq 'lose' ) {
305             DB::disable_profile() if $INC{'Devel/NYTProf.pm'};
306             $heap->{child_sessions}--;
307             if( $heap->{child_sessions} > 0 ) {
308             DEBUG and warn "$$: still have a child session";
309             }
310             _concurrency_down($heap);
311             }
312             else {
313             $heap->{child_sessions}++;
314             return;
315             }
316             unless( $heap->{wheel} ) { # no wheel == GAME OVER
317             ( DEBUG and not $INC{'Test/More.pm'} ) and
318             warn "$$: }}}}}}}}}}}}}}} Game over\n";
319             # XXX: Using shutdown is a stop-gap measure. Maybe the daemon
320             # wants to stay alive even if IKC was shutdown...
321             # XXX: more to the point, maybe there are still requests that are
322             # hanging around !
323             $kernel->call( IKC => 'shutdown' );
324             }
325             }
326              
327             #------------------------------------------------------------------------------
328             # This event keeps this POE kernel alive
329             sub waste_time
330             {
331             my($kernel, $heap)=@_[KERNEL, HEAP];
332             return if $heap->{'is a child'};
333              
334             unless($heap->{'been told we are parent'}) {
335             $heap->{verbose} and warn "$$: Telling everyone we are the parent\n";
336             $heap->{'been told we are parent'}=1;
337             $kernel->signal($kernel, '__parent');
338             }
339             if($heap->{'die'}) {
340             DEBUG and warn "$$: Orderly shutdown\n";
341             } else {
342             $kernel->delay('waste_time', 60);
343             }
344             return;
345             }
346            
347             #------------------------------------------------------------------------------
348             # Babysit the child processes
349             sub babysit
350             {
351             my($kernel, $heap)=@_[KERNEL, HEAP];
352              
353             return if $heap->{'die'} or # don't scan if we are dieing
354             $heap->{'is a child'}; # or if we are a child
355              
356             my @children=keys %{$heap->{children}};
357             $heap->{verbose} and warn "$$: Babysiting ", scalar(@children),
358             " children ", join(", ", sort @children), "\n";
359             my %table;
360              
361             if($heap->{proctable}) {
362             my $table=$heap->{proctable}->table;
363             %table=map {($_->pid, $_)} @$table
364             }
365              
366             my(%missing, $state, $time, %rogues, %ok);
367             foreach my $pid (@children) {
368             if($table{$pid}) {
369             $state=$table{$pid}->state;
370              
371             if($state eq 'zombie') {
372             my $t=waitpid($pid, POSIX::WNOHANG());
373             if($t==$pid) {
374             # process was reaped, now fake a SIGCHLD
375             DEBUG and warn "$$: Faking a CHLD for $pid\n";
376             $kernel->yield('sig_CHLD', 'CHLD', $pid, $?, 1);
377             $ok{$pid}=1;
378             } else {
379             $heap->{verbose} and warn "$$: $pid is a $state and couldn't be reaped.\n";
380             $missing{$pid}=1;
381             }
382             }
383             elsif($state eq 'run') {
384             $time=eval{$table{$pid}->utime + $table{$pid}->stime};
385             warn $@ if $@;
386             # utime and stime are Linux-only :(
387             $time /= 1_000_000 if $time; # micro-seconds -> seconds
388              
389             if($time and $time > 1200) { # arbitrary limit of 20 minutes
390             $rogues{$pid}=$table{$pid};
391             warn "$$: $pid has gone rogue, time=$time s\n";
392             } else {
393             DEBUG and
394             warn "$$: child $pid has utime+stime=$time s\n"
395             if $time > 1;
396             $ok{$pid}=1;
397             }
398              
399             } elsif($state eq 'sleep' or $state eq 'defunct') {
400             $ok{$pid}=1;
401             # do nothing
402             } else {
403             $heap->{verbose} and warn "$$: $pid has unknown state '$state'\n";
404             $ok{$pid}=1;
405             }
406             } elsif($heap->{proctable}) {
407             $heap->{verbose} and warn "$$: $pid isn't in proctable!\n";
408             $missing{$pid}=1;
409             } else { # try another means.... :/
410             if(-d "/proc" and not -d "/proc/$pid") {
411             DEBUG and warn "$$: Unable to stat /proc/$pid! Is the child missing\n";
412             $missing{$pid}=1;
413             } elsif(not $missing{$pid}) {
414             $ok{$pid}=1;
415             }
416             }
417             }
418              
419             # if a process is MIA, we fake a death, and spawn a new child
420             foreach my $pid (keys %missing) {
421             $kernel->yield('sig_CHLD', 'CHLD', $pid, 0, 1);
422             $heap->{verbose} and warn "$$: Faking a CHLD for $pid MIA\n";
423             }
424              
425             # we could do the same thing for rogue processes, but instead we
426             # give them time to calm down
427              
428             if($heap->{rogues}) { # processes that are %ok are now removed
429             # from the list of rogues
430             delete @{$heap->{rogues}}{keys %ok} if %ok;
431             }
432              
433             if(%rogues) {
434             $kernel->yield('rogues') if not $heap->{rogues};
435              
436             $heap->{rogues}||={};
437             foreach my $pid (keys %rogues) {
438             if($heap->{rogues}{$pid}) {
439             $heap->{rogues}{$pid}{proc}=$rogues{$pid};
440             } else {
441             $heap->{rogues}{$pid}={proc=>$rogues{$pid}, tries=>0};
442             }
443             }
444             }
445              
446             $kernel->delay('babysit', $heap->{babysit});
447             return;
448             }
449              
450             #------------------------------------------------------------------------------
451             # Deal with rogue child processes
452             sub rogues
453             {
454             my($kernel, $heap)=@_[KERNEL, HEAP];
455              
456             return if $heap->{'die'} or # don't scan if we are dieing
457             $heap->{'is a child'}; # or if we are a child
458              
459             # make sure we have some real work
460             return unless $heap->{rogues};
461             eval {
462             if(ref($heap->{rogues}) ne 'HASH' or not keys %{$heap->{rogues}}) {
463             delete $heap->{rogues};
464             return;
465             }
466              
467             my $signal;
468             while(my($pid, $rogue)=each %{$heap->{rogues}}) {
469             $signal=0;
470             if($rogue->{tries} < 1) {
471             $signal=2;
472             }
473             elsif($rogue->{tries} < 2) {
474             $signal=15;
475             }
476             elsif($rogue->{tries} < 3) {
477             $signal=9;
478             }
479            
480             if($signal) {
481             DEBUG and warn "$$: Sending signal $signal to rogue $pid\n";
482             unless($rogue->{proc}->kill($signal)) {
483             warn "$$: Error sending signal $signal to $pid: $!\n";
484             delete $heap->{rogues}{$pid};
485             }
486             } else {
487             # if SIGKILL didn't work, it's beyond hope!
488             $kernel->yield('sig_CHLD', 'CHLD', $pid, 0, 1);
489             delete $heap->{rogues}{$pid};
490             $heap->{verbose} and warn "$$: Faking a CHLD for rogue $pid\n";
491             }
492              
493             $rogue->{tries}++;
494             }
495             $kernel->delay('rogues', 2*$heap->{babysit});
496             };
497             warn "$$: $@" if $@;
498             }
499              
500             #------------------------------------------------------------------------------
501             # Accept POE's standard _stop event, and stop all the children, too.
502             # The 'children' hash is maintained in the 'fork' and 'sig_CHLD'
503             # handlers. It's empty for children.
504              
505             sub _stop
506             {
507             my($kernel, $heap) = @_[KERNEL, HEAP];
508              
509             # kill the child servers
510             if($heap->{children}) {
511             foreach (keys %{$heap->{children}}) {
512             DEBUG && print "$$: server is killing child $_ ...\n";
513             kill 2, $_ or warn "$$: $_ $!\n";
514             }
515             }
516             if($heap->{unix}) {
517             unlink $heap->{unix};
518             }
519             DEBUG &&
520             warn "$$: Server $heap->{name} _stop\n";
521             # DEBUG_USR2 and check_kernel($kernel, $heap->{'is a child'}, 1);
522             # __peek( 1 );
523             }
524              
525             #------------------------------------------------------------------------------
526             sub shutdown
527             {
528             my($kernel, $heap)=@_[KERNEL, HEAP];
529              
530             DEBUG and
531             warn "$$: Server $heap->{name} shutdown\n";
532              
533             _delete_wheel( $heap ); # close socket
534             _delete_delays(); # get it OVER with
535              
536             # -GC
537             # $kernel->alias_remove("IKC Server $heap->{wheel_address}");
538             $heap->{'die'}=1; # prevent race conditions
539             }
540              
541             #----------------------------------------------------
542             # Log server errors, but don't stop listening for connections. If the
543             # error occurs while initializing the factory's listening socket, it
544             # will exit anyway.
545              
546             sub error
547             {
548             my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];
549              
550              
551             DEBUG and
552             warn __PACKAGE__, " $$: encountered $operation error $errnum: $errstr\n";
553              
554             my $ignore;
555             if($errnum==EADDRINUSE) { # EADDRINUSE
556             $heap->{'die'}=1;
557             _delete_wheel( $heap );
558             $ignore = 0;
559             } elsif($errnum==WSAEAFNOSUPPORT) {
560             # Address family not supported by protocol family.
561             # we get this error, yet nothing bad happens... oh well
562             $ignore=1;
563             }
564             unless($ignore) {
565             POE::Component::IKC::Util::monitor_error( $heap, $operation, $errnum, $errstr );
566             }
567             }
568              
569             #----------------------------------------------------
570             # The socket factory invokes this state to take care of accepted
571             # connections.
572              
573             sub accept
574             {
575             my ($heap, $kernel, $handle, $peer_host, $peer_port) =
576             @_[HEAP, KERNEL, ARG0, ARG1, ARG2];
577              
578             T->start( 'IKC' );
579             if(DEBUG) {
580             if($peer_port) {
581             warn "$$: Server connection from ", inet_ntoa($peer_host),
582             ":$peer_port",
583             ($heap->{'is a child'} ?
584             " (Connection $heap->{connections})\n" : "\n");
585             } else {
586             warn "$$: Server connection over $heap->{unix}",
587             ($heap->{'is a child'} ?
588             " (Connection $heap->{connections})\n" : "\n");
589             }
590             }
591             if($heap->{children} and not $heap->{'is a child'}) {
592             warn "$$: Parent process received a connection: THIS SUCKS\n";
593             return;
594             }
595              
596             DB::enable_profile() if $INC{'Devel/NYTProf.pm'};
597              
598             DEBUG and warn "$$: Server kernel_aliases=", join ',', @{$heap->{kernel_aliases}||[]};
599              
600             # give the connection to a channel
601             POE::Component::IKC::Channel->spawn(
602             handle=>$handle,
603             name=>$heap->{name},
604             unix=>$heap->{unix},
605             aliases=>[@{$heap->{kernel_aliases}||[]}],
606             protocol=>$heap->{protocol},
607             on_error=>$heap->{on_error}
608             );
609              
610             _concurrency_up($heap);
611            
612             return unless $heap->{children};
613              
614             if (--$heap->{connections} < 1) {
615             DEBUG and
616             warn "$$: {{{{{{{{{{{{{{{ Game over\n";
617             $kernel->delay('waste_time');
618             _delete_wheel( $heap );
619             $::TRACE_REFCNT = 1;
620              
621             } else {
622             DEBUG and
623             warn "$$: $heap->{connections} connections left\n";
624             }
625             }
626              
627              
628              
629              
630             #------------------------------------------------------------------------------
631             # The server has been requested to fork, so fork already.
632             sub fork
633             {
634             my ($kernel, $heap, $last) = @_[KERNEL, HEAP, ARG0];
635             # children should not honor this event
636             # Note that the forked POE kernel might have these events in it already
637             # this is unavoidable
638             if($heap->{'is a child'} or not $heap->{children} or $heap->{'die'}) {
639             DEBUG and warn "$$: We are a child, why are we forking?\n";
640             return;
641             }
642             my $parent=$$;
643            
644              
645             DEBUG and warn "$$: Forking a child";
646            
647             my $pid = fork(); # try to fork
648             unless (defined($pid)) { # did the fork fail?
649             # try again later, if a temporary error
650             if (($! == EAGAIN) || ($! == ECHILD)) {
651             DEBUG and warn "$$: Recoverable forking problem";
652             $heap->{'failed forks'}++;
653             $kernel->delay('retry', 1);
654             }
655             else { # fail permanently, if fatal
656             POE::Component::IKC::Util::monitor_error( $heap, 'fork', 0+$1, "$!" );
657             $kernel->yield('_stop');
658             }
659             return;
660             }
661             # successful fork; parent keeps track
662             if ($pid) {
663             $heap->{children}->{$pid} = 1;
664             DEBUG &&
665             print( "$$: master server forked a new child. children: (",
666             join(' ', sort keys %{$heap->{children}}), ")\n"
667             );
668             $kernel->yield('waste_time') if $last;
669             }
670             # child becomes a child server
671             else {
672             $heap->{verbose} and warn "$$: Created ", scalar localtime, "\n";
673              
674             # This resets some kernel data that was preventing the child process's
675             # kernel from becoming IDLE
676             if( $kernel->can( 'has_forked' ) ) {
677             $kernel->has_forked;
678             }
679             else {
680             $kernel->_data_sig_initialize;
681             }
682              
683             # Clean out stuff that the parent needs but not the children
684             $heap->{'is a child'} = 1; # don't allow fork
685             $heap->{'failed forks'} = 0;
686             $heap->{children}={}; # don't kill child processes
687             # limit sessions, then die off
688             $heap->{connections} = $heap->{"max connections"};
689              
690             # These signals are no longer our problem
691             $kernel->sig('CHLD');
692             $kernel->sig('INT');
693              
694             # remove any waits that might be around
695             _delete_delays(); # get it OVER with
696              
697             delete @{$heap}{qw(rogues proctable)};
698              
699             # Tell everyone we are now a child
700             $kernel->signal($kernel, '__child');
701              
702             # Create a select for the children, so that SocketFactory can
703             # do it's thing
704             _select_define($heap, 1);
705              
706             DEBUG && print "$$: child server has been forked\n";
707             }
708              
709             # remove the call
710             return;
711             }
712              
713              
714             #------------------------------------------------------------------------------
715             # Retry failed forks. This is invoked (after a brief delay) if the
716             # 'fork' state encountered a temporary error.
717              
718             sub retry
719             {
720             my ($kernel, $heap) = @_[KERNEL, HEAP];
721             if($heap->{'is a child'} or not $heap->{children}) {
722             warn "$$: We are a child, why are we forking?\n";
723             return;
724             }
725              
726             # Multiplex the delayed 'retry' event into enough 'fork' events to
727             # make up for the temporary fork errors.
728              
729             for (1 .. $heap->{'failed forks'}) {
730             $kernel->yield('fork');
731             }
732             # reset the failed forks counter
733             $heap->{'failed forks'} = 0;
734             return;
735             }
736              
737             #------------------------------------------------------------------------------
738             # SIGCHLD causes this session to fork off a replacement for the lost child.
739              
740             sub sig_CHLD
741             {
742             my ($kernel, $heap, $signal, $pid, $status, $fake) =
743             @_[KERNEL, HEAP, ARG0, ARG1, ARG2, ARG3];
744              
745             return if $heap->{"is a child"};
746              
747             if($heap->{children}) {
748             # if it was one of ours; fork another
749             if (delete $heap->{children}->{$pid}) {
750             DEBUG &&
751             print( "$$: master caught SIGCHLD for $pid. children: (",
752             join(' ', sort keys %{$heap->{children}}), ")\n"
753             );
754             $heap->{verbose} and warn "$$: Child $pid ",
755             ($fake?'is gone':'exited normaly'), ".\n";
756             $kernel->yield('fork') unless $heap->{'die'};
757             } elsif($fake) {
758             warn "$$: Needless fake CHLD for $pid\n";
759             } else {
760             warn "$$: CHLD for $pid child of someone else.\n";
761             }
762             }
763             # don't handle terminal signals
764             return;
765             }
766              
767             #------------------------------------------------------------------------------
768             # Terminal signals aren't handled, so the session will stop on SIGINT.
769             # The _stop event handler takes care of cleanup.
770              
771             sub sig_INT
772             {
773             my ($kernel, $heap, $signal, $pid, $status) =
774             @_[KERNEL, HEAP, ARG0, ARG1, ARG2];
775              
776             return 0 if $heap->{"is a child"};
777              
778             if($heap->{children}) {
779             $heap->{verbose} and warn "$$ SIGINT\n";
780             $heap->{'die'}=1;
781             # kill all events
782             _delete_delays(); # get it OVER with
783             } else {
784             _delete_wheel( $heap );
785             }
786             $kernel->post( IKC => 'shutdown' );
787             $kernel->sig_handled(); # INT is terminal
788             return;
789             }
790              
791             #------------------------------------------------------------------------------
792             # daemontool's svc -d sends a TERM
793             # The _stop event handler takes care of cleanup.
794              
795             sub sig_TERM
796             {
797             my ($kernel, $heap, $signal, $pid, $status) =
798             @_[KERNEL, HEAP, ARG0, ARG1, ARG2];
799              
800             $heap->{verbose} and warn "$$ SIGTERM\n";
801             $heap->{'die'}=1;
802              
803             _delete_wheel( $heap );
804              
805             _delete_delays(); # get it OVER with
806              
807             $kernel->post( IKC => 'shutdown' );
808              
809             $kernel->sig_handled(); # TERM is terminal
810             return;
811             }
812              
813             ############################################################
814             sub check_kernel
815             {
816             my($kernel, $child, $signal)=@_;
817             if(ref $kernel) {
818             # 2 = KR_HANDLES
819             # 7 = KR_EVENTS
820             # 8 = KR_ALARMS (NO MORE!)
821             # 12 = KR_EXTRA_REFS
822              
823             # 0 = HND_HANDLE
824             warn( "$$: ,----- Kernel Activity -----\n",
825             "$$: | States : ", scalar(@{$kernel->[7]}), " ",
826             join( ', ', map {$_->[0]->ID."/$_->[2]"}
827             @{$kernel->[7]}), "\n",
828             # "$$: | Alarms : ", scalar(@{$kernel->[8]}), "\n",
829             "$$: | Files : ", scalar(keys(%{$kernel->[2]})), "\n",
830             "$$: | `--> : ", join( ', ',
831             sort { $a <=> $b }
832             map { fileno($_->[0]) }
833             values(%{$kernel->[2]})
834             ), "\n",
835             "$$: | Extra : ${$kernel->[12]}\n",
836             "$$: `---------------------------\n",
837             );
838             # if($child) {
839             # foreach my $q (@{$kernel->[8]}) {
840             # warn "************ Alarm for ", join '/', @{$q->[0][2]{$q->[2]}};
841             # }
842             # }
843             } else {
844             warn "$kernel isn't a reference";
845             }
846             }
847              
848             ############################################################
849             sub __peek
850             {
851             my($verbose)=@_;
852             eval {
853             require POE::Component::Daemon;
854             };
855             unless( $@ ) {
856             my $ret = Daemon->peek( $verbose );
857             $ret =~ s/\n/\n$$: /g;
858             warn "$$: $ret";
859             return 1;
860             }
861              
862             eval {
863             require POE::API::Peek;
864             };
865             if($@) {
866             DEBUG and warn "Failed to load POE::API::Peek: $@";
867             return;
868             }
869             my $api=POE::API::Peek->new();
870             my @queue = $api->event_queue_dump();
871            
872             my $ret = "Event Queue:\n";
873            
874             my $events = {};
875              
876             foreach my $item (@queue) {
877             $ret .= "\t* ID: ". $item->{ID}." - Index: ".$item->{index}."\n";
878             $ret .= "\t\tPriority: ".$item->{priority}."\n";
879             $ret .= "\t\tEvent: ".$item->{event}."\n";
880              
881             if($verbose) {
882             $events->{ $item->{source}->ID }{source} ++;
883             $ret .= "\t\tSource: ".
884             $api->session_id_loggable($item->{source}).
885             "\n";
886             $ret .= "\t\tDestination: ".
887             $api->session_id_loggable($item->{destination}).
888             "\n";
889             $ret .= "\t\tType: ".$item->{type}."\n";
890             $ret .= "\n";
891             }
892             }
893             if($api->session_count) {
894             $ret.="Keepalive " unless $verbose;
895             $ret.="Sessions: \n";
896             my $ses;
897             foreach my $session ( sort { $a->ID <=> $b->ID } $api->session_list) {
898             my $ref=0;
899             $ses='';
900              
901             $ses.="\tSession ".$api->session_id_loggable($session)." ($session)";
902              
903             my $refcount=$api->get_session_refcount($session);
904             $ses.="\n\t\tref count: $refcount\n";
905              
906             my $q1=$api->get_session_extref_count($session);
907             $ref += $q1;
908             $ses.="\t\textref count: $q1 [keepalive]\n" if $q1;
909              
910             my $hc=$api->session_handle_count($session);
911             $ref += $hc;
912             $ses.="\t\thandle count: $q1 [keepalive]\n" if $hc;
913              
914             my @aliases=$api->session_alias_list($session);
915             $ref += @aliases;
916             $q1=join ',', @aliases;
917             $ses.="\t\tAliases: $q1\n" if $q1;
918              
919             my @children = $api->get_session_children($session);
920             if(@children) {
921             $ref += @children;
922             $q1 = join ',', map {$api->session_id_loggable($_)} @children;
923             $ses.="\t\tChildren: $q1\n";
924             }
925              
926             $q1 = $events->{ $session->ID }{source};
927             if( $q1 ) {
928             $ret.="\t\tEvent source count: $q1 (Stay alive)\n";
929             $ref += $q1;
930             }
931              
932             $q1 = $events->{ $session->ID }{destination};
933             if( $q1 ) {
934             $ret.="\t\tEvent destination count: $q1 (Stay alive)\n";
935             $ref += $q1;
936             }
937              
938             if($refcount != $ref) {
939             $ses.="\t\tReference: refcount=$refcount counted=$ref [keepalive]\n";
940             }
941             if($hc or $verbose or $refcount != $ref) {
942             $ret.=$ses;
943             }
944             }
945             }
946             $ret.="\n";
947              
948             warn "$$: $ret";
949             return 1;
950             }
951              
952              
953             sub sig_USR2
954             {
955             # return unless DEBUG;
956             my ($kernel, $heap, $signal, $pid) = @_[KERNEL, HEAP, ARG0, ARG1];
957             $pid||='';
958             warn "$$: signal $signal $pid\n";
959             unless(__peek(1)) {
960             check_kernel($kernel, $heap->{'is a child'}, 1);
961             }
962             $kernel->sig_handled();
963             return;
964             }
965              
966             sub sig_USR1
967             {
968             # return unless DEBUG;
969             my ($kernel, $heap, $signal, $pid) = @_[KERNEL, HEAP, ARG0, ARG1];
970             $pid||='';
971             warn "$$: signal $signal $pid\n";
972             unless(__peek(0)) {
973             check_kernel($kernel, $heap->{'is a child'}, 0);
974             }
975             $kernel->sig_handled();
976             return;
977             }
978              
979              
980             1;
981             __END__