line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
9
|
|
|
9
|
|
470549
|
use 5.008; use warnings; use strict; |
|
9
|
|
|
9
|
|
69
|
|
|
9
|
|
|
9
|
|
46
|
|
|
9
|
|
|
|
|
15
|
|
|
9
|
|
|
|
|
227
|
|
|
9
|
|
|
|
|
38
|
|
|
9
|
|
|
|
|
16
|
|
|
9
|
|
|
|
|
316
|
|
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
package Parallel::Iterator; |
4
|
|
|
|
|
|
|
|
5
|
9
|
|
|
9
|
|
53
|
use Carp; |
|
9
|
|
|
|
|
21
|
|
|
9
|
|
|
|
|
559
|
|
6
|
9
|
|
|
9
|
|
5020
|
use Storable qw( store_fd fd_retrieve dclone ); |
|
9
|
|
|
|
|
26190
|
|
|
9
|
|
|
|
|
638
|
|
7
|
9
|
|
|
9
|
|
4583
|
use IO::Handle; |
|
9
|
|
|
|
|
49485
|
|
|
9
|
|
|
|
|
433
|
|
8
|
9
|
|
|
9
|
|
4612
|
use IO::Select; |
|
9
|
|
|
|
|
12853
|
|
|
9
|
|
|
|
|
446
|
|
9
|
9
|
|
|
9
|
|
67
|
use Config; |
|
9
|
|
|
|
|
20
|
|
|
9
|
|
|
|
|
409
|
|
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
our $VERSION = '1.002'; |
12
|
|
|
|
|
|
|
|
13
|
9
|
|
|
9
|
|
46
|
use Exporter (); *import = \&Exporter::import; |
|
9
|
|
|
|
|
18
|
|
|
9
|
|
|
|
|
627
|
|
14
|
|
|
|
|
|
|
our @EXPORT_OK = qw( iterate iterate_as_array iterate_as_hash ); |
15
|
|
|
|
|
|
|
|
16
|
9
|
|
|
9
|
|
58
|
use constant IS_WIN32 => ( $^O =~ /^(MS)?Win32$/ ); |
|
9
|
|
|
|
|
17
|
|
|
9
|
|
|
|
|
6059
|
|
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
# Default value for every option iterate() accepts. iterate() also uses the
# key set of this hash to reject unknown option names.
my %DEFAULTS = (

    # Error policy: 'die', 'warn' or (when supplied by the caller) a code ref
    onerror => 'die',

    # When true, don't warn about falling back to single process mode
    nowarn => 0,

    # Fixed number of items handed to a worker at once
    batch => 1,

    # Adaptive batching is off unless the caller asks for it
    adaptive => 0,

    # Ten workers where fork is usable; zero (no forking) on Win32 and on
    # perls built without fork
    workers => ( $Config{d_fork} && !IS_WIN32 ) ? 10 : 0,
);
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
=head1 NAME |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
Parallel::Iterator - Simple parallel execution |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
=head1 SYNOPSIS |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
use Parallel::Iterator qw( iterate ); |
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
# A very expensive way to double 100 numbers... |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
my @nums = ( 1 .. 100 ); |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
my $iter = iterate( sub { |
39
|
|
|
|
|
|
|
my ( $id, $job ) = @_; |
40
|
|
|
|
|
|
|
return $job * 2; |
41
|
|
|
|
|
|
|
}, \@nums ); |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
my @out = (); |
44
|
|
|
|
|
|
|
while ( my ( $index, $value ) = $iter->() ) { |
45
|
|
|
|
|
|
|
$out[$index] = $value; |
46
|
|
|
|
|
|
|
} |
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
The C<map> function applies a user supplied transformation function to |
49
|
|
|
|
|
|
|
each element in a list, returning a new list containing the |
50
|
|
|
|
|
|
|
transformed elements. |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
=head1 DESCRIPTION |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
This module provides a 'parallel map'. Multiple worker processes are |
55
|
|
|
|
|
|
|
forked so that many instances of the transformation function may be |
56
|
|
|
|
|
|
|
executed simultaneously. |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
For time consuming operations, particularly operations that spend most |
59
|
|
|
|
|
|
|
of their time waiting for I/O, this is a big performance win. It also |
60
|
|
|
|
|
|
|
provides a simple idiom to make effective use of multi CPU systems. |
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
There is, however, a considerable overhead associated with forking, so |
63
|
|
|
|
|
|
|
the example in the synopsis (doubling a list of numbers) is I<not> a |
64
|
|
|
|
|
|
|
sensible use of this module. |
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
=head1 MANUAL |
67
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
=head2 Basic Usage |
69
|
|
|
|
|
|
|
|
70
|
|
|
|
|
|
|
Imagine you have an array of URLs to fetch: |
71
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
my @urls = qw( |
73
|
|
|
|
|
|
|
http://google.com/ |
74
|
|
|
|
|
|
|
http://hexten.net/ |
75
|
|
|
|
|
|
|
http://search.cpan.org/ |
76
|
|
|
|
|
|
|
... and lots more ... |
77
|
|
|
|
|
|
|
); |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
Write a function that retrieves a URL and returns its contents or undef |
80
|
|
|
|
|
|
|
if it can't be fetched: |
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
sub fetch { |
83
|
|
|
|
|
|
|
my ($id, $url) = @_; |
84
|
|
|
|
|
|
|
my $resp = $ua->get($url); |
85
|
|
|
|
|
|
|
return unless $resp->is_success; |
86
|
|
|
|
|
|
|
return $resp->content; |
87
|
|
|
|
|
|
|
}; |
88
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
Now write a function to synthesize a special kind of iterator: |
90
|
|
|
|
|
|
|
|
91
|
|
|
|
|
|
|
sub list_iter { |
92
|
|
|
|
|
|
|
my @ar = @_; |
93
|
|
|
|
|
|
|
my $pos = 0; |
94
|
|
|
|
|
|
|
return sub { |
95
|
|
|
|
|
|
|
return if $pos >= @ar; |
96
|
|
|
|
|
|
|
my @r = ( $pos, $ar[$pos] ); # Note: returns ( index, value ) |
97
|
|
|
|
|
|
|
$pos++; |
98
|
|
|
|
|
|
|
return @r; |
99
|
|
|
|
|
|
|
}; |
100
|
|
|
|
|
|
|
} |
101
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
The returned iterator will return each element of the array in turn and |
103
|
|
|
|
|
|
|
then undef. Actually it returns both the index I<and> the value of each |
104
|
|
|
|
|
|
|
element in the array. Because multiple instances of the transformation |
105
|
|
|
|
|
|
|
function execute in parallel the results won't necessarily come back in |
106
|
|
|
|
|
|
|
order. The array index will later allow us to put completed items in the |
107
|
|
|
|
|
|
|
correct place in an output array. |
108
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
Get an iterator for the list of URLs: |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
my $url_iter = list_iter( @urls ); |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
Then wrap it in another iterator which will return the transformed results: |
114
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
my $page_iter = iterate( \&fetch, $url_iter ); |
116
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
Finally loop over the returned iterator storing results: |
118
|
|
|
|
|
|
|
|
119
|
|
|
|
|
|
|
my @out = ( ); |
120
|
|
|
|
|
|
|
while ( my ( $index, $value ) = $page_iter->() ) { |
121
|
|
|
|
|
|
|
$out[$index] = $value; |
122
|
|
|
|
|
|
|
} |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
Behind the scenes your program forked into ten (by default) instances of |
125
|
|
|
|
|
|
|
itself and executed the page requests in parallel. |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
=head2 Simpler interfaces |
128
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
Having to construct an iterator is a pain so C<iterate> is smart enough |
130
|
|
|
|
|
|
|
to do that for you. Instead of passing an iterator just pass a reference |
131
|
|
|
|
|
|
|
to the array: |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
my $page_iter = iterate( \&fetch, \@urls ); |
134
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
If you pass a hash reference the iterator you get back will return key, |
136
|
|
|
|
|
|
|
value pairs: |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
my $some_iter = iterate( \&fetch, \%some_hash ); |
139
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
If the returned iterator is inconvenient you can get back a hash or |
141
|
|
|
|
|
|
|
array instead: |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
my @done = iterate_as_array( \&fetch, \@urls ); |
144
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
my %done = iterate_as_hash( \&worker, \%jobs ); |
146
|
|
|
|
|
|
|
|
147
|
|
|
|
|
|
|
=head2 How It Works |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
The current process is forked once for each worker. Each forked child is |
150
|
|
|
|
|
|
|
connected to the parent by a pair of pipes. The child's STDIN, STDOUT |
151
|
|
|
|
|
|
|
and STDERR are unaffected. |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
Input values are serialised (using Storable) and passed to the workers. |
154
|
|
|
|
|
|
|
Completed work items are serialised and returned. |
155
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
=head2 Caveats |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
Parallel::Iterator is designed to be simple to use - but the underlying |
159
|
|
|
|
|
|
|
forking of the main process can cause mystifying problems unless you |
160
|
|
|
|
|
|
|
have an understanding of what is going on behind the scenes. |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
=head3 Worker execution environment |
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
All code apart from the worker subroutine executes in the parent process |
165
|
|
|
|
|
|
|
as normal. The worker executes in a forked instance of the parent |
166
|
|
|
|
|
|
|
process. That means that things like this won't work as expected: |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
my %tally = (); |
169
|
|
|
|
|
|
|
my @r = iterate_as_array( sub { |
170
|
|
|
|
|
|
|
my ($id, $name) = @_; |
171
|
|
|
|
|
|
|
$tally{$name}++; # might not do what you think it does |
172
|
|
|
|
|
|
|
return reverse $name; |
173
|
|
|
|
|
|
|
}, \@names ); |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
# Now print out the tally... |
176
|
|
|
|
|
|
|
while ( my ( $name, $count ) = each %tally ) { |
177
|
|
|
|
|
|
|
printf("%5d : %s\n", $count, $name); |
178
|
|
|
|
|
|
|
} |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
Because the worker is a closure it can see the C<%tally> hash from its |
181
|
|
|
|
|
|
|
enclosing scope; but because it's running in a forked clone of the parent |
182
|
|
|
|
|
|
|
process it modifies its own copy of C<%tally> rather than the copy for |
183
|
|
|
|
|
|
|
the parent process. |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
That means that after the job terminates the C<%tally> in the parent |
186
|
|
|
|
|
|
|
process will be empty. |
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
In general you should avoid side effects in your worker subroutines. |
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
=head3 Serialization |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
Values are serialised using L<Storable> to pass to the worker subroutine |
193
|
|
|
|
|
|
|
and results from the worker are again serialised before being passed |
194
|
|
|
|
|
|
|
back. Be careful what your values refer to: everything has to be |
195
|
|
|
|
|
|
|
serialised. If there's an indirect way to reach a large object graph |
196
|
|
|
|
|
|
|
Storable will find it and performance will suffer. |
197
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
To find out how large your serialised values are serialise one of them |
199
|
|
|
|
|
|
|
and check its size: |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
use Storable qw( freeze ); |
202
|
|
|
|
|
|
|
my $serialized = freeze $some_obj; |
203
|
|
|
|
|
|
|
print length($serialized), " bytes\n"; |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
In your tests you may wish to guard against the possibility of a change |
206
|
|
|
|
|
|
|
to the structure of your values resulting in a sudden increase in |
207
|
|
|
|
|
|
|
serialized size: |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
ok length(freeze $some_obj) < 1000, "Object too bulky?"; |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
See the documentation for L<Storable> for other caveats. |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
=head3 Performance |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
Process forking is expensive. Only use Parallel::Iterator in cases where: |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
=over |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
=item the worker waits for I/O |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
The case of fetching web pages is a good example of this. Fetching a |
222
|
|
|
|
|
|
|
page with LWP::UserAgent may take as long as a few seconds but probably |
223
|
|
|
|
|
|
|
consumes only a few milliseconds of processor time. Running many |
224
|
|
|
|
|
|
|
requests in parallel is a huge win - but be kind to the server you're |
225
|
|
|
|
|
|
|
talking to: don't launch a lot of parallel requests unless it's your |
226
|
|
|
|
|
|
|
server or you know it can handle the load. |
227
|
|
|
|
|
|
|
|
228
|
|
|
|
|
|
|
=item the worker is CPU intensive and you have multiple cores / CPUs |
229
|
|
|
|
|
|
|
|
230
|
|
|
|
|
|
|
If the worker is doing an expensive calculation you can parallelise that |
231
|
|
|
|
|
|
|
across multiple CPU cores. Benchmark first though. There's a |
232
|
|
|
|
|
|
|
considerable overhead associated with Parallel::Iterator; unless your |
233
|
|
|
|
|
|
|
calculations are time consuming that overhead will dwarf whatever time |
234
|
|
|
|
|
|
|
they take. |
235
|
|
|
|
|
|
|
|
236
|
|
|
|
|
|
|
=back |
237
|
|
|
|
|
|
|
|
238
|
|
|
|
|
|
|
=head1 INTERFACE |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
=head2 C<< iterate( [ $options ], $worker, $iterator ) >> |
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
Get an iterator that applies the supplied transformation function to |
243
|
|
|
|
|
|
|
each value returned by the input iterator. |
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
Instead of an iterator you may pass an array or hash reference and |
246
|
|
|
|
|
|
|
C<iterate> will convert it internally into a suitable iterator. |
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
If you are doing this you may wish to investigate C<iterate_as_hash> and |
249
|
|
|
|
|
|
|
C<iterate_as_array>. |
250
|
|
|
|
|
|
|
|
251
|
|
|
|
|
|
|
=head3 Options |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
A reference to a hash of options may be supplied as the first argument. |
254
|
|
|
|
|
|
|
The following options are supported: |
255
|
|
|
|
|
|
|
|
256
|
|
|
|
|
|
|
=over |
257
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
=item C<workers> |
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
The number of concurrent processes to launch. Set this to 0 to disable |
261
|
|
|
|
|
|
|
forking. Defaults to 10 on systems that support fork and 0 (disable |
262
|
|
|
|
|
|
|
forking) on those that do not. |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
=item C<nowarn> |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
Normally C<iterate> will issue a warning and fall back to single process |
267
|
|
|
|
|
|
|
mode on systems on which fork is not available. This option suppresses |
268
|
|
|
|
|
|
|
that warning. |
269
|
|
|
|
|
|
|
|
270
|
|
|
|
|
|
|
=item C<batch> |
271
|
|
|
|
|
|
|
|
272
|
|
|
|
|
|
|
Ordinarily items are passed to the worker one at a time. If you are |
273
|
|
|
|
|
|
|
processing a large number of items it may be more efficient to process |
274
|
|
|
|
|
|
|
them in batches. Specify the batch size using this option. |
275
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
Batching is transparent from the caller's perspective. Internally it |
277
|
|
|
|
|
|
|
modifies the iterators and worker (by wrapping them in additional |
278
|
|
|
|
|
|
|
closures) so that they pack, process and unpack chunks of work. |
279
|
|
|
|
|
|
|
|
280
|
|
|
|
|
|
|
=item C<adaptive> |
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
Extending the idea of batching a number of work items to amortize the |
283
|
|
|
|
|
|
|
overhead of passing work to and from parallel workers you may also ask |
284
|
|
|
|
|
|
|
C<iterate> to heuristically determine the batch size by setting the |
285
|
|
|
|
|
|
|
C<adaptive> option to a numeric value. |
286
|
|
|
|
|
|
|
|
287
|
|
|
|
|
|
|
The batch size will be computed as |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
<number of items seen> / <number of workers> / <adaptive> |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
A larger value for C<adaptive> will reduce the rate at which the batch |
292
|
|
|
|
|
|
|
size increases. Good values tend to be in the range 1 to 2. |
293
|
|
|
|
|
|
|
|
294
|
|
|
|
|
|
|
You can also specify lower and, optionally, upper bounds on the batch |
295
|
|
|
|
|
|
|
size by passing a reference to an array containing ( lower bound, |
296
|
|
|
|
|
|
|
growth ratio, upper bound ). The upper bound may be omitted. |
297
|
|
|
|
|
|
|
|
298
|
|
|
|
|
|
|
my $iter = iterate( |
299
|
|
|
|
|
|
|
{ adaptive => [ 5, 2, 100 ] }, |
300
|
|
|
|
|
|
|
$worker, \@stuff ); |
301
|
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
=item C<onerror> |
303
|
|
|
|
|
|
|
|
304
|
|
|
|
|
|
|
The action to take when an error is thrown in the iterator. Possible |
305
|
|
|
|
|
|
|
values are 'die', 'warn' or a reference to a subroutine that will be |
306
|
|
|
|
|
|
|
called with the index of the job that threw the exception and the value |
307
|
|
|
|
|
|
|
of C<$@> thrown. |
308
|
|
|
|
|
|
|
|
309
|
|
|
|
|
|
|
iterate( { |
310
|
|
|
|
|
|
|
onerror => sub { |
311
|
|
|
|
|
|
|
my ($id, $err) = @_; |
312
|
|
|
|
|
|
|
$self->log( "Error for index $id: $err" ); |
313
|
|
|
|
|
|
|
} }, |
314
|
|
|
|
|
|
|
$worker, |
315
|
|
|
|
|
|
|
\@jobs |
316
|
|
|
|
|
|
|
); |
317
|
|
|
|
|
|
|
|
318
|
|
|
|
|
|
|
The default is 'die'. |
319
|
|
|
|
|
|
|
|
320
|
|
|
|
|
|
|
=back |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
=cut |
323
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
# Normalise the caller's input into an iterator code ref.
#
#   code ref  - returned unchanged
#   array ref - iterator yielding ( index, element ) over a copy of the array
#   hash ref  - iterator yielding ( key, value ) over a copy of the hash, in
#               whatever order keys() produced when the iterator was built
#
# Each generated iterator returns an empty list once it is exhausted.
# Croaks for any other kind of argument.
sub _massage_iterator {
    my ($source) = @_;
    my $kind = ref $source;

    # Already an iterator: nothing to wrap
    return $source if $kind eq 'CODE';

    if ( $kind eq 'ARRAY' ) {

        # Work on a private copy so later changes to the caller's array
        # don't affect the iteration
        my @items  = @{$source};
        my $cursor = 0;
        return sub {
            return unless $cursor < @items;
            my @pair = ( $cursor, $items[$cursor] );
            $cursor++;
            return @pair;
        };
    }

    if ( $kind eq 'HASH' ) {
        my %snapshot = %{$source};
        my @pending  = keys %snapshot;
        return sub {
            return unless @pending;
            my $key = shift @pending;
            return ( $key, $snapshot{$key} );
        };
    }

    croak "Iterator must be a code, array or hash ref";
}
352
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
# Build the single-process result iterator used when forking is disabled.
#
# Arguments:
#   $options - hash ref; only {onerror} is used here, a code ref called as
#              onerror( $id, $error ) when the worker dies
#   $worker  - code ref called as $worker->( $id, $value ) in scalar context
#   $iter    - input iterator returning ( $id, $value ) or an empty list
#
# Returns a code ref that yields ( $id, $result ) for the next job that
# completes without error, or an empty list when the input is exhausted.
sub _nonfork {
    my ( $options, $worker, $iter ) = @_;

    return sub {

        # Keep pulling jobs until one succeeds or the input runs dry; a
        # failed job is reported through onerror and then skipped.
        while ( my @job = $iter->() ) {
            my ( $id, $payload ) = @job;

            # Clone references so the worker sees a private copy, giving
            # the same semantics as the forked implementation.
            $payload = dclone($payload)
              if defined $payload && ref $payload;

            my $outcome = eval { $worker->( $id, $payload ) };
            my $failure = $@;
            return ( $id, $outcome ) unless $failure;

            $options->{onerror}->( $id, $failure );
        }

        return;
    };
}
377
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
# Does this sub look a bit long to you? :) |
379
|
|
|
|
|
|
|
# Build the forking result iterator.
#
# Arguments:
#   $options - hash ref of resolved options; reads {workers} (how many
#              children may be forked) and {onerror} (code ref invoked with
#              the job id and the error when a worker reports a failure)
#   $worker  - code ref run in the child as $worker->( $id, $value ), in
#              scalar context
#   $iter    - input iterator returning ( $id, $value ), or an empty list
#              when exhausted
#
# Returns a code ref yielding ( $id, $result ) per call, and an empty list
# once every result has been delivered and all children have been reaped.
#
# Wire protocol: every message is one Storable frame written by _put_obj.
# The parent sends [ $id, $value ] jobs followed by an undef end-of-stream
# marker; the child answers each job with [ 'R', $id, $result ] or
# [ 'E', $id, $error ] and finishes with its own undef marker.
#
# Croaks if a pipe cannot be created or if fork fails. Any error raised
# while collecting results (including one thrown by onerror) is rethrown
# after the remaining workers have been told to stop and have been reaped.
sub _fork {
    my ( $options, $worker, $iter ) = @_;

    my @workers      = ();                 # PIDs of every child forked
    my @result_queue = ();                 # [ $id, $result ] awaiting delivery
    my $select       = IO::Select->new;    # entries: [ $reader, $writer, $eof ]

    return sub {
      LOOP: {

            # Make new workers. PIDs are only ever appended to @workers, so
            # this caps the total number of children forked.
            while ( @workers < $options->{workers}
                && ( my @next = $iter->() ) )
            {

                my ( $my_rdr, $my_wtr, $child_rdr, $child_wtr )
                  = map IO::Handle->new, 1 .. 4;

                pipe $child_rdr, $my_wtr
                  or croak "Can't open write pipe ($!)\n";

                pipe $my_rdr, $child_wtr
                  or croak "Can't open read pipe ($!)\n";

                # fork returns undef on failure. That must not be mistaken
                # for the child's 0, or the parent would run the worker
                # loop below and then exit.
                my $pid = fork;
                croak "Can't fork ($!)\n" unless defined $pid;

                if ( $pid ) {

                    # Parent
                    close $_ for $child_rdr, $child_wtr;
                    push @workers, $pid;
                    $select->add( [ $my_rdr, $my_wtr, 0 ] );
                    _put_obj( \@next, $my_wtr );
                }
                else {

                    # Child
                    close $_ for $my_rdr, $my_wtr;

                    # Don't execute any END blocks
                    use POSIX '_exit';
                    eval q{END { _exit 0 }};

                    # Worker loop
                    while ( defined( my $job = _get_obj( $child_rdr ) ) ) {
                        my $result = eval { $worker->( @$job ) };
                        my $err = $@;
                        _put_obj(
                            [
                                $err
                                ? ( 'E', $job->[0], $err )
                                : ( 'R', $job->[0], $result )
                            ],
                            $child_wtr
                        );
                    }

                    # End of stream
                    _put_obj( undef, $child_wtr );
                    close $_ for $child_rdr, $child_wtr;

                    # We use CORE::exit for MP compatibility
                    CORE::exit;
                }
            }

            return @{ shift @result_queue } if @result_queue;

            if ( $select->count ) {
                eval {
                    my @rdr = $select->can_read;

                    # Anybody got completed work?
                    for my $r ( @rdr ) {
                        my ( $rh, $wh, $eof ) = @$r;
                        if ( defined( my $results = _get_obj( $rh ) ) ) {
                            my $type = shift @$results;
                            if ( $type eq 'R' ) {
                                push @result_queue, $results;
                            }
                            elsif ( $type eq 'E' ) {
                                $options->{onerror}->( @$results );
                            }
                            else {
                                die "Bad result type: $type";
                            }

                            # We operate a strict one in, one out policy
                            # - which avoids deadlocks. Having received
                            # the previous result send a new work value.
                            unless ( $eof ) {
                                if ( my @next = $iter->() ) {
                                    _put_obj( \@next, $wh );
                                }
                                else {

                                    # No more work: tell the child to
                                    # finish and mark this entry as having
                                    # no write handle.
                                    _put_obj( undef, $wh );
                                    close $wh;
                                    @{$r}[ 1, 2 ] = ( undef, 1 );
                                }
                            }
                        }
                        else {

                            # Child sent its end-of-stream marker
                            $select->remove( $r );
                            close $rh;
                        }
                    }
                };

                if ( my $err = $@ ) {

                    # Finish all the workers. Entries that already got
                    # their end-of-stream marker have an undef write handle
                    # and must be skipped, otherwise this cleanup dies and
                    # hides the original error.
                    _put_obj( undef, $_->[1] )
                      for grep { defined $_->[1] } $select->handles;

                    # And wait for them to exit
                    waitpid( $_, 0 ) for @workers;

                    # Rethrow
                    die $err;
                }

                redo LOOP;
            }

            # Nothing queued and nothing left to read: reap and finish
            waitpid( $_, 0 ) for @workers;
            return;
        }
    };
}
497
|
|
|
|
|
|
|
|
498
|
|
|
|
|
|
|
# Wrap an input iterator so that it hands out chunks of work items.
#
# Arguments:
#   $code    - input iterator returning ( $id, $value ) or an empty list
#   $options - hash ref; reads {adaptive}, {workers} and {batch}
#
# Returns a code ref that yields ( 0, \@chunk ), where @chunk holds array
# refs of the lists returned by $code, or an empty list when $code has
# nothing left.
#
# With a false {adaptive} every chunk holds up to {batch} items. Otherwise
# the target size is recomputed for each chunk as
#   items handed out so far / workers / ratio
# and clamped to the lower bound and, if one was given, the upper bound.
# {adaptive} is either the ratio itself or [ lower, ratio, upper ].
sub _batch_input_iter {
    my ( $code, $options ) = @_;

    my $adaptive = $options->{adaptive};

    unless ($adaptive) {
        my $size = $options->{batch};

        return sub {
            my @chunk;
            while ( @chunk < $size ) {
                my @pair = $code->() or last;
                push @chunk, \@pair;
            }
            return () unless @chunk;
            return ( 0, \@chunk );
        };
    }

    # Treat a zero worker count (non-forking mode) as one worker
    my $divisor = $options->{workers} || 1;
    my $handed_out = 0;

    my ( $floor, $ratio, $ceiling )
      = 'ARRAY' eq ref $adaptive
      ? @{$adaptive}
      : ( 1, $adaptive, undef );
    $floor = 1 unless defined $floor && $floor > 1;

    return sub {
        my @chunk;

        # Adapt batch size
        my $target = $handed_out / $divisor / $ratio;
        $target = $floor   if $target < $floor;
        $target = $ceiling if defined $ceiling && $target > $ceiling;

        while ( @chunk < $target ) {
            my @pair = $code->() or last;
            push @chunk, \@pair;
            $handed_out++;
        }

        return () unless @chunk;
        return ( 0, \@chunk );
    };
}
539
|
|
|
|
|
|
|
|
540
|
|
|
|
|
|
|
# Wrap a chunked result iterator so that it yields one result per call.
#
# Arguments:
#   $code - iterator returning ( $ignored, \@chunk ), where each element of
#           @chunk is an array ref holding one result list, or an empty
#           list when there are no more chunks
#
# Returns a code ref that yields the contents of the next queued element
# (normally ( $id, $result )) and an empty list once $code is exhausted and
# the queue is empty.
sub _batch_output_iter {
    my $code  = shift;
    my @queue = ();

    return sub {

        # Refill from the next chunk when the queue runs dry. Looping, not
        # testing once, means an empty chunk is skipped instead of causing
        # an undef dereference below.
        until (@queue) {
            my ( undef, $chunk ) = $code->()
              or return;
            @queue = @{$chunk};
        }

        return @{ shift @queue };
    };
}
556
|
|
|
|
|
|
|
|
557
|
|
|
|
|
|
|
# Wrap a per-item worker so that it processes a whole chunk at once.
#
# Arguments:
#   $code - code ref called as $code->( @item ) in scalar context for each
#           array ref in the chunk
#
# Returns a code ref taking ( $ignored, \@chunk ). It stores each call's
# return value in the second slot of the corresponding item, in place, and
# returns the same chunk reference.
sub _batch_worker {
    my ($code) = @_;

    return sub {
        my $chunk = $_[1];

        foreach my $entry ( @{$chunk} ) {
            my $outcome = $code->( @{$entry} );
            $entry->[1] = $outcome;
        }

        return $chunk;
    };
}
567
|
|
|
|
|
|
|
|
568
|
|
|
|
|
|
|
# Public entry point: iterate( [ \%options ], $worker, $iterator ).
#
# Arguments:
#   \%options - optional hash ref merged over %DEFAULTS; any key that is
#               not in %DEFAULTS is rejected
#   $worker   - code ref applied to each ( $id, $value ) pair
#   $iterator - code ref, array ref or hash ref (see _massage_iterator)
#
# Returns an iterator code ref built by _fork, or by _nonfork when the
# worker count is zero. If batch > 1 or adaptive is set, the worker and both
# iterators are wrapped so that items travel in chunks.
#
# Croaks on a wrong argument count, unknown options, a non-code worker, an
# unusable iterator argument or an invalid onerror value.
sub iterate {
    my %options = ( %DEFAULTS, %{ 'HASH' eq ref $_[0] ? shift : {} } );

    croak "iterate takes 2 or 3 args" unless @_ == 2;

    my @bad_opt = grep { !exists $DEFAULTS{$_} } keys %options;
    croak "Unknown option(s): ", join( ', ', sort @bad_opt ), "\n"
      if @bad_opt;

    my $worker = shift;
    croak "Worker must be a coderef"
      unless 'CODE' eq ref $worker;

    my $iter = _massage_iterator( shift );

    # Map the two named error policies onto real handlers. Plain closures
    # replace the former string eval: nothing is compiled at run time, and
    # messages lacking a trailing newline now report this file rather than
    # "(eval N)". Each handler drops the job id and dies/warns with the
    # error itself.
    my %named_handler = (
        die  => sub { shift; die shift },
        warn => sub { shift; warn shift },
    );

    # The defined() guard keeps an explicit undef from producing an
    # "uninitialized" warning before the croak below rejects it.
    if ( defined $options{onerror}
        && $options{onerror} =~ /^(die|warn)$/ )
    {
        $options{onerror} = $named_handler{$1};
    }

    croak "onerror option must be 'die', 'warn' or a code reference"
      unless 'CODE' eq ref $options{onerror};

    # A zero default means this platform can't fork, so downgrade any
    # request for workers.
    if ( $options{workers} > 0 && $DEFAULTS{workers} == 0 ) {
        warn "Fork not available; falling back to single process mode\n"
          unless $options{nowarn};
        $options{workers} = 0;
    }

    my $factory = $options{workers} == 0 ? \&_nonfork : \&_fork;

    if ( $options{batch} > 1 || $options{adaptive} ) {
        return _batch_output_iter(
            $factory->(
                \%options,
                _batch_worker( $worker ),
                _batch_input_iter( $iter, \%options )
            )
        );
    }
    else {

        # OK. Ready. Let's do it.
        return $factory->( \%options, $worker, $iter );
    }
}
612
|
|
|
|
|
|
|
|
613
|
|
|
|
|
|
|
=head2 C<< iterate_as_array >> |
614
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
As C<iterate> but instead of returning an iterator returns an array |
616
|
|
|
|
|
|
|
containing the collected output from the iterator. In a scalar context |
617
|
|
|
|
|
|
|
returns a reference to the same array. |
618
|
|
|
|
|
|
|
|
619
|
|
|
|
|
|
|
For this to work properly the input iterator must return (index, value) |
620
|
|
|
|
|
|
|
pairs. This allows the results to be placed in the correct slots in the |
621
|
|
|
|
|
|
|
output array. The simplest way to do this is to pass an array reference |
622
|
|
|
|
|
|
|
as the input iterator: |
623
|
|
|
|
|
|
|
|
624
|
|
|
|
|
|
|
my @output = iterate_as_array( \&some_handler, \@input ); |
625
|
|
|
|
|
|
|
|
626
|
|
|
|
|
|
|
=cut |
627
|
|
|
|
|
|
|
|
628
|
|
|
|
|
|
|
# Run iterate() with the given arguments and collect every result into an
# array, storing each value at the index returned alongside it.
#
# Returns the array in list context and a reference to it otherwise.
sub iterate_as_array {
    my $next_result = iterate(@_);

    my @collected;
    while ( my @pair = $next_result->() ) {
        $collected[ $pair[0] ] = $pair[1];
    }

    return \@collected unless wantarray;
    return @collected;
}
636
|
|
|
|
|
|
|
|
637
|
|
|
|
|
|
|
=head2 C<< iterate_as_hash >> |
638
|
|
|
|
|
|
|
|
639
|
|
|
|
|
|
|
As C<iterate> but instead of returning an iterator returns a hash |
640
|
|
|
|
|
|
|
containing the collected output from the iterator. In a scalar context |
641
|
|
|
|
|
|
|
returns a reference to the same hash. |
642
|
|
|
|
|
|
|
|
643
|
|
|
|
|
|
|
For this to work properly the input iterator must return (key, value) |
644
|
|
|
|
|
|
|
pairs. This allows the results to be placed in the correct slots in the |
645
|
|
|
|
|
|
|
output hash. The simplest way to do this is to pass a hash reference as |
646
|
|
|
|
|
|
|
the input iterator: |
647
|
|
|
|
|
|
|
|
648
|
|
|
|
|
|
|
my %output = iterate_as_hash( \&some_handler, \%input ); |
649
|
|
|
|
|
|
|
|
650
|
|
|
|
|
|
|
=cut |
651
|
|
|
|
|
|
|
|
652
|
|
|
|
|
|
|
# Run iterate() with the given arguments and collect every result into a
# hash, storing each value under the key returned alongside it.
#
# Returns the hash in list context and a reference to it otherwise.
sub iterate_as_hash {
    my $next_result = iterate(@_);

    my %collected;
    while ( my @pair = $next_result->() ) {
        $collected{ $pair[0] } = $pair[1];
    }

    return \%collected unless wantarray;
    return %collected;
}
660
|
|
|
|
|
|
|
|
661
|
|
|
|
|
|
|
# Read one Storable frame from the given handle and return its payload.
#
# _put_obj wraps every payload in a one-element array so that undef, which
# marks end-of-stream, can be sent; this returns that first element.
sub _get_obj {
    my ($handle) = @_;

    # Keep the frame in a variable: indexing the call result directly would
    # die if fd_retrieve returned undef.
    my $envelope = fd_retrieve($handle);

    return $envelope->[0];
}
666
|
|
|
|
|
|
|
|
667
|
|
|
|
|
|
|
# Write one payload to the given handle as a Storable frame, then flush.
#
# The payload is wrapped in a one-element array so that undef, which marks
# end-of-stream, can be sent. Returns whatever the handle's flush returns.
sub _put_obj {
    my ( $obj, $fd ) = @_;

    my @envelope = ($obj);
    store_fd( \@envelope, $fd );

    # Flush so the frame is not left sitting in this process's buffer
    return $fd->flush;
}
672
|
|
|
|
|
|
|
|
673
|
|
|
|
|
|
|
1; |
674
|
|
|
|
|
|
|
|
675
|
|
|
|
|
|
|
__END__ |