File Coverage

blib/lib/Proc/tored/Pool/Manager.pm
Criterion Covered Total %
statement 69 70 98.5
branch 8 10 80.0
condition n/a
subroutine 19 20 95.0
pod 2 3 66.6
total 98 103 95.1


line stmt bran cond sub pod time code
1             package Proc::tored::Pool::Manager;
2             # ABSTRACT: OO interface to creating a managed worker pool service
3             $Proc::tored::Pool::Manager::VERSION = '0.05';
4              
5 29     29   2074116 use strict;
  29         16  
  29         657  
6 29     29   87 use warnings;
  29         29  
  29         538  
7              
8 29     29   12432 use Moo;
  29         236512  
  29         116  
9 29     29   29471 use Carp;
  29         29  
  29         1389  
10 29     29   13474 use Types::Standard -types;
  29         1311196  
  29         271  
11 29     29   89072 use Type::Utils qw(declare as where);
  29         90310  
  29         251  
12 29     29   23688 use Try::Tiny;
  29         22942  
  29         1347  
13 29     29   12860 use Time::HiRes 'sleep';
  29         28555  
  29         119  
14 29     29   17997 use Parallel::ForkManager;
  29         681007  
  29         809  
15 29     29   6347 use Proc::tored::Pool::Constants ':events';
  29         29  
  29         3185  
16 29     29   10548 use Proc::tored::Pool::Types -types;
  29         45  
  29         216  
17              
18             extends 'Proc::tored::Manager';
19              
20              
21             has workers => (
22             is => 'ro',
23             isa => PosInt,
24             required => 1,
25             );
26              
27              
28             has on_assignment => (
29             is => 'ro',
30             isa => Maybe[CodeRef],
31             );
32              
33              
34             has on_success => (
35             is => 'ro',
36             isa => Maybe[CodeRef],
37             );
38              
39              
40             has on_failure => (
41             is => 'ro',
42             isa => Maybe[CodeRef],
43             );
44              
45              
46             has pending => (
47             is => 'ro',
48             isa => Int,
49             default => 0,
50             init_arg => undef,
51             );
52              
53             sub trigger {
54 450     450 0 2714 my ($self, $event, $ident, @args) = @_;
55 450         2216 Event->assert_valid($event);
56 450 50       14549 my $acc = $self->can("on_$event") or die "unknown event type: $event";
57 450 50       2363 if (my $cb = $self->$acc()) {
58 450     450   25555 try { $cb->($self, $ident, @args) }
59 450     0   10628 catch { warn "error triggering callback for task $ident: $_" };
  0         0  
60             }
61             }
62              
63             has forkmgr => (
64             is => 'lazy',
65             isa => InstanceOf['Parallel::ForkManager'],
66             init_arg => undef,
67             );
68              
69             sub _build_forkmgr {
70 83     83   15287 my $self = shift;
71 83         976 my $pm = Parallel::ForkManager->new($self->workers);
72              
73             $pm->run_on_start(sub {
74 234     234   228191 my ($pid, $ident) = @_;
75 234         1480 ++$self->{pending};
76 234         3928 $self->trigger(assignment, $ident);
77 83         128526 });
78              
79             $pm->run_on_finish(sub {
80 216     216   142078242 my ($pid, $code, $ident, $signal, $core, $data) = @_;
81 216         791 --$self->{pending};
82              
83             # Task completed successfully
84 216 100       1006 if ($code == 0) {
85             # Task returned data
86 181 100       641 if ($data) {
87 180         451 my ($success, @results) = @$data;
88 180 100       450 if ($success) {
89 145         660 $self->trigger(success, $ident, @results);
90             }
91             else {
92 35         359 $self->trigger(failure, $ident, @results);
93             }
94             }
95             # No data was returned - likely the result of an exec
96             else {
97 1         13 $self->trigger(success, $ident, '0 but true');
98             }
99             }
100             else {
101 35         162 $self->trigger(failure, $ident, "worker terminated with exit code $code (signal $signal)");
102             }
103 83         1196 });
104              
105 83         2189 return $pm;
106             }
107              
108              
109             sub assign {
110 226     226 1 235467 my $self = shift;
111 226         251 my $code = shift;
112              
113             push @_, sub {
114 27         3147 try { [1, $code->(@_)] }
115 27     27   49460 catch { [0, $_] };
  3         212  
116 226         766 };
117              
118 226         4326 $self->forkmgr->wait_for_available_procs(1);
119 226         11194 $self->forkmgr->start_child(@_);
120 199         25877 $self->forkmgr->wait_children; # triggers pending callbacks w/o blocking
121 199         17601 return 1;
122             }
123              
124              
125             sub sync {
126 93     93 1 5653 my $self = shift;
127 93         2734 $self->forkmgr->wait_all_children;
128             }
129              
130             after service => sub { shift->sync };
131              
132             1;
133              
134             __END__