File Coverage

blib/lib/POE/Component/Supervisor.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3             package POE::Component::Supervisor;
4 1     1   2986 use MooseX::POE;
  0            
  0            
5              
6             use Moose::Util::TypeConstraints;
7              
8             use POE::Component::Supervisor::Supervised;
9             use POE::Component::Supervisor::Handle;
10              
11             use Devel::PartialDump;
12              
13             use Hash::Util::FieldHash::Compat qw(idhash);
14              
15             use namespace::clean -except => 'meta';
16              
17             our $VERSION = "0.08";
18              
19             with qw(
20             POE::Component::Supervisor::Interface
21             MooseX::POE::Aliased
22             POE::Component::Supervisor::LogDispatch
23             );
24              
# Class-method convenience: build a supervisor from the given constructor
# arguments, then hand control to the POE kernel's event loop. Returns
# whatever the kernel's run method returns.
sub run {
    my ( $class, @ctor_args ) = @_;

    # held in a lexical so the instance stays referenced while the kernel runs
    my $supervisor = $class->new(@ctor_args);

    return $poe_kernel->run;
}
29              
# Builder for the 'alias' attribute. The supervisor gets no alias by default,
# so when all the children die we exit as well.
sub _build_alias {
    # an explicit undef value (one element in list context), as before
    return undef;
}
32              
# Which children get restarted when one of them exits and should be restarted:
# just that child ("one"), every child ("all"), or that child together with the
# children listed after it ("rest").
has restart_policy => (
    is      => "rw",
    isa     => enum( __PACKAGE__ . "::RestartPolicy" => qw(one all rest) ),
    default => "one",
);

# Ordered list of registered child descriptors. It cannot be set through the
# constructor; BUILD stashes a 'children' argument in _children_tmp instead.
has children => (
    is         => "ro",
    isa        => "ArrayRef",
    init_arg   => undef,
    auto_deref => 1,
    default    => sub { return [] },
);

# Holding area for the constructor's 'children' argument; START consumes and
# clears it.
has _children_tmp => (
    is       => "rw",
    isa      => "ArrayRef",
    init_arg => undef,
    clearer  => "_clear_children_tmp",
);

# The most recently issued numeric child id (see _next_child_id).
has _last_child_id => (
    is      => "rw",
    isa     => "Int",
    default => 0,
);
59              
# Advance the child id counter by one and return whatever the accessor hands
# back for the newly stored value.
sub _next_child_id {
    my ($self) = @_;

    my $next_id = $self->_last_child_id + 1;

    return $self->_last_child_id($next_id);
}
64              
# Per-child bookkeeping records, kept in an idhash and looked up by the child
# descriptor itself. Other code in this file stores 'id', 'handle' and
# 'stopping' in each record.
has _children_hash => (
    is       => "ro",
    isa      => "HashRef",
    init_arg => undef,
    default  => sub {
        my %registry;
        return idhash %registry;
    },
);
71              
# Return the numeric id assigned to $child when it was registered.
#
# Dies (via confess, with a stack trace) with "unknown child ..." when the
# child has no registration record or the record carries no id.
#
# The record is fetched in a single step and then inspected, instead of
# chaining ->{$child}{id}: the chained form autovivifies an empty record for an
# unknown child, after which start() would regard that child as already
# registered and silently skip it.
sub _child_id {
    my ( $self, $child ) = @_;

    my $entry = $self->_children_hash->{$child};

    confess "unknown child $child"
        unless $entry && defined $entry->{id};

    return $entry->{id};
}
81              
# Return the handle currently stored for $child, or undef when the child has
# no registration record or no handle.
#
# The record is fetched first and only then inspected, so that asking about an
# unknown child does not autovivify an empty record in the registry.
sub _child_handle {
    my ( $self, $child ) = @_;

    my $entry = $self->_children_hash->{$child};

    return $entry ? $entry->{handle} : undef;
}
86              
# Children that are currently being stopped so that they can be restarted.
# Everything has to be stopped first, and only then are the children started
# again, so the ones still on their way down are tracked here.
has _stopping_for_restart => (
    is       => "ro",
    isa      => "HashRef",
    init_arg => undef,
    default  => sub {
        my %stopping;
        return idhash %stopping;
    },
);

# Children that have already stopped and are waiting to be restarted. Once the
# last child being stopped has gone, this collection is started again following
# the order of 'children'.
has _pending_restart => (
    is       => "ro",
    isa      => "HashRef",
    init_arg => undef,
    default  => sub {
        my %pending;
        return idhash %pending;
    },
);
104              
# Session start-up: route the DIE signal to the "exception" event, log the
# start, and spawn any children that BUILD stashed in _children_tmp.
sub START {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];

    $kernel->sig( DIE => "exception" );

    $self->logger->info("starting supervisor $self in process $$");

    my $initial_children = $self->_children_tmp;
    return unless $initial_children;

    # the stash is only needed once; clear it before starting the children
    $self->_clear_children_tmp;
    $self->start( @{$initial_children} );
}
117              
# Session shutdown: only logs that the supervisor is stopping.
sub STOP {
    my ($self) = $_[OBJECT];

    my $logger = $self->logger;
    $logger->info("stopping supervisor $self in process $$");
}
123              
# Handler for the DIE signal registered in START. ARG1 is read as a hash
# reference describing the failure (event, dest_session, error_str,
# source_session, from_state, file, line); its contents are logged as an error.
event exception => sub {
    my $self       = $_[OBJECT];
    my $error_info = $_[ARG1];

    my $message = "Error in supervisor child session, event $error_info->{event} of $error_info->{dest_session}: $error_info->{error_str}, sent from $error_info->{source_session} state $error_info->{from_state} at $error_info->{file} line $error_info->{line}";

    $self->logger->error($message);
};
129              
# Make sure $new_child has a registration record. A child that is already
# registered keeps its existing record; otherwise the child is appended to
# 'children' and a fresh record is created for it. Returns the record.
sub _register_child {
    my ( $self, $new_child ) = @_;

    $self->logger->debug("registering child $new_child");

    my $registry = $self->_children_hash;

    if ( my $existing = $registry->{$new_child} ) {
        return $existing;
    }

    push @{ $self->children }, $new_child;

    return $registry->{$new_child} = $self->_new_child_registration($new_child);
}
140              
# Build the bookkeeping record for a newly registered child: a hash reference
# holding the next free child id under 'id'. $new_child itself is not used.
sub _new_child_registration {
    my ( $self, $new_child ) = @_;

    my %registration = ( id => $self->_next_child_id );

    return \%registration;
}
145              
# Forget $child: drop its registration record and, if there was one, remove the
# child from the 'children' list as well.
sub _unregister_child {
    my ( $self, $child ) = @_;

    $self->logger->debug("unregistering child $child");

    my $removed = delete $self->_children_hash->{$child};
    return unless $removed;

    # rewrite the list in place so the same array reference stays in use
    my $children = $self->children;
    @{$children} = grep { $_ != $child } @{$children};
}
156              
# Constructor hook: when a true 'children' value was passed to the constructor,
# stash it in _children_tmp so that START can spawn those children later.
sub BUILD {
    my ( $self, $params ) = @_;

    my $children = $params->{children};

    $self->_children_tmp($children) if $children;
}
164              
# Start supervising the given child descriptors. Each child that is not yet
# registered is registered and a "spawn" event is queued for it; children that
# already have a registration record are left alone.
sub start {
    my ( $self, @children ) = @_;

    for my $descriptor (@children) {
        unless ( $self->_children_hash->{$descriptor} ) {
            $self->_register_child($descriptor);
            $self->yield( spawn => $descriptor );
        }
    }
}
174              
# Stop the given children, or every child (in reverse order of 'children') when
# called without arguments. Children without a registration record are ignored.
sub stop {
    my ( $self, @children ) = @_;

    if ( !@children ) {
        @children = reverse $self->children;
    }

    for my $child (@children) {
        my $entry = $self->_children_hash->{$child} or next;

        unless ( $entry->{handle} ) {
            # it's already dead, just delete it
            $self->_unregister_child($child);
            next;
        }

        # flag the record so the 'stopped' event treats this exit as requested
        $entry->{stopping} = 1;
        $entry->{handle}->stop;

        # remove it from the children list, so that it isn't restarted due to a
        # 'rest' or 'all' policy because of some other child's exit.
        # _unregister_child will eventually try to do this too, but that's OK
        # because it *should* do it if the child has been stopped unexpectedly
        # and is temporary/transient
        my $remaining = $self->children;
        @{$remaining} = grep { $_ != $child } @{$remaining};
    }
}
199              
# Queue a "spawned" event on the supervisor, forwarding all arguments.
sub notify_spawned {
    my $self = shift;

    $self->yield( "spawned", @_ );
}
204              
# Queue a "stopped" event on the supervisor, forwarding all arguments.
sub notify_stopped {
    my $self = shift;

    $self->yield( "stopped", @_ );
}
209              
# A child instance has been spawned: take a "handles" reference count on the
# supervisor's session and log the child's id with the remaining arguments.
event spawned => sub {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    my ( $child, @args )  = @_[ ARG0 .. $#_ ];

    $kernel->refcount_increment( $self->get_session_id(), "handles" );

    my $logger  = $self->logger;
    my $id      = $self->_child_id($child);
    my $details = Devel::PartialDump::dump(@args);

    $logger->info("child $id spawned $details");
};
217              
# Ask the child descriptor to spawn an instance and remember the value it
# returns as the child's handle.
event spawn => sub {
    my $self  = $_[OBJECT];
    my $child = $_[ARG0];

    my $logger = $self->logger;
    my $id     = $self->_child_id($child);
    $logger->debug("instructing child $id to spawn");

    my $handle = $child->spawn( supervisor => $self );
    $self->_children_hash->{$child}{handle} = $handle;
};
225              
# Mark $child as ready to be restarted. While other children are still being
# stopped for restart nothing else happens; once none are left, every pending
# child is handed to "_respawn", following the order of 'children'.
event respawn => sub {
    my $self  = $_[OBJECT];
    my $child = $_[ARG0];

    my $pending = $self->_pending_restart;
    $pending->{$child} = $child;

    if ( keys %{ $self->_stopping_for_restart } ) {
        # if we're waiting on more children to exit, just mark this child as ready to restart
        my $logger = $self->logger;
        my $id     = $self->_child_id($child);
        $logger->debug("child $id respawn postponed, other children still not stopped");
        return;
    }

    # otherwise we can now restart all the children which are ready to be restarted
    $self->logger->debug("no more unstopped children, ready to respawn");

    # slicing by 'children' yields the pending entries in supervision order;
    # children that are not pending come back as undef and are dropped
    my @children_to_restart = grep { defined } delete @{$pending}{ $self->children };

    for my $ready (@children_to_restart) {
        $self->yield( _respawn => $ready );
    }
};
244              
# Ask the child descriptor to respawn an instance and store the value it
# returns as the child's new handle.
event _respawn => sub {
    my $self  = $_[OBJECT];
    my $child = $_[ARG0];

    my $logger = $self->logger;
    my $id     = $self->_child_id($child);
    $logger->info("respawning child $id");

    my $handle = $child->respawn( supervisor => $self );
    $self->_children_hash->{$child}{handle} = $handle;
};
251              
# A child instance has stopped. Releases the "handles" reference count taken in
# the "spawned" event, forgets the child's handle, and dispatches synchronously
# to one of:
#   stopped_per_request  - the record was flagged 'stopping' by stop()
#   stopped_for_restart  - the child was listed in _stopping_for_restart
#   stopped_unexpectedly - anything else
#
# The registration record is fetched once and only touched when it exists, so a
# stop notification for an unknown child no longer autovivifies an empty record
# in the registry.
event stopped => sub {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    my ( $child, @args )  = @_[ ARG0 .. $#_ ];

    $kernel->refcount_decrement( $self->get_session_id(), "handles" );

    my $entry = $self->_children_hash->{$child};
    delete $entry->{handle} if $entry;

    if ( $entry && $entry->{stopping} ) {
        $self->call( stopped_per_request => $child, @args );
    } elsif ( delete $self->_stopping_for_restart->{$child} ) {
        $self->call( stopped_for_restart => $child, @args );
    } else {
        $self->call( stopped_unexpectedly => $child, @args );
    }
};
267              
# The child exited because stop() asked it to: log that, unregister the child
# and release a "children" reference count on the supervisor's session.
event stopped_per_request => sub {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    my ( $child, @args )  = @_[ ARG0 .. $#_ ];

    my $logger = $self->logger;
    my $id     = $self->_child_id($child);
    $logger->info("child $id exited as requested");

    $self->_unregister_child($child);

    # NOTE(review): no matching refcount_increment for the "children" tag is
    # visible in this file - confirm where that reference is taken
    $kernel->refcount_decrement( $self->get_session_id(), "children" );
};
276              
# The child exited without being asked to. Depending on should_restart_child
# and child_exit_is_fatal this either unregisters the child, queues
# "fatal_exit", or queues the "restart_<policy>" event for the current
# restart_policy.
event stopped_unexpectedly => sub {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
    my ( $child, @args )  = @_[ ARG0 .. $#_ ];

    my $id = $self->_child_id($child);

    $self->logger->notice("child $id exited on its own");

    unless ( $self->should_restart_child( $child, @args ) ) {
        $self->logger->info("child $id won't be restarted");
        $self->_unregister_child($child);

        # NOTE(review): no matching refcount_increment for the "children" tag
        # is visible in this file - confirm where that reference is taken
        $kernel->refcount_decrement( $self->get_session_id(), "children" );
        return;
    }

    if ( $self->child_exit_is_fatal( $child, @args ) ) {
        $self->logger->error("child $id exit is fatal, raising error");
        $self->yield( fatal_exit => $child, @args );
        return;
    }

    my $policy = $self->restart_policy;
    $self->logger->info("child $id will be restarted, restart policy is $policy");
    $self->yield( "restart_$policy" => $child, @args );
};
299              
# The child exited because it was stopped in order to be restarted: log that
# and queue a "respawn" event for it.
event stopped_for_restart => sub {
    my $self = $_[OBJECT];
    my ( $child, %args ) = @_[ ARG0 .. $#_ ];

    my $logger = $self->logger;
    my $id     = $self->_child_id($child);
    $logger->info("child $id exited for restart as requested");

    $self->yield( respawn => $child );
};
307              
# Restart a single child. A child that still has a handle is recorded in
# _stopping_for_restart and told to stop for restart; a child without a handle
# goes straight to "respawn".
event restart_one => sub {
    my $self = $_[OBJECT];
    my ( $child, %args ) = @_[ ARG0 .. $#_ ];

    my $handle = $self->_child_handle($child);

    unless ($handle) {
        my $logger = $self->logger;
        my $id     = $self->_child_id($child);
        $logger->debug("child $id is already dead, marking for respawn");
        $self->yield( respawn => $child );
        return;
    }

    my $logger = $self->logger;
    my $id     = $self->_child_id($child);
    $logger->info("stopping child $id for restart");

    $self->_stopping_for_restart->{$child} = 1;
    $handle->stop_for_restart();
};
320              
# 'all' restart policy: queue "restart_one" for every child, walking the
# 'children' list in reverse order. The child that triggered the restart
# (ARG0) gets no special treatment.
event restart_all => sub {
    my $self = $_[OBJECT];
    my ( $child, %args ) = @_[ ARG0 .. $#_ ];

    my @in_reverse = reverse $self->children;

    for my $sibling (@in_reverse) {
        $self->yield( restart_one => $sibling );
    }
};
328              
# 'rest' restart policy: queue "restart_one" for $child and for every child
# listed after it in 'children', in reverse order.
#
# If $child is no longer in the 'children' list (stop() removes children from
# it) there is nothing to restart. The emptiness check in the loop below is
# what makes that case terminate: without it the loop kept shifting an empty
# array forever, because undef != $child is always true.
event restart_rest => sub {
    my ( $self, $child ) = @_[ OBJECT, ARG0 ];

    my @children = $self->children;

    # drop every child that precedes $child
    shift @children while @children && $children[0] != $child;

    unless (@children) {
        $self->logger->debug("child $child is no longer supervised, nothing to restart");
        return;
    }

    foreach my $later_child ( reverse @children ) {
        $self->yield( restart_one => $later_child );
    }
};
340              
# A child exit was judged fatal: the supervisor gives up by dying.
event fatal_exit => sub {
    # Intended behaviour, not implemented yet:
    #   - stop everything
    #   - exit with error
    # FIXME how do we exit abstractly? yield to some callback?

    my $last_words = "supervisor seppuku";

    die $last_words;
};
348              
# Decide whether a child's exit should bring the supervisor down. Currently
# this always answers 0; neither $child nor @args is consulted.
sub child_exit_is_fatal {
    my ( $self, $child, @args ) = @_;

    # Planned: check whether the child exceeded the maximal number of restarts
    # by looking at $self->_children_hash->{$child}{token_bucket}, the child
    # descriptor's restart policy (if it's transient or not, etc),
    # $args{exit_code} being an error, etc

    my $is_fatal = 0;

    return $is_fatal;
}
359              
# Ask the child descriptor itself whether it should be restarted, passing the
# exit arguments along, and return its answer.
sub should_restart_child {
    my ( $self, $child, @args ) = @_;

    return $child->should_restart(@args);
}
365              
# Handler for the _child event: logs ARG0 (the verb) and ARG1 (the child) at
# debug level and does nothing else.
event _child => sub {
    my $self  = $_[OBJECT];
    my $verb  = $_[ARG0];
    my $child = $_[ARG1];

    my $logger = $self->logger;
    $logger->debug("supervisor $self had child event for $child: $verb");
};
371              
372             __PACKAGE__
373              
374             __END__
375              
376             =pod
377              
378             =head1 NAME
379              
380             POE::Component::Supervisor - Erlang inspired babysitting
381              
382             =head1 SYNOPSIS
383              
384             use POE;
385              
386             use POE::Component::Supervisor;
387              
388             POE::Component::Supervisor->new(
389             children => [
390             POE::Component::Supervisor::Supervised::Proc->new( ... ), # monitor UNIX procs
391             POE::Component::Supervisor::Supervised::Session->new( ... ), # monitor POE sessions
392             ],
393             );
394              
395             =head1 DESCRIPTION
396              
397             This is a port of the Erlang process supervisor
398             (L<http://www.erlang.org/doc/design_principles/sup_princ.html>).
399              
400             This will monitor instances of children, restarting them as necessary should
401             they exit.
402              
403             Restart throttling is not yet implemented but planned for a future version.
404              
405             =head1 OBJECT HIERARCHY
406              
407             A supervisor has any number of supervised child descriptors, which in turn
408             instantiate handles for each spawned instance of the child.
409              
410             Supervised children are essentially object factories for handles. They spawn new
411             instances of the child they describe by instantiating handles.
412              
413             A handle will do the actual management of the child, sending events to the
414             supervisor when the child is terminated, and also facilitate explicit
415             termination of the child's instance.
416              
417             Based on its C<restart_policy> the supervisor may order other handles to also
418             stop, and ask various child descriptors to respawn certain children.
419              
420             =head1 POE REFERENCE COUNTING
421              
422             When no more children are being supervised the L<POE> reference count for the
423             supervisor's session will go down to zero. If no C<alias> is set up then the
424             session will close. If an C<alias> is set and no other sessions are doing
425             anything the session will also close. See L<POE>, and L<MooseX::POE::Aliased>.
426              
427             =head1 ATTRIBUTES
428              
429             =over 4
430              
431             =item alias
432              
433             See L<MooseX::POE::Aliased>.
434              
435             This defaults to C<undef>, unlike the role, so that a supervisor session will
436             close automatically once it has no more children to supervise.
437              
438             =item use_logger_singleton
439              
440             See L<MooseX::LogDispatch>.
441              
442             Changes the default to true, to allow usage of an already configured
443             L<Log::Dispatch::Config> setup.
444              
445             =item restart_policy
446              
447             This is one of C<one>, C<all> or C<rest>.
448              
449             If the L<POE::Component::Supervisor::Supervised> object describing the child
450             deems the child should be restarted, then the value of this attribute controls
451             which other children to also restart.
452              
453             C<one> denotes that only the child which died will be restarted.
454              
455             C<rest> will cause all the children appearing after the child which died in the
456             children array to be restarted, but not the children preceding it.
457              
458             C<all> will restart all the children.
459              
460             =item children
461              
462             This is the array of children being supervised.
463              
464             It is a required argument.
465              
466             Note that the array reference will be modified if new children are introduced
467             and when children are removed (even during normal shutdown), so pass in a copy
468             of an array if this is a problem for you.
469              
470             The order of the children matters, see C<restart_policy>.
471              
472             =back
473              
474             =head1 METHODS
475              
476             =item new %args
477              
478             =item start @children
479              
480             Spawn and supervise the children described by the descriptors in @children.
481              
482             =item stop [ @children ]
483              
484             Stop the specified children.
485              
486             If no arguments are provided all the children are stopped.
487              
488             =item should_restart_child $child, %args
489              
490             Delegates to C<$child> by calling
491             L<POE::Component::Supervisor::Supervised/should_restart>.
492              
493             =item child_exit_is_fatal
494              
495             Currently always returns false. In the future restart throttling will be
496             implemented using this method.
497              
498             If C<true> is returned an error will be thrown by the supervisor.
499              
500             =head1 EVENTS
501              
502             The L<POE> event API is currently internal. All manipulation of the supervisor
503             object should be done using the API described in L</METHODS>.
504              
505             =head1 SEE ALSO
506              
507             L<http://www.erlang.org/doc/design_principles/sup_princ.html>
508              
509             =head1 AUTHOR
510              
511             Stevan Little E<lt>stevan@iinteractive.comE<gt>
512              
513             Yuval Kogman E<lt>yuval.kogman@iinteractive.comE<gt>
514              
515             =head1 COPYRIGHT
516              
517             Copyright (c) 2008, 2010 Infinity Interactive, Yuval Kogman. All rights
518             reserved. This program is free software; you can redistribute it and/or
519             modify it under the same terms as Perl itself.
520              
521             =cut