File Coverage

blib/lib/Poe/Wheel/Spawner.pm
Criterion Covered Total %
statement 15 61 24.5
branch 0 22 0.0
condition 0 9 0.0
subroutine 5 14 35.7
pod 3 3 100.0
total 23 109 21.1


line stmt bran cond sub pod time code
1             package Poe::Wheel::Spawner;
2              
3 1     1   15728 use 5.006;
  1         2  
  1         34  
4 1     1   4 use strict;
  1         1  
  1         30  
5 1     1   3 use warnings;
  1         4  
  1         30  
6              
7 1         7 use fields qw/
8             pool_size
9             stop_if_done
10             workload
11             _workers_sig_count
12 1     1   1756 /;
  1         1199  
13              
14 1         6 use POE qw/
15             Wheel::Run
16             Filter::Reference
17 1     1   514 /;
  1         35991  
18              
19             =head1 NAME
20              
21             Poe::Wheel::Spawner
22              
23             =head1 DESCRIPTION
24              
25             Poe::Wheel::Spawner generate only one process for your workload and will add a next one on spawn call in it unless poos_size is not exceeded.
26              
27             =head1 VERSION
28              
29             Version 0.02
30              
31             =cut
32              
33             $Poe::Wheel::Spawner::VERSION = '0.02';
34              
35             =head1 SYNOPSIS
36              
37             use M43::POE::Wheel::Spawner;
38              
39             my $foo = M43::POE::Wheel::Spawner->new(
40             pool_size => 2,
41             stop_if_done => 1,
42             workload => sub { _workload() }
43             );
44              
45             $foo->run();
46              
47             sub _workload {
48              
49             # request for a new sibling
50             $foo->spawn($$);
51              
52             # ...
53             }
54              
55             =head1 SUBROUTINES/METHODS
56              
57             =head2 new(%opts)
58              
59             options:
60              
61             =over
62              
63             =item
64              
65             pool_size
66              
67             the number of maximal parallel executed C
68              
69             =item
70              
71             stop_if_done
72              
73             stop after C pid's are exited.
74              
75             run endless if !C
76              
77             =item
78              
79             workload
80              
81             CODE reference to execute
82              
83             =back
84              
85             =cut
86              
87             sub new {
88 0     0 1   my Poe::Wheel::Spawner $self = shift;
89 0           my (%opts) = @_;
90 0 0         unless (ref($self)) {
91 0           $self = fields::new($self);
92             }
93              
94 0 0         if (defined($opts{pool_size})) {
95 0 0         $opts{pool_size} =~ /^\d+$/
96             || die "'pool_size' property expects a positive integer value";
97             }
98              
99 0   0       $self->{pool_size} = int(delete($opts{pool_size}) || 0);
100              
101 0           $self->{stop_if_done} = delete($opts{stop_if_done});
102 0           $self->{workload} = delete($opts{workload});
103 0           $self->{_workers_sig_count} = 0;
104              
105 0 0         %opts && warn sprintf("ignore unsupported properties '%s'", keys(%opts));
106              
107 0           return $self;
108             } ## end sub new
109              
110             =head2 run(%opts)
111              
112             %opts provide to POE::Session
113              
114             =over
115              
116             =item
117              
118             debug
119              
120             default 0
121              
122             =item
123              
124             trace
125              
126             default 0
127              
128             =back
129              
130             create a POE::Session
131              
132             run POE::Kernel
133              
134             =cut
135              
136             sub run {
137 0     0 1   my ($self, %opts) = @_;
138              
139 0 0         ref($self->{workload}) eq 'CODE'
140             || die "work_method is not a code reference";
141              
142 0   0       POE::Session->create(
      0        
143             options => { debug => $opts{debug} || 0, trace => $opts{trace} || 0 },
144             object_states => [
145             $self => {
146             _start => '_handle_start',
147             _next => '_handle_start',
148             _sig_child => '_handle_sig_child',
149             _done => '_handle_done',
150             _stderr => '_handle_stderr',
151             _stdout => '_handle_stdout',
152             }
153             ]
154             );
155              
156 0           POE::Kernel->run();
157             } ## end sub run
158              
159             =head2 spawn($pid)
160              
161             request to spawn
162              
163             =cut
164              
165             sub spawn {
166 0     0 1   my ($self, $pid) = @_;
167 0           my $filter = POE::Filter::Reference->new();
168 0           my $output = $filter->put([{ busy_worker_pid => $pid }]);
169              
170 0           print @$output;
171             } ## end sub spawn
172              
173             #=head2 _handle_start
174             #
175             #handle C<_start> and C<_next> events defined in POE::Session, which is initialized in C.
176             #
177             #start execution of C by C parallel running pids
178             #
179             #=cut
180              
181             sub _handle_start {
182 0     0     my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];
183              
184 0           my $pids_count = scalar(keys(%{ $heap->{worker_by_pid} }));
  0            
185 0 0         ($pids_count >= $self->{pool_size}) && return;
186              
187             my $w = POE::Wheel::Run->new(
188 0     0     Program => sub { &{ $self->{workload} } },
  0            
189 0           StdoutFilter => POE::Filter::Reference->new(),
190             StdoutEvent => "_stdout",
191             StderrEvent => "_stderr",
192             CloseEvent => "_done",
193             );
194              
195 0           $heap->{worker_by_pid}->{ $w->PID } = $w;
196 0           $kernel->sig_child($w->PID, "_sig_child");
197             } ## end sub _handle_start
198              
199             #=head2 _handle_sig_child
200             #
201             #Clear heap. Trigger '_next' if !stop_if_done and currently no child is busy
202             #
203             #=cut
204              
205             sub _handle_sig_child {
206 0     0     my ($self, $kernel, $heap, $pid, $exit_val)
207             = @_[OBJECT, KERNEL, HEAP, ARG1, ARG2];
208              
209 0           ++$self->{_workers_sig_count};
210              
211 0           my $child = delete $heap->{worker_by_pid}{$pid};
212 0 0         unless ($child) {
213 0           POE::Kernel::_die("no child pid: $pid");
214             }
215              
216 0           delete $heap->{busy_worker_pid}->{$pid};
217              
218 0 0         if ($self->{stop_if_done}) {
219 0 0         ($self->{_workers_sig_count} >= $self->{pool_size}) && return;
220             }
221             else {
222 0 0         (scalar(keys(%{ $heap->{busy_worker_pid} })))
  0            
223             || $kernel->yield("_next");
224             }
225             } ## end sub _handle_sig_child
226              
227             #=head2 _handle_done
228             #
229             #is not implemented yet
230             #
231             #=cut
232              
233 0     0     sub _handle_done { }
234              
235             #=head2 _handle_stderr
236             #
237             #provide STDERR to POE::Kernel::_warn
238             #
239             #=cut
240              
241             sub _handle_stderr {
242 0     0     my ($self, $input, $wheel_id) = @_[OBJECT, ARG0, ARG1];
243 0           POE::Kernel::_warn("wheel $wheel_id STDERR: $input");
244             }
245              
246             #=head2 _handle_stdout
247             #
248             #evaluate from child to stdout printed result.
249             #
250             #trigger _next event if child asks - by using busy_worker_pid printed to stdout - for a sibling
251             #
252             #=cut
253              
254             sub _handle_stdout {
255 0     0     my ($self, $kernel, $heap, $result) = @_[OBJECT, KERNEL, HEAP, ARG0];
256 0 0 0       if (ref($result) eq 'HASH' && $result->{busy_worker_pid}) {
257 0           $heap->{busy_worker_pid}->{ $result->{busy_worker_pid} } = 1;
258 0           $kernel->yield("_next");
259             }
260             } ## end sub _handle_stdout
261              
262             1; # End of Poe::Wheel::Spawner
263              
264             =head1 AUTHOR
265              
266             Alexei Pastuchov Epalik at cpan.orgE.
267              
268             =head1 REPOSITORY
269              
270             L
271              
272             =head1 LICENSE AND COPYRIGHT
273              
274              
275             Copyright 2014 by Alexei Pastuchov Epalik at cpan.orgE.
276              
277             This library is free software; you can redistribute it and/or modify
278             it under the same terms as Perl itself.
279              
280             =cut