File Coverage

blib/lib/Proc/tored/Pool/Manager.pm
Criterion Covered Total %
statement 69 70 98.5
branch 8 10 80.0
condition n/a
subroutine 19 20 95.0
pod 2 3 66.6
total 98 103 95.1


line stmt bran cond sub pod time code
1             package Proc::tored::Pool::Manager;
2             # ABSTRACT: OO interface to creating a managed worker pool service
3             $Proc::tored::Pool::Manager::VERSION = '0.07';
4              
5 29     29   2511383 use strict;
  29         45  
  29         670  
6 29     29   103 use warnings;
  29         42  
  29         618  
7              
8 29     29   16657 use Moo;
  29         257702  
  29         116  
9 29     29   32729 use Carp;
  29         58  
  29         1621  
10 29     29   13491 use Types::Standard -types;
  29         2323973  
  29         380  
11 29     29   91061 use Type::Utils qw(declare as where);
  29         103151  
  29         248  
12 29     29   24710 use Try::Tiny;
  29         27033  
  29         1408  
13 29     29   12981 use Time::HiRes 'sleep';
  29         30677  
  29         151  
14 29     29   23238 use Parallel::ForkManager;
  29         758522  
  29         1064  
15 29     29   6306 use Proc::tored::Pool::Constants ':events';
  29         45  
  29         4067  
16 29     29   11690 use Proc::tored::Pool::Types -types;
  29         74  
  29         399  
17              
18             extends 'Proc::tored::Manager';
19              
20              
21             has workers => (
22             is => 'ro',
23             isa => PosInt,
24             required => 1,
25             );
26              
27              
28             has on_assignment => (
29             is => 'ro',
30             isa => Maybe[CodeRef],
31             );
32              
33              
34             has on_success => (
35             is => 'ro',
36             isa => Maybe[CodeRef],
37             );
38              
39              
40             has on_failure => (
41             is => 'ro',
42             isa => Maybe[CodeRef],
43             );
44              
45              
46             has pending => (
47             is => 'ro',
48             isa => Int,
49             default => 0,
50             init_arg => undef,
51             );
52              
53             sub trigger {
54 435     435 0 3791 my ($self, $event, $ident, @args) = @_;
55 435         5256 Event->assert_valid($event);
56 435 50       25409 my $acc = $self->can("on_$event") or die "unknown event type: $event";
57 435 50       3339 if (my $cb = $self->$acc()) {
58 435     435   30405 try { $cb->($self, $ident, @args) }
59 435     0   9855 catch { warn "error triggering callback for task $ident: $_" };
  0         0  
60             }
61             }
62              
63             has forkmgr => (
64             is => 'lazy',
65             isa => InstanceOf['Parallel::ForkManager'],
66             init_arg => undef,
67             );
68              
69             sub _build_forkmgr {
70 29     29   261 my $self = shift;
71 29         358 my $pm = Parallel::ForkManager->new($self->workers);
72              
73             $pm->run_on_start(sub {
74 234     234   398433 my ($pid, $ident) = @_;
75 234         1169 ++$self->{pending};
76 234         4186 $self->trigger(assignment, $ident);
77 29         10397 });
78              
79             $pm->run_on_finish(sub {
80 201     201   122905696 my ($pid, $code, $ident, $signal, $core, $data) = @_;
81 201         791 --$self->{pending};
82              
83             # Task completed successfully
84 201 100       787 if ($code == 0) {
85             # Task returned data
86 166 100       489 if ($data) {
87 165         674 my ($success, @results) = @$data;
88 165 100       405 if ($success) {
89 130         741 $self->trigger(success, $ident, @results);
90             }
91             else {
92 35         352 $self->trigger(failure, $ident, @results);
93             }
94             }
95             # No data was returned - likely the result of an exec
96             else {
97 1         8 $self->trigger(success, $ident, '0 but true');
98             }
99             }
100             else {
101 35         282 $self->trigger(failure, $ident, "worker terminated with exit code $code (signal $signal)");
102             }
103 29         455 });
104              
105 29         706 return $pm;
106             }
107              
108              
109             sub assign {
110 226     226 1 803397 my $self = shift;
111 226         402 my $code = shift;
112              
113             push @_, sub {
114 27         4764 try { [1, $code->(@_)] }
115 27     27   62018 catch { [0, $_] };
  3         370  
116 226         1005 };
117              
118 226         6161 $self->forkmgr->wait_for_available_procs(1);
119 226         11260 $self->forkmgr->start_child(@_);
120 199         30159 $self->forkmgr->wait_children; # triggers pending callbacks w/o blocking
121 199         19576 return 1;
122             }
123              
124              
125             sub sync {
126 91     91 1 5676 my $self = shift;
127 91         3226 $self->forkmgr->wait_all_children;
128             }
129              
130             after service => sub { shift->sync };
131              
132             1;
133              
134             __END__