File Coverage

blib/lib/POE/Component/ForkManager.pm
Criterion Covered Total %
statement 27 136 19.8
branch 0 32 0.0
condition 0 11 0.0
subroutine 9 23 39.1
pod 0 11 0.0
total 36 213 16.9


line stmt bran cond sub pod time code
1             package POE::Component::ForkManager;
2              
3 1     1   23934 use 5;
  1         3  
  1         40  
4              
5 1     1   5 use strict;
  1         2  
  1         30  
6 1     1   5 use warnings;
  1         6  
  1         35  
7              
8 1     1   5 use vars '$VERSION';
  1         2  
  1         63  
9              
10             $VERSION = '0.00_01';
11              
12 1     1   960 use POE;
  1         67012  
  1         6  
13              
14 1     1   121853 use POE::Wheel::ReadWrite;
  1         20798  
  1         38  
15 1     1   989 use POE::Filter::Block;
  1         1497  
  1         25  
16 1     1   7 use POE::Driver::SysRW;
  1         2  
  1         20  
17 1     1   837 use POE::Pipe::TwoWay;
  1         372  
  1         1559  
18              
19             sub DEBUGGING () { 0 }
20              
21             sub new {
22 0     0 0   my $class = shift;
23 0           my ($min, $max) = @_;
24              
25 0   0       $min ||= 0;
26 0   0       $max ||= 0;
27              
28 0   0       my $self = bless {}, (ref $class || $class);
29              
30 0           POE::Session->create(
31             package_states => [
32             __PACKAGE__, [qw(
33             _start
34             _stop
35             spawn_child
36             sigchld
37             parent_input
38             child_input
39             child_flushed
40             check_idle
41             )],
42             ],
43             heap => {
44             pids => {},
45             wheels => {},
46             min_idle => $min,
47             max_idle => $max,
48             idle => {},
49             callbacks => {},
50             },
51             args => [
52             $self,
53             ],
54             );
55              
56 0           return $self;
57             }
58              
59             sub limits {
60 0     0 0   my $self = shift;
61 0           my ($min, $max) = @_;
62              
63 0           my $heap = $self->{heap};
64              
65 0 0         if (defined( $min )) {
66 0           $heap->{min_idle} = $min + 0;
67             }
68 0 0         if (defined( $max )) {
69 0           $heap->{max_idle} = $max + 0;
70             }
71              
72 0           $self->{check_idle}->();
73              
74 0           return( $heap->{min_idle}, $heap->{max_idle} );
75             }
76              
77             sub callback {
78 0     0 0   my $self = shift;
79 0           my ($name, $value) = @_;
80 0           my $callbacks = $self->{heap}->{callbacks};
81              
82 0 0         if (defined( $value )) {
83 0           $callbacks->{$name} = $value;
84             }
85             else {
86 0           delete( $callbacks->{$name} );
87             }
88             }
89              
90             sub busy {
91 0     0 0   my $self = shift;
92 0           my $heap = $self->{heap};
93            
94 0           my $write = $heap->{amulet} = $heap->{write};
95 0           $write->put( 0 );
96             }
97              
98             sub idle {
99 0     0 0   my $self = shift;
100 0           my $heap = $self->{heap};
101              
102 0           my $write = $heap->{amulet} = $heap->{write};
103 0           $write->put( 1 );
104             }
105              
106             sub DESTROY {
107 0     0     my $self = shift;
108            
109 0           warn "DESTROY\n" if DEBUGGING;
110            
111 0           my $heap = delete( $self->{heap} );
112              
113 0           my $write = delete( $heap->{write} );
114            
115 0           $heap->{min_idle} = $heap->{max_idle} = 0;
116 0           $self->{check_idle}->();
117              
118 0           delete( $self->{check_idle} );
119             }
120              
121             sub _start {
122 0     0     my ($kernel, $session, $heap, $object) = @_[KERNEL, SESSION, HEAP, ARG0];
123              
124             # Set up the first batch of child processes, if any.
125 0           $kernel->yield( 'check_idle' );
126              
127             # Detach from the parent session so that we don't hold it alive.
128 0           $kernel->detach_myself();
129              
130 0           $object->{check_idle} = $session->callback( 'check_idle' );
131 0           $object->{heap} = $heap;
132             }
133              
134             sub _stop {
135 0     0     warn "[$$] Stopping\n" if DEBUGGING;
136             }
137              
138             sub spawn_child {
139 0     0 0   my ($kernel, $heap) = @_[KERNEL, HEAP];
140 0           my $pids = $heap->{pids};
141 0           my $wheels = $heap->{wheels};
142              
143             # Our pipe, for IPC
144 0           my ($parent_read, $parent_write, $child_read, $child_write) = POE::Pipe::TwoWay->new();
145              
146             # Build Filter and Driver before we fork, saves cycles later and reduces linecount
147 0           my $filter = POE::Filter::Block->new( BlockSize => 1 );
148 0           my $driver = POE::Driver::SysRW->new();
149              
150 0           $_[KERNEL]->sig( 'CHLD', 'sigchld' );
151            
152             # Where once was one, there are now two.
153 0           my $fork = fork();
154              
155 0 0         if (!defined( $fork )) {
    0          
156 0           warn( "[POE::Component::Forkmanager] Fork Failed: $!\n" );
157             }
158             elsif( $fork ) {
159             # Parent process
160 0           my $wheel = $pids->{$fork} = POE::Wheel::ReadWrite->new(
161             InputHandle => $parent_read,
162             OutputHandle => $parent_write,
163             Driver => $driver,
164             Filter => $filter,
165             InputEvent => 'parent_input',
166             );
167 0           $wheels->{$wheel->ID} = $fork;
168              
169             # A new process is counted as idle, because we don't want to
170             # end up forking and flooding the machine.
171 0           $heap->{idle}->{$fork} = 1;
172              
173             # TODO make child startup tracking more robust. Push startup pids
174             # on an array, add a delay event to forcibly kill them if they haven't
175             # signaled that they are ready soon enough. Possibly refactor this into
176             # a general 'watchdog' design.
177 0           return 0;
178             }
179             else {
180             # Child process
181 0           $kernel->sig( 'CHLD' );
182 0           $kernel->state( 'sigchld' );
183 0           $kernel->state( 'spawn_child' ); # Prevent children from forking themselves.
184 0           $kernel->state( 'check_idle' );
185              
186 0           my $callbacks = $heap->{callbacks};
187            
188 0           %$heap = (
189             callbacks => $callbacks,
190             );
191            
192 0           $heap->{write} = POE::Wheel::ReadWrite->new(
193             InputHandle => $child_read,
194             OutputHandle => $child_write,
195             Driver => $driver,
196             Filter => $filter,
197             InputEvent => 'child_input',
198             FlushedEvent => 'child_flushed',
199             );
200              
201 0 0         if (exists( $callbacks->{child_fork} )) {
202 0           $callbacks->{child_fork}->();
203             }
204            
205 0           return 1;
206             }
207             }
208              
209             sub sigchld {
210 0     0 0   my ($kernel, $session, $heap, $pid) = @_[KERNEL, SESSION, HEAP, ARG1];
211 0           my $pids = $heap->{pids};
212 0           my $wheels = $heap->{wheels};
213              
214 0 0         if (exists( $pids->{$pid} )) {
215 0           my $idle = $heap->{idle};
216 0 0         if (exists( $idle->{$pid} )) {
217 0           delete( $idle->{$pid} );
218 0           $kernel->call( $session, 'check_idle' );
219             }
220 0           my $wheel = delete( $pids->{$pid} );
221 0           delete( $wheels->{$wheel->ID} );
222 0           $kernel->sig_handled();
223             }
224 0 0         $kernel->sig( 'CHLD' ) unless (keys( %$pids ));
225             }
226              
227             sub parent_input {
228 0     0 0   my ($kernel, $session, $heap, $input, $wheel_id) = @_[KERNEL, SESSION, HEAP, ARG0, ARG1];
229 0           my $idle = $heap->{idle};
230            
231 0           my $pid = $heap->{wheels}->{$wheel_id};
232              
233 0           my $now_idle = $input; # This line defines true to mean idle, false to mean busy
234 0           my $previously_idle = exists( $idle->{$pid} );
235            
236 0 0 0       if ($now_idle xor $previously_idle) { # we're changing state
237 0 0         if ($now_idle) {
238 0           $idle->{$pid} = 1;
239             }
240             else {
241 0           delete( $idle->{$pid} );
242             }
243 0           $kernel->call( $session, 'check_idle' );
244             }
245             }
246              
247             sub child_input {
248 0     0 0   my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];
249 0           my $callbacks = delete( $heap->{callbacks} );
250 0           delete( $heap->{write} );
251              
252 0 0         if ($input) {
253 0 0         if (exists( $callbacks->{child_shutdown} )) {
254 0           $callbacks->{child_shutdown}->();
255             }
256             }
257             }
258              
259             sub child_flushed {
260 0     0 0   delete $_[HEAP]->{amulet};
261             }
262              
263             sub check_idle {
264 0     0 0   my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
265              
266 0           my $idle = $heap->{idle};
267              
268 0           my $max_idle = $heap->{max_idle};
269 0           my $min_idle = $heap->{min_idle};
270              
271 0           my $idle_count = keys( %$idle );
272              
273 0 0         if ($idle_count > $max_idle) {
    0          
274 0           my $count = $idle_count - $max_idle;
275              
276 0           while ($count-- > 0) {
277 0           my ($pid, undef) = each %$idle;
278            
279 0           $heap->{pids}->{$pid}->put( '1' );
280             }
281            
282             # TODO Remove from idle count because it should be shutting down.
283             # TODO Push onto an array, add a delay event to forcibly kill off in a few seconds.
284             # TODO cleanup array and delay event in sigchld above
285             }
286             elsif ( $idle_count < $min_idle) {
287 0           my $count = $min_idle - $idle_count;
288            
289 0           foreach (1..$count) {
290 0 0         return if $kernel->call( $session, 'spawn_child' );
291             }
292             }
293             }
294              
295             1;
296             __END__