File Coverage

blib/lib/Win32/ProcFarm/Pool.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #############################################################################
2             #
3             # Win32::ProcFarm::Pool - manages a pool of child processes
4             #
5             # Author: Toby Everett
6             # Revision: 2.15
7             # Last Change: Update to support max_rate and result_sub
8             #############################################################################
9             # Copyright 1999, 2000, 2001 Toby Everett. All rights reserved.
10             #
11             # This file is distributed under the Artistic License. See
12             # http://www.ActiveState.com/corporate/artistic_license.htm or
13             # the license that comes with your perl distribution.
14             #
15             # For comments, questions, bugs or general interest, feel free to
16             # contact Toby Everett at teverett@alascom.att.com
17             #############################################################################
18            
19            
20             =head1 NAME
21            
22             Win32::ProcFarm::Pool - manages a pool of child processes
23            
24             =head1 SYNOPSIS
25            
26             use Win32::ProcFarm::Pool;
27            
28             $Pool = Win32::ProcFarm::Pool->new($poolsize, $portnum, $scriptname, Win32::GetCwd);
29            
30             foreach $i (@list) {
31             $Pool->add_waiting_job($i, 'child_sub', $i);
32             }
33            
34             $Pool->do_all_jobs(0.1);
35            
36             %ping_data = $Pool->get_return_data;
37             $Pool->clear_return_data;
38            
39             foreach $i (@list) {
40             print "$i:\t$ping_data{$i}->[0]\n";
41             }
42            
43             =head1 DESCRIPTION
44            
45             =head2 Installation instructions
46            
47             This installs with MakeMaker as part of Win32::ProcFarm.
48            
49             To install via MakeMaker, it's the usual procedure - download from CPAN,
50             extract, type "perl Makefile.PL", "nmake" then "nmake install". Don't
51             do an "nmake test" because the I haven't written a test suite yet.
52            
53             =head2 More usage instructions
54            
55             See C for more information.
56            
57             =head1 METHODS
58            
59             =cut
60            
61 1     1   4145 use Win32::ProcFarm::Parent;
  0            
  0            
62             use Win32::ProcFarm::Port;
63            
64             package Win32::ProcFarm::Pool;
65            
66             use strict;
67             use vars qw($VERSION @ISA);
68            
69             $VERSION = '2.15';
70            
71             =head2 new
72            
73             The C method creates a new C object (amazing, eh!).
74             It takes 5 parameters:
75            
76             =over 4
77            
78             =item $num_threads
79            
80             This indicates the number of threads that should be created.
81            
82             =item $port_num
83            
84             This indicates the port number to use for the listener.
85            
86             =item $script
87            
88             The script name to execute for the child processes.
89            
90             =item $curdir
91            
92             The working directory to use when running the script. If this is the same
93             directory the script is in, the script name can be specified without a path.
94            
95             =item %options
96            
97             A hash of options. The current options are:
98            
99             =over 4
100            
101             =item timeout
102            
103             Indicates how long jobs should be allowed to execute before they are deemed to
104             have blocked. Blocked jobs will be terminated and a new process created to take
105             their place.
106            
107             =item listeners
108            
109             Indicates how many listeners should be allocated on the port object. During
110             thread instantiation, this controls how many unconnected threads can be spun
111             off. For optimum thread creation speed, this should be set to one more than
112             the number of processors. By default, this is set to three. Setting this to
113             too high a value does not appear to have much effect on the overall thread
114             creation rate, but setting it to two low a value (such as one) could have a
115             dramatic effect on the thread creation rate for multiprocessor machines.
116            
117             =item result_sub
118            
119             If specified, the attached subroutine will be called as soon as each job
120             finishes executing. The subroutine will be passed the key name and then return
121             values. This allows for asynchronous reponses to job execution, rather than
122             having to wait for the entire pool to finish running before operating on the
123             results.
124            
125             =back
126            
127             =back
128            
129             =cut
130            
131             sub new {
132             my $class = shift;
133            
134             my($num_threads, $port_num, $script, $curdir, %options) = @_;
135             my $self = {
136             'num_threads' => 0,
137             'port_obj' => undef,
138             'thread_pool' => [],
139             'waiting_pool' => [],
140             'ondeck_pool' => [],
141             'return_data' => {},
142             'script' => $script,
143             'curdir' => $curdir,
144             };
145            
146             foreach my $i (qw(timeout listeners result_sub)) {
147             exists $options{$i} and $self->{$i} = $options{$i};
148             }
149            
150             $self->{listeners} ||= 3;
151            
152             $self->{port_obj} = Win32::ProcFarm::Port->new($port_num, $self->{listeners});
153            
154             bless $self, $class;
155            
156             $self->add_threads($num_threads);
157             return $self;
158             }
159            
160             =head2 add_threads
161            
162             The C method call adds additional threads to a pool. The only
163             accepted parameter is the number of new threads to add.
164            
165             =cut
166            
167             sub add_threads {
168             my $self = shift;
169            
170             my($add_threads) = @_;
171            
172             $add_threads >= 0 or die "Attempt to delete threads via Win32::ProcFarm::Pool::add_threads.\n";
173            
174             my(@temp);
175             foreach my $i (0..$self->{listeners}-1) {
176             $add_threads or last;
177            
178             my $temp = Win32::ProcFarm::Parent->new_async($self->{port_obj}, $self->{script}, $self->{curdir}, $self->{timeout});
179             push(@{$self->{thread_pool}}, {
180             'key' => undef,
181             'Parent' => $temp
182             });
183             push(@temp, $temp);
184            
185             $add_threads--;
186             $self->{num_threads}++;
187             }
188            
189             while (my $temp = shift @temp) {
190             $temp->connect;
191             $add_threads or next;
192            
193             my $temp = Win32::ProcFarm::Parent->new_async($self->{port_obj}, $self->{script}, $self->{curdir}, $self->{timeout});
194             push(@{$self->{thread_pool}}, {
195             'key' => undef,
196             'Parent' => $temp
197             });
198             push(@temp, $temp);
199            
200             $add_threads--;
201             $self->{num_threads}++;
202             }
203             }
204            
205             =head2 min_threads
206            
207             The C increases the number of threads in the pool to the specified
208             value. If there are more threads in the pool that the specified value, the
209             number of threads in the pool is unchanged.
210            
211             =cut
212            
213             sub min_threads {
214             my $self = shift;
215            
216             my($num_threads) = @_;
217            
218             if ($num_threads >= $self->{num_threads}) {
219             $self->add_threads($num_threads - $self->{num_threads});
220             }
221             }
222            
223             =head2 add_waiting_job
224            
225             The C method adds a job to the waiting pool. It takes three parameters:
226            
227             =over 4
228            
229             =item $key
230            
231             This should be a unique identifier that will be used to retrieve the return values from the
232             return data hash.
233            
234             =item $command
235            
236             The name of the subroutine that the child process will execute.
237            
238             =item @params
239            
240             A list of parameters for that subroutine.
241            
242             =back
243            
244             =cut
245            
246             sub add_waiting_job {
247             my $self = shift;
248             my($key, $command, @params) = @_;
249            
250             push(@{$self->{waiting_pool}}, {'key' => $key, 'command' => $command, 'params' => [@params]});
251             }
252            
253             =head2 do_all_jobs
254            
255             The C command will execute all the jobs in the waiting pool. The
256             first passed parameter specifies the number of seconds to wait between sweeps
257             through the thread pool to check for completed jobs. The number of seconds can
258             be fractional (i.e. 0.1 for a tenth of a second). The second passed parameter
259             specifies the minimum interval between jobs becoming eligible to run.
260            
261             =cut
262            
263             sub do_all_jobs {
264             my $self = shift;
265             my($sleep, $intvl) = @_;
266            
267             if ($intvl) {
268             push(@{$self->{ondeck_pool}}, @{$self->{waiting_pool}});
269             @{$self->{waiting_pool}} = ();
270             }
271            
272             my $start_time = time();
273             my $count = 0;
274            
275             while ($self->count_ondeck + $self->count_waiting + $self->count_running) {
276             if ($intvl) {
277             while ((time()-$start_time)/$intvl > $count) {
278             push(@{$self->{waiting_pool}}, shift(@{$self->{ondeck_pool}}));
279             $count++;
280             }
281             }
282             $self->cleanse_and_dispatch;
283             $sleep and Win32::Sleep($sleep*1000);
284             }
285             }
286            
287             =head2 get_return_data
288            
289             Return the return_data hash, indexed on the unique key passed initially.
290            
291             =cut
292            
293             sub get_return_data {
294             my $self = shift;
295            
296             return (%{$self->{return_data}});
297             }
298            
299             =head2 clear_return_data
300            
301             Clears out the return_data hash.
302            
303             =cut
304            
305             sub clear_return_data {
306             my $self = shift;
307            
308             $self->{return_data} = {};
309             }
310            
311             =head1 INTERNAL METHODS
312            
313             These methods are considered internal methods. Child classes of Win32::ProcFarm::Pool may modify
314             these methods in order to change the behavior of the resultant Pool object.
315            
316             =cut
317            
318             sub count_waiting {
319             my $self = shift;
320            
321             return scalar(@{$self->{waiting_pool}});
322             }
323            
324             sub count_ondeck {
325             my $self = shift;
326            
327             return scalar(@{$self->{ondeck_pool}});
328             }
329            
330             sub count_running {
331             my $self = shift;
332            
333             return scalar(grep {$_->{Parent}->get_state ne 'idle'} @{$self->{thread_pool}});
334             }
335            
336            
337            
338             sub cleanse_pool {
339             my $self = shift;
340            
341             my $retval;
342            
343             foreach my $i (@{$self->{thread_pool}}) {
344             $retval += $self->cleanse_thread($i);
345             }
346             return $retval;
347             }
348            
349             sub dispatch_jobs {
350             my $self = shift;
351            
352             my $retval;
353            
354             foreach my $i (@{$self->{thread_pool}}) {
355             $retval += $self->dispatch_job($i);
356             }
357            
358             return $retval;
359             }
360            
361             sub cleanse_and_dispatch {
362             my $self = shift;
363            
364             my($retval_c, $retval_d, $job);
365            
366             foreach my $i (@{$self->{thread_pool}}) {
367             $retval_c += $self->cleanse_thread($i);
368             $retval_d += $self->dispatch_job($i);
369             }
370            
371             return ($retval_c, $retval_d);
372             }
373            
374            
375            
376             sub cleanse_thread {
377             my $self = shift;
378             my($thread) = @_;
379            
380             $thread->{Parent}->get_state eq 'fin' or return 0;
381            
382             my $temp = $self->{return_data}->{$thread->{key}} = [$thread->{Parent}->get_retval];
383            
384             if (ref($self->{result_sub}) eq 'CODE') {
385             $self->{result_sub}->($thread->{key}, @{$temp});
386             }
387            
388             $thread->{key} = undef;
389             return 1;
390             }
391            
392             sub dispatch_job {
393             my $self = shift;
394             my($thread) = @_;
395            
396             $thread->{Parent}->get_state eq 'idle' or return 0;
397             my $job = $self->get_next_job() or return 0;
398             $thread->{Parent}->execute($job->{command}, @{$job->{params}});
399             $thread->{key} = $job->{key};
400             return 1;
401             }
402            
403             sub get_next_job {
404             my $self = shift;
405            
406             return shift(@{$self->{waiting_pool}});
407             }
408            
409             1;