File Coverage

blib/lib/Socket/More/Resolver.pm
Criterion Covered Total %
statement 228 315 72.3
branch 70 130 53.8
path n/a
condition 19 41 46.3
subroutine 23 29 79.3
pod 2 11 18.1
total 342 526 65.0


line stmt bran path cond sub pod time code
1               package Socket::More::Resolver;
2 1       1   122164 use strict;
  1           2  
  1           42  
3 1       1   5 use warnings;
  1           2  
  1           66  
4                
5 1       1   6 use feature qw;
  1           2  
  1           145  
6                
7 1       1   7 no warnings "experimental";
  1           2  
  1           94  
8               our $VERSION="v0.1.3";
9                
10 1       1   544 use constant::more DEBUG=>0;
  1           1110  
  1           8  
11 1       1   172 use constant::more qw;
  1           3  
  1           4  
12 1       1   490 use constant::more qw;
  1           3  
  1           7  
13 1       1   677 use constant::more qw;
  1           3  
  1           5  
14                
15 1       1   527 use Fcntl;
  1           3  
  1           348  
16                
17 1       1   602 use Export::These qw;
  1           971  
  1           9  
18 1       1   1726 use Socket::More::Lookup ();
  1           2615  
  1           618  
19                
20               my $gai_data_pack="l> l> l> l> l>/a* l>/a*";
21                
22               #REQID and as above
23               #
24               my $gai_pack="($gai_data_pack)*";
25                
26                
27                
28               sub _results_available;
29               sub process_results;
30               sub getaddrinfo;
31               sub shrink_pool;
32               sub monitor_workers;
33               my $i=0; # Sequential ID of requests
34                
35               my $in_flight=0;
36                
37               #my @pool_free; # pids (keys) of workers we can use
38               my $pool_max=4;
39               my $enable_shrink;
40                
41                
42               my @pairs; # file handles for parent/child pipes
43               # preallocated with first import of this module
44                
45               #my $template_pid;
46                
47               our $Shared;
48                
49               my %fd_worker_map;
50                
51                
52                
53               # In the pre export, we start the workers if not already started.
54               # Also detect event system.
55               #
56               sub _preexport {
57 1       1   181 shift; shift;
  1           2  
58                
59                
60 1           7 my %options=map %$_, grep ref, @_;
61              
62               #my @imports=map %$_, grep !ref, @_;
63              
64              
65               # Don't generate pairs if they already exist
66 1 50         4 if(!@pairs){
67                
68 1     50     4 $pool_max=($options{max_workers}//4);
69 1 50         4 $pool_max=4 if $pool_max <=0;
70 1           2 $pool_max++;
71 1           2 $enable_shrink=$options{enable_shrink};
72                
73                
74               #pre allocate enough pipes for full pool
75 1           4 for(1..$pool_max){
76 6           175 pipe my $c_read, my $p_write;
77 6           171 pipe my $p_read, my $c_write;
78 6           22 fcntl $c_read, F_SETFD, 0; #Make sure we clear CLOSEXEC
79 6           17 fcntl $c_write, F_SETFD,0;
80                
81 6           24 push @pairs,[0, $p_read, $p_write, $c_read, $c_write, [], 0];
82               }
83                
84                
85                
86                
87               # Create the template process here. This is the first worker
88               #Need to bootstrap/ create the first worker, which is used as a template
89 1           2 DEBUG and say STDERR "Create worker: Bootrapping first/template worker";
90 1           4 spawn_template();
91                
92               # Prefork
93                
94 1 50         44 if($options{prefork}){
95 0           0 for(1..($pool_max-1)){
96 0           0 unshift $pairs[0][WORKER_QUEUE]->@*, [CMD_SPAWN, $i++, $_];
97 0           0 $in_flight++;
98               }
99               }
100                
101               # Work with event systems
102 1           18 my $sub;
103                
104 1           107 my @search=qw; # Built in drivers
105 1     33     37 for($options{loop_driver}//()){
106 0 0         0 if(ref eq "CODE"){
107 0           0 $sub=$_;
108               }
109 0 0         0 if(ref eq "ARRAY"){
110 0           0 unshift @search, @$_;
111               }
112               else {
113               #Assume a string
114 0           0 unshift @search, $_;
115               }
116               }
117                
118 1 50         4 if($options{no_loop}){
119               # Prevent event loop integration
120 0           0 $sub=undef;
121               }
122               else{
123               # Use search list
124 1       1   8 no strict "refs";
  1           2  
  1           4222  
125 1           3 for(@search){
126 3 50         17 if(%{$_."::"}){
  3           284  
127 0           0 $sub=eval "require Socket::More::Resolver::$_";
128 0 0         0 die $@ if $@;
129 0           0 last;
130               }
131               }
132               }
133 1 50         16 $sub->() if($sub);
134               #grep !ref, @_;
135               }
136 1 50         12 if($options{prefork}){
137 0           0 getaddrinfo for(1..($pool_max-1));
138               }
139 1           242 @_;
140               }
141                
142               sub _reexport {
143 1       1   673 Socket::More::Lookup->import("gai_strerror");
144               }
145                
146                
147                
148                
149                
150                
151               #If used as a module, setup the process pool
152                
153               #getaddrinfo Request
154               #REQID FLAGS FAMILY TYPE PROTOCOL HOST PORT
155                
156               #getaddrinfo response
157               #FLAG/ERROR FAMILY TYPE PROTOCOL ADDR CANONNNAME
158               #
159                
160               # Return undef when no worker available.
161               # If under limit, a new worker is spawned for next run
162               # Return the worker struct to use otherwise
163               #
164               sub _get_worker{
165                
166               #_results_available unless $Shared;
167 4       4   16 my $worker;
168               my $fallback;
169 4           0 my $unspawned;
170 4           0 my $index;
171 4           6 my $busy_count=0;
172 4           9 state $robin=1;
173 4           14 for(1..$#pairs){
174 12           16 $index=$_;
175 12           23 $worker=$pairs[$index];
176 12 100         30 if($worker->[WORKER_BUSY]){
177 1 50         5 if($worker->[WORKER_ID]){
178 0           0 $busy_count++;
179               # Fully spawned and working on a request
180 0           0 DEBUG and say STDERR "GETTING WORKER: fully spawned $index";
181               }
182               else {
183               # half spawned, this has at least 1 message
184               # if all other workers are busy we use the first one of these we come accros
185 1 50   33     29 $fallback//=$index if $worker->[WORKER_QUEUE]->@*;
186 1           2 DEBUG and say STDERR "GETTING WORKER: half spawned fallback $index";
187               }
188               }
189               else {
190               # Not busy
191               #
192 11 100         26 if($worker->[WORKER_ID]){
193               # THIS IS THE WORKER WE WANT
194 2           2 DEBUG and say STDERR "GETTING WORKER: found unbusy $index";
195 2           6 return $worker;
196               }
197               else{
198               # Not spawned. Use first one we come accross if we need to spawn
199 9     66     25 $unspawned//=$index;
200 9           18 DEBUG and say STDERR "GETTING WORKER: found unspawned $index";
201               }
202               }
203               }
204                
205               # Use the about to be spawned worker
206 2 100         10 return $pairs[$fallback] if defined $fallback;
207                
208               # Here we actaully need to spawn a worker
209              
210 1           6 my $template_worker=spawn_template(); #ensure template exists
211              
212 1 50         5 if($busy_count < (@pairs-1)){
213 1           3 DEBUG and say STDERR "Queue spawn command to template for inext $unspawned";
214 1           5 push $template_worker->[WORKER_QUEUE]->@*, [CMD_SPAWN, $i++, $unspawned];
215 1           3 $index=$unspawned;
216 1           2 $in_flight++;
217 1           4 $pairs[$unspawned][WORKER_BUSY]=1;
218               }
219               else{
220 0           0 $index=$robin++;
221 0 0         0 $robin=1 if $robin >=@pairs;
222               }
223                
224               #$pairs[$index][WORKER_BUSY]=1;
225               #$pairs[$index][WORKER_ID]=-1;
226 1           3 $pairs[$index];
227                
228               }
229                
230                
231               sub pool_next;
232               # Serialize messages to worker from queue
233               sub pool_next{
234 22       22 0 59 my $w=shift;
235                
236               # handle returns first .. TODO: This is only if no event system is being used
237 22 50         103 _results_available unless $Shared;
238 22           56 my $redo;
239 22 50         56 for($w?$w:@pairs){
240 132           169 DEBUG and say STDERR "POOL next for ".$_->[WORKER_ID]." busy: $_->[WORKER_BUSY], queue; ".$_->[WORKER_QUEUE]->@*;
241 132           202 my $ofd;
242               # only process worker is initialized not busy and have something to process
243 132 100         266 next unless $_->[WORKER_ID];
244 35 100         83 next if $_->[WORKER_BUSY];
245 11 100         27 next unless $_->[WORKER_QUEUE]->@*;
246                
247 9           16 $_->[WORKER_BUSY]=1;
248                
249               #my $req=shift $_->[WORKER_QUEUE]->@*;
250 9           16 my $req=$_->[WORKER_QUEUE][0];
251 9           25 $req->[REQ_WORKER]=$_->[WORKER_ID];
252              
253               #$reqs{$req->[REQ_ID]}=$req; #Add to outstanding
254                
255                
256               # Header
257 9           37 my $out=pack "l> l>", $req->[REQ_CMD], $req->[REQ_ID];
258                
259               # Body
260 9 100         43 if($req->[REQ_CMD]==CMD_SPAWN){
    100            
    100            
    50            
    50            
261               # Write to template process
262               #DEBUG and
263 1           3 my $windex=$req->[2];
264 1           2 DEBUG and say STDERR ">> SENDING CMD_SPWAN TO WORKER: $req->[REQ_WORKER], worker index $windex";
265 1           4 my $cread=fileno $pairs[$windex][WORKER_CREAD];
266 1           3 my $cwrite=fileno $pairs[$windex][WORKER_CWRITE];
267                
268 1           5 $out.=pack("l> l>", $cread, $cwrite);
269 1           3 $ofd=$pairs[0][WORKER_WRITE];
270 1           2 $redo=1;
271               }
272               elsif($req->[REQ_CMD]==CMD_GAI) {
273               # getaddrinfo request
274 2           5 DEBUG and say STDERR ">> SENDING CMD_GAI TO WORKER: $req->[REQ_WORKER]";
275 2 50         9 if(ref $req->[REQ_DATA] eq "ARRAY"){
276 0           0 $out.=pack $gai_pack, $req->[REQ_DATA]->@*;
277               }
278               else {
279               # assume a hash
280 2           6 for($req->[REQ_DATA]){
281               #$out.=pack $gai_pack, $_->{flags}//0, $_->{family}//0, $_->{socktype}//0, $_->{protocol}//0, $_->{host}, $_->{port};
282 2     50     43 $out.=pack $gai_pack, $_->{flags}//0, $_->{family}//0, $_->{socktype}//0, $_->{protocol}//0, $_->{address}, $_->{port};
        50        
        50        
        50        
283               }
284               }
285                
286 2           5 $ofd=$_->[WORKER_WRITE];
287               }
288               elsif($req->[REQ_CMD]==CMD_GNI){
289 2           4 DEBUG and say STDERR ">> SENDING CMD_GNI TO WORKER: $req->[REQ_WORKER]";
290 2           11 $out.=pack "l>/A* l>", $req->[REQ_DATA]->@*;
291 2           5 $ofd=$_->[WORKER_WRITE];
292                
293               }
294               elsif($req->[REQ_CMD]== CMD_KILL){
295 0           0 DEBUG and say STDERR ">> Sending CMD_KILL to worker: $req->[REQ_WORKER]";
296 0           0 $ofd=$_->[WORKER_WRITE];
297 0           0 $redo=1;
298               }
299               elsif($req->[REQ_CMD]== CMD_REAP){
300 4           18 DEBUG and say STDERR ">> Sending CMD_REAP to worker: $req->[REQ_WORKER]";
301 4           13 $out.=pack("l>/l>*", $req->[REQ_DATA]->@*);
302 4           47 $ofd=$pairs[0][WORKER_WRITE];
303 4           7 $redo=1;
304               }
305               else {
306 0           0 die "UNkown command in pool_next";
307               }
308                
309 9           11 DEBUG and say STDERR ">> WRITING WITH FD $ofd";
310 9           208 syswrite $ofd, unpack("H*", $out)."\n"; # bypass buffering
311                
312               }
313 22 100         88 pool_next if $redo;
314               }
315                
316                
317               # Peforms a read on the pipe, parses response from worker
318               # and executes callbacks as needed
319               #
320               # This is the routine needing to be called from an event loop
321               # when the pipe is readable
322               #
323               sub process_results{
324 9       9 0 14 my $fd_or_struct=shift;
325 9           13 my $worker;
326 9 50         22 if(ref $fd_or_struct){
327 9           12 $worker=$fd_or_struct;
328               }
329               else{
330 0           0 $worker=$fd_worker_map{$fd_or_struct};
331               }
332               #Check which worker is ready to read.
333               # Read the result
334               #For now we wait.
335 9           47 my $r=$worker->[WORKER_READ];
336 9           131 local $_=<$r>;
337 9           16 chomp;
338 9           58 my $bin=pack "H*", $_;
339                
340 9           34 my ($cmd, $id)=unpack "l> l>", $bin;
341 9           20 $bin=substr $bin, 8; #two lots of long
342                
343               # Remove from the outstanding table
344 9           22 my $entry=shift $worker->[WORKER_QUEUE]->@*;
345 9           29 $in_flight--;
346               #my $entry=delete $reqs{$id};
347              
348               # Mark the returning worker as not busy
349               #
350 9           15 $worker->[WORKER_BUSY]=0;
351                
352 9 100         42 if($cmd==CMD_GAI){
    100            
    100            
    50            
    50            
353 2           2 DEBUG and say STDERR "<< GAI return from worker $entry->[REQ_WORKER]";
354 2           60 my @res=unpack $gai_pack, $bin;
355 2 50   33     22 if($res[0] and $entry->[REQ_ERR]){
    50   33        
356 0           0 $entry->[REQ_ERR]($res[0]);
357               }
358               elsif(!$res[0] and $entry->[REQ_CB]){
359 2           4 my @list;
360 2           7 while(@res>=6){
361 42           55 my @r=splice @res,0, 6;
362 42     50     111 $r[5]||=undef; #Set cannon name to undef if empty string
363 42 50         84 if(ref($entry->[REQ_DATA]) eq "ARRAY"){
364 0           0 push @list, \@r;#[$error, $family, $type, $protocol, $addr, $canonname];
365               }
366               else {
367 42           170 push @list, {
368               error=>$r[0],
369               family=>$r[1],
370               socktype=>$r[2],
371               protocol=>$r[3],
372               addr=>$r[4],
373               cannonname=>$r[5]
374               };
375                
376               }
377               }
378               #}
379 2           14 $entry->[REQ_CB](@list);
380               }
381               else {
382               # throw away results
383               }
384                
385                
386               }
387                
388               elsif($cmd==CMD_GNI){
389 2           6 DEBUG and say STDERR "<< GNI return from worker $entry->[REQ_WORKER]";
390 2           10 my ($error, $host, $port)=unpack "l> l>/A* l>/A*", $bin;
391 2           4 DEBUG and say STDERR "error $error";
392 2           2 DEBUG and say STDERR "host $host";
393 2           5 DEBUG and say STDERR "service Service $port";
394 2 50   33     27 if($error and $entry->[REQ_ERR]){
    50   33        
395 0           0 $entry->[REQ_ERR]($error);
396               }
397               elsif(!$error and $entry->[REQ_CB]){
398 2           3 DEBUG and say $entry->[REQ_CB];
399 2           26 $entry->[REQ_CB]($host, $port);
400               }
401               else {
402               # Should not get here
403               }
404               }
405               elsif($cmd==CMD_SPAWN){
406               # response from template fork. Add the worker to the pool
407               #
408 1           4 my $pid=unpack "l>", $bin;
409 1           3 my $index=$entry->[2]; #
410 1           2 DEBUG and say STDERR "SPAWN RETURN: pid $pid index $index";
411               #unshift @pool_free, $index;
412 1           2 my $worker=$pairs[$index];
413 1           3 $worker->[WORKER_ID]=$pid;
414               # turn on the worker by clearing the busy flag
415 1           2 $worker->[WORKER_BUSY]=0;
416 1           8 $fd_worker_map{fileno $worker->[WORKER_READ]}=$worker;
417                
418 1           3 DEBUG and say STDERR "<< SPAWN RETURN FROM TEMPLATE $entry->[REQ_WORKER]: new worker $pid";
419               }
420               elsif($cmd == CMD_KILL){
421 0           0 my $id=$entry->[REQ_WORKER];
422 0           0 DEBUG and say STDERR "<< KILL RETURN FROM WORKER: $id : $worker->[WORKER_ID]";
423 0           0 $worker->[WORKER_ID]=0;
424               #@pool_free=grep $pairs[$_]->[WORKER_ID] != $id, @pool_free;
425               }
426               elsif($cmd ==CMD_REAP){
427               # Grandchild process checking via template process
428 4           13 my @pids=unpack "l>/l>*", $bin;
429                
430 4           9 DEBUG and say STDERR "<< REAP RETURN FROM TEMPLATE $entry->[REQ_WORKER]";
431 4           5 for(@pids){
432 20 50         33 next unless $_ >0;
433                
434 0           0 my $index=-1; # ignore template
435               #Locate the pid in the worker slots
436 0           0 for my $windex (1..$#pairs){
437 0 0         0 if($pairs[$windex][WORKER_ID]==$_){
438 0           0 $index=$windex;
439 0           0 last;
440               }
441               }
442                
443 0 0         0 if($index>0){
444 0           0 $pairs[$index][WORKER_ID]=0;
445 0           0 $pairs[$index][WORKER_BUSY]=0;
446               #only restart if the worker has items in its queue
447 0 0         0 if($pairs[$index][WORKER_QUEUE]->@*){
448 0           0 unshift $pairs[0][WORKER_QUEUE]->@*, [CMD_SPAWN, $i++, $index];
449 0           0 $in_flight++;
450               }
451               }
452               else {
453               # ignore
454               }
455               }
456               }
457                
458 9 50         1060 pool_next $worker if $Shared;
459               }
460                
461               sub _results_available {
462 22     50 22   102 my $timeout=shift//0;
463 22           29 DEBUG and say STDERR "CHECKING IF RESULTS AVAILABLE";
464               # Check if any workers are ready to talk
465 22           51 my $bits="";
466 22           78 for(@pairs){
467 132 100         567 vec($bits, fileno($_->[WORKER_READ]),1)=1 if $_->[WORKER_ID];
468               }
469                
470 22           209 my $count=select $bits, undef, undef, $timeout;
471                
472 22 100         110 if($count>0){
473 5           10 for(@pairs){
474 30 100   100     107 if($_->[WORKER_ID] and vec($bits, fileno($_->[WORKER_READ]), 1)){
475 9           32 process_results $_;
476               }
477               }
478               }
479 22           62 $count;
480               }
481                
482               sub getaddrinfo{
483 11 100     11 1 1215044 if( @_ ){
484               # If arguments present, then add to the request queue
485                
486 2           8 my ($host, $port, $hints, $on_result, $on_error)=@_;
487                
488               # Format the resuest into the same as the return structures.
489 2           21 my $ref=[];
490 2 50         9 if(ref($hints) eq "ARRAY"){
491 0           0 push @$hints, $host, $port;
492               }
493               else {
494               #$hints->{host}=$host;
495 2           7 $hints->{address}=$host;
496 2           5 $hints->{port}=$port;
497               }
498                
499                
500               # add the request to the queue and to outstanding table
501 2           7 my $worker=_get_worker;
502 2           9 my $req=[CMD_GAI, $i++, $hints, $on_result, $on_error, $worker->[WORKER_ID]];
503 2           5 push $worker->[WORKER_QUEUE]->@*, $req;
504 2           4 $in_flight++;
505               #
506 2 50         25 monitor_workers unless $Shared;
507 2 50         28 shrink_pool if $enable_shrink;
508               }
509                
510 11           74 pool_next;
511               #return true if outstanding requests
512 11           18 DEBUG and say STDERR "IN FLIGHT: $in_flight";
513 11           53 $in_flight;
514               }
515                
516               sub getnameinfo{
517 2 50     2 1 2191 if(@_){
518 2           6 my ($addr, $flags, $on_result, $on_error)=@_;
519 2           8 my $worker=_get_worker;
520 2           9 my $req=[CMD_GNI, $i++, [$addr, $flags], $on_result, $on_error, $worker->[WORKER_ID]];
521 2           4 push $worker->[WORKER_QUEUE]->@*, $req;
522 2           3 $in_flight++;
523                
524 2 50         9 monitor_workers unless $Shared;
525 2 50         6 shrink_pool if $enable_shrink;
526                
527               }
528 2           5 pool_next;
529 2           3 DEBUG and say STDERR "IN FLIGHT: $in_flight";
530 2           27 $in_flight;
531               }
532                
533               sub close_pool {
534                
535 0       0 0 0 my @indexes=1..$#pairs;
536 0           0 push @indexes, 0;
537                
538               #generate messages to close
539 0           0 for(@indexes){
540 0           0 my $worker=$pairs[$_];
541 0 0         0 next unless $worker->[WORKER_ID];
542                
543 0           0 my $req=[CMD_KILL, $i++, [], undef, undef, $_];
544 0           0 push $worker->[WORKER_QUEUE]->@*, $req;
545 0           0 $in_flight++;
546 0           0 pool_next;
547               }
548               }
549                
550               # Send kill signal to all workers (not template)
551               # This forces respawning.
552               sub kill_pool {
553 0       0 0 0 my @indexes=1..$#pairs;
554 0           0 for(@indexes){
555 0           0 my $worker=$pairs[$_];
556 0 0         0 next unless $worker->[WORKER_ID];
557                
558 0           0 kill 'KILL', $worker->[WORKER_ID];
559 0           0 $worker->[WORKER_ID]=0;
560 0           0 $worker->[WORKER_BUSY]=0;
561               }
562                
563               }
564                
565               # return the parent side reading filehandles. This is what is needed for event loops
566               sub to_watch {
567 0       0 0 0 map $_->[WORKER_READ], @pairs;
568               }
569                
570               sub monitor_workers {
571 1       1   687 use POSIX qw<:sys_wait_h :errno_h>;
  1           10539  
  1           8  
572                
573               # check we have a template
574 4       4 0 11 my $tpid=$pairs[0][WORKER_ID];
575 4           65 my $res=waitpid $tpid, WNOHANG;
576 4 50   33     22 if($res==$tpid){
    50            
577               # This is the non event case
578 0           0 $pairs[0][WORKER_ID]=0;
579               #close_pool;
580 0           0 kill_pool;
581               }
582               elsif($res == -1 and $! == ECHILD){
583               # Event loops take over the child listening.... so work around
584               #
585 0           0 $pairs[0][WORKER_ID]=0;
586               #close_pool;
587 0           0 kill_pool;
588               }
589               else {
590               # Template still active, use it as proxy
591 4           9 my @pids= map {$_->[WORKER_ID]} @pairs;
  24           43  
592 4           8 shift @pids; #remove template from the list
593                
594 4           20 push $pairs[0][WORKER_QUEUE]->@*, [CMD_REAP, $i++, [@pids], \&_monitor_callback, undef];
595 4           10 $in_flight++;
596               }
597                
598 4           13 pool_next;
599 4           7 $in_flight;
600               }
601                
602         0     sub _monitor_callback {
603              
604               }
605                
606                
607               sub spawn_template {
608               # This should only be called when modules is first loaded, or when an
609               # external force has killed the template process
610 2       2 0 4 my $worker=$pairs[0];
611 2 100         10 return $worker if $worker->[WORKER_ID];
612                
613 1           2025 my $pid=fork;
614 1 50         214 if($pid){
615               # parent
616               #
617 1           27 $worker->[WORKER_ID]=$pid;
618 1           136 $fd_worker_map{fileno $worker->[WORKER_READ]}=$worker;
619               #push @pool_free, 0;
620 1           56 $worker;
621                
622               }
623               else {
624               # child
625               # exec an tell the process which fileno we want to communicate on
626 0             close $worker->[WORKER_READ];
627 0             close $worker->[WORKER_WRITE];
628 0             my @ins=map {fileno $_->[WORKER_CREAD]} @pairs; # Child read end
  0              
629 0             my @outs=map {fileno $_->[WORKER_CWRITE]} @pairs; # Child write end
  0              
630 0             DEBUG and say STDERR "Create worker: exec with ins: @ins";
631 0             DEBUG and say STDERR "Create worker: exec with outs: @outs";
632 0             my $file=__FILE__;
633 0             $file=~s|\.pm|/Worker.pm|;
634 0             local $"=",";
635 0             exec $^X, $file, "--in", "@ins", "--out", "@outs";
636               }
637               }
638                
639               sub shrink_pool {
640               # work backwards and send a kill message to any non busy workers
641 0       0 0   my $template_worker=spawn_template(); #ensure template exists
642 0             for(reverse(@pairs)){
643 0 0           next if $_== $template_worker;
644 0 0           next if $_->[WORKER_QUEUE]->@*;
645 0 0           next unless $_->[WORKER_ID];
646                
647               # send a kill message to any un needed workers
648 0             my $req=[CMD_KILL, $i++, [], undef, undef, $_];
649 0             push $_->[WORKER_QUEUE]->@*, $req;
650 0             $in_flight++;
651               }
652               }
653                
654               sub cleanup {
655               #say STDERR "END HERE";
656               #kill_pool;
657               ################################
658               # for(@pairs){ #
659               # close $_->[WORKER_CREAD]; #
660               # close $_->[WORKER_CWRITE]; #
661               # } #
662               ################################
663               # The template
664                
665 0       0 0   my $tpid=$pairs[0][WORKER_ID];
666               #say STDERR "Template pid: ", $tpid;
667               #say STDERR
668 0             kill 'KILL', $tpid;
669 0             my $res=waitpid $tpid, 0;#, WNOHANG;
670 0 0           if($res==$tpid){
671               #say STDERR "TEMPLATE KILLED";
672               # This is the non event case
673               #$pairs[0][WORKER_ID]=0;
674               #close_pool;
675 0             kill_pool;
676               }
677               else {
678               #say STDERR "RES: $res";
679               }
680                
681               }
682                
683               1;