File Coverage

blib/lib/Proc/tored/Pool/Manager.pm
Criterion Covered Total %
statement 69 70 98.5
branch 8 10 80.0
condition n/a
subroutine 19 20 95.0
pod 2 3 66.6
total 98 103 95.1


line stmt bran cond sub pod time code
1             package Proc::tored::Pool::Manager;
2             # ABSTRACT: OO interface to creating a managed worker pool service
3             $Proc::tored::Pool::Manager::VERSION = '0.06';
4              
5 29     29   2176391 use strict;
  29         29  
  29         654  
6 29     29   87 use warnings;
  29         45  
  29         525  
7              
8 29     29   14108 use Moo;
  29         252795  
  29         132  
9 29     29   29980 use Carp;
  29         42  
  29         1589  
10 29     29   13885 use Types::Standard -types;
  29         1440193  
  29         319  
11 29     29   93109 use Type::Utils qw(declare as where);
  29         96322  
  29         251  
12 29     29   24186 use Try::Tiny;
  29         24365  
  29         1529  
13 29     29   13814 use Time::HiRes 'sleep';
  29         30320  
  29         116  
14 29     29   17800 use Parallel::ForkManager;
  29         709407  
  29         863  
15 29     29   6814 use Proc::tored::Pool::Constants ':events';
  29         45  
  29         3540  
16 29     29   11439 use Proc::tored::Pool::Types -types;
  29         42  
  29         219  
17              
18             extends 'Proc::tored::Manager';
19              
20              
21             has workers => (
22             is => 'ro',
23             isa => PosInt,
24             required => 1,
25             );
26              
27              
28             has on_assignment => (
29             is => 'ro',
30             isa => Maybe[CodeRef],
31             );
32              
33              
34             has on_success => (
35             is => 'ro',
36             isa => Maybe[CodeRef],
37             );
38              
39              
40             has on_failure => (
41             is => 'ro',
42             isa => Maybe[CodeRef],
43             );
44              
45              
46             has pending => (
47             is => 'ro',
48             isa => Int,
49             default => 0,
50             init_arg => undef,
51             );
52              
53             sub trigger {
54 450     450 0 2756 my ($self, $event, $ident, @args) = @_;
55 450         2809 Event->assert_valid($event);
56 450 50       16133 my $acc = $self->can("on_$event") or die "unknown event type: $event";
57 450 50       2717 if (my $cb = $self->$acc()) {
58 450     450   28648 try { $cb->($self, $ident, @args) }
59 450     0   11129 catch { warn "error triggering callback for task $ident: $_" };
  0         0  
60             }
61             }
62              
63             has forkmgr => (
64             is => 'lazy',
65             isa => InstanceOf['Parallel::ForkManager'],
66             init_arg => undef,
67             );
68              
69             sub _build_forkmgr {
70 83     83   15293 my $self = shift;
71 83         1161 my $pm = Parallel::ForkManager->new($self->workers);
72              
73             $pm->run_on_start(sub {
74 234     234   233895 my ($pid, $ident) = @_;
75 234         1701 ++$self->{pending};
76 234         4267 $self->trigger(assignment, $ident);
77 83         35594 });
78              
79             $pm->run_on_finish(sub {
80 216     216   126816009 my ($pid, $code, $ident, $signal, $core, $data) = @_;
81 216         875 --$self->{pending};
82              
83             # Task completed successfully
84 216 100       928 if ($code == 0) {
85             # Task returned data
86 181 100       465 if ($data) {
87 180         554 my ($success, @results) = @$data;
88 180 100       576 if ($success) {
89 145         792 $self->trigger(success, $ident, @results);
90             }
91             else {
92 35         354 $self->trigger(failure, $ident, @results);
93             }
94             }
95             # No data was returned - likely the result of an exec
96             else {
97 1         14 $self->trigger(success, $ident, '0 but true');
98             }
99             }
100             else {
101 35         269 $self->trigger(failure, $ident, "worker terminated with exit code $code (signal $signal)");
102             }
103 83         1150 });
104              
105 83         2191 return $pm;
106             }
107              
108              
109             sub assign {
110 226     226 1 244692 my $self = shift;
111 226         377 my $code = shift;
112              
113             push @_, sub {
114 27         3345 try { [1, $code->(@_)] }
115 27     27   57372 catch { [0, $_] };
  3         273  
116 226         772 };
117              
118 226         4539 $self->forkmgr->wait_for_available_procs(1);
119 226         12587 $self->forkmgr->start_child(@_);
120 199         27370 $self->forkmgr->wait_children; # triggers pending callbacks w/o blocking
121 199         17789 return 1;
122             }
123              
124              
125             sub sync {
126 93     93 1 8288 my $self = shift;
127 93         2597 $self->forkmgr->wait_all_children;
128             }
129              
130             after service => sub { shift->sync };
131              
132             1;
133              
134             __END__