File Coverage

blib/lib/Parallel/Workers.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package Parallel::Workers;
2              
3 4     4   79109 use warnings;
  4         9  
  4         160  
4 4     4   20 use strict;
  4         7  
  4         127  
5 4     4   22 use Carp;
  4         11  
  4         434  
6 4         583 use Scalar::Util qw(blessed dualvar isweak readonly refaddr reftype tainted
7 4     4   27 weaken isvstring looks_like_number set_prototype);
  4         23  
8 4     4   17438 use threads 1.39 ;
  0            
  0            
9             use threads::shared;
10             use Thread::Queue;
11             use Data::Dumper;
12             use Parallel::Workers::Transaction;
13             use Parallel::Workers::Backend;
14             use Parallel::Workers::Shared;
15              
16             use version;
17              
18             no warnings 'threads';
19              
20             our (@ISA, @EXPORT, @EXPORT_OK, $VERSION, $WARN, $DEBUG);
21             @ISA = qw(Exporter);
22              
23             @EXPORT = qw($VERSION);
24             @EXPORT_OK = ();
25              
26             $VERSION = '0.0.9';
27              
28             $WARN=0;
29             $DEBUG=0;
30              
31              
32             # Flag to inform all threads that application is terminating
33             my $TERM :shared = 0;
34              
35             # Prevents double detach attempts
36             my $DETACHING :shared;
37              
38             my $ID:shared = 0;
39              
40             my $shared_jobs;
41              
42              
43             # maxworkers =>64 , maxjobs=>100,
44             # transport=> SSH|XMLRPC|LOCAL, constructor=>%options,
45             # timeout => max time to thread to live
46              
47             sub new {
48             my $class:shared = shift;
49             my %params = @_;
50             my $this={};
51            
52             # shared_hash_set($this, "maxworkers",(defined($params{maxworkers}))?$params{maxworkers}:16);
53             # shared_hash_set($this, "maxjobs", (defined($params{maxjobs}))?$params{maxjobs}:32);
54             # shared_hash_set($this, "timeout", (defined($params{timeout}))?$params{timeout}:10);
55            
56             $this->{maxworkers}=(defined($params{maxworkers}))?$params{maxworkers}:16;
57             $this->{maxjobs}=(defined($params{maxjobs}))?$params{maxjobs}:32;
58             $this->{timeout}=(defined($params{timeout}))?$params{timeout}:10;
59            
60             # Wait for max timeout for threads to finish
61            
62             my $backend=(defined $params{backend})?"Parallel::Workers::Backend::".$params{backend}:"Parallel::Workers::Backend::Null";
63            
64             my %constructor=();
65             $constructor{backend}=$backend;
66             $constructor{constructor}=\%{$params{constructor}} if defined $params{constructor};
67             $this->{jobsbackend}=Parallel::Workers::Backend->new(%constructor);
68             bless $this, $class;
69            
70             return $this;
71             }
72              
73             sub clear{
74             my $this = shift;
75             $shared_jobs={};
76             }
77              
78             # hosts => @hosts, command=>, params=>
79             # return $jobid
80             sub create{
81             my $this = shift;
82             my %params = @_;
83            
84             # shared_hash_set($this, "transaction", Parallel::Workers::Transaction->new((defined $params{transaction})?%{$params{transaction}}:undef));
85             $this->{transaction}=Parallel::Workers::Transaction->new((defined $params{transaction})?%{$params{transaction}}:{enable=>0});
86             my @hosts=@{$params{hosts}};
87             my $totaljobs=@hosts;
88             my $jobs=0;
89             my $current_job=0;
90             # Manage the thread pool until signalled to terminate
91             my $id:shared=__genid();
92             my $commands={
93             cmd=>$params{command}, params=>$params{params},
94             pre=>$params{pre}, preparams=>$params{preparams},
95             post=>$params{post}, postparams=>$params{postparams}
96             };
97             $shared_jobs->{$id}=&share({});
98             $shared_jobs->{$id}->{time}=time();
99             lock ($id);
100            
101             while (! $TERM && $totaljobs ) {
102             # New thread
103            
104             threads->new('jobworker', $this, $shared_jobs->{$id}, \$id, $hosts[$current_job++], $commands,$this->{transaction});
105             $totaljobs--;
106             if ($this->{maxworkers}<=threads->list()){
107             #WAITING FOR A NEW THREAD
108             print "#WAITING FOR A THREAD EXIT\n" if $WARN;
109             cond_wait($id);
110             }
111            
112             }
113             #waiting the end of the pool
114             $this->join();
115             print "job terminated\n" if $WARN;
116             return $id;
117             }
118              
119             # wait infinity for the end of workers
120             sub join{
121             my $self = shift;
122             my %params = @_;
123             foreach my $thr (threads->list()) {
124             $thr->join() ;
125             }
126             }
127              
128             # stop the current pool after the timeout done
129             sub stop{
130             my $this = shift;
131             my %params = @_;
132             $TERM=1;
133            
134             ### CLEANING UP ###
135              
136             # Wait for max timeout for threads to finish
137             while ((threads->list() > 0) && $this->{timeout}--) {
138             sleep(1);
139             }
140              
141             # Detach and kill any remaining threads
142             foreach my $thr (threads->list()) {
143             lock($DETACHING);
144             $thr->detach() if ! $thr->is_detached();
145             $thr->kill('KILL');
146             }
147             $TERM=0;
148             }
149              
150             sub info{
151             my $this = shift;
152             return $shared_jobs;
153             # return $shared_jobs;
154             }
155              
156             sub __genid{
157             return "$$-".$ID++;
158             }
159              
160             #private fonction called by thread
161             sub jobworker{
162             my ($this, $job, $id, $host, $params, $transaction)=@_;
163             my $tid = threads->tid();
164             my %host;
165             $host{cmd}=$params->{cmd};
166             $host{params}=$params->{params};
167             shared_hash_set($job,$host,\%host);
168             eval{
169            
170             # Run preprocessing task
171             ##########################
172             if (defined $params->{pre}){
173             $job->{$host}->{status}="preprocessing";
174             my $pre=$this->{jobsbackend}->pre($id, $host, $params->{pre}, $params->{preparams});
175             $job->{$host}->{pre}=shared_share($pre);
176              
177             if ($transaction->check($tid,$pre) eq "TRANSACTION_TERM"){
178             print ">>>>>>>>>>transaction for thread($tid) on preprocessing return TRANSACTION_TERM\n" if $WARN==1;
179             $job->{$host}->{status}=TRANSACTION_TERM;
180             shared_hash_set($job,"pre",TRANSACTION_TERM);
181             $TERM=1;
182             cond_broadcast($$id);
183             threads->exit(0);
184             return;
185             }
186             }
187             # Run processing task
188             ##########################
189             $job->{$host}->{status}="processing";
190             my $do=$this->{jobsbackend}->do($$id, $host, $params->{cmd}, $params->{params});
191             $job->{$host}->{do}=shared_share($do);
192             if ($transaction->check($tid,$do) eq "TRANSACTION_TERM"){
193             print ">>>>>>>>>>transaction for thread($tid) on processing return TRANSACTION_TERM\n" if $WARN==1;
194             $job->{$host}->{status}=TRANSACTION_TERM;
195             shared_hash_set($job,"do",TRANSACTION_TERM);
196             $TERM=1;
197             cond_broadcast($$id);
198             threads->exit(0);
199             return;
200             }
201             # Run postprocessing task
202             ##########################
203             if (defined $params->{post}){
204             $job->{$host}->{status}="postprocessing";
205             my $post=$this->{jobsbackend}->post($id, $host, $params->{post}, $params->{postparams});
206             $job->{$host}->{post}=shared_share($post);
207             if ($transaction->check($tid,$post) eq "TRANSACTION_TERM"){
208             print ">>>>>>>>>>transaction for thread($tid) on postprocessing return TRANSACTION_TERM\n" if $WARN==1;
209             $job->{$host}->{status}=TRANSACTION_TERM;
210             shared_hash_set($job,"post",TRANSACTION_TERM);
211             $TERM=1;
212             cond_broadcast($$id);
213             threads->exit(0);
214             return;
215             }
216            
217             }
218             };
219             if ($@){
220             $job->{$host}->{error}=$@;
221             $job->{$host}->{status}="error";
222             print STDERR $job->{$host}->{error}."\n" if $WARN;
223             cond_broadcast($$id);
224             threads->exit(0);
225             return;
226             }
227             $job->{$host}->{status}="done";
228             cond_broadcast($$id);
229             return;
230             }
231              
232              
233             ### Signal Handling ###
234              
235             # Gracefully terminate application on ^C
236             # or command line 'kill'
237             # $SIG{'INT'} = $SIG{'TERM'} =
238             # sub {
239             # print(">>> Terminating <<<\n");
240             # $TERM = 1;
241             # };
242              
243             # This signal handler is called inside threads
244             # that get cancelled by the timer thread
245             # $SIG{'KILL'} =
246             # sub {
247             # # Tell user we've been terminated
248             # printf(" %3d <- Killed\n", threads->tid());
249             # # Detach and terminate
250             # lock($DETACHING);
251             # threads->detach() if ! threads->is_detached();
252             # threads->exit();
253             # };
254              
255             1; # Magic true value required at end of module
256             __END__