File Coverage

blib/lib/Proc/JobQueue.pm
Criterion Covered Total %
statement 195 247 78.9
branch 76 132 57.5
condition 11 25 44.0
subroutine 26 30 86.6
pod 13 21 61.9
total 321 455 70.5


line stmt bran cond sub pod time code
1              
2             package Proc::JobQueue;
3              
4 4     4   22 use strict;
  4         7  
  4         549  
5 4     4   39 use warnings;
  4         10  
  4         140  
6              
7 4     4   20 use Time::HiRes qw(sleep);
  4         7  
  4         23  
8 4     4   3975 use Sys::Hostname;
  4         5114  
  4         229  
9 4     4   28 use Carp qw(confess);
  4         8  
  4         199  
10 4     4   1788 use Hash::Util qw(lock_keys unlock_keys);
  4         5009  
  4         32  
11 4     4   321 use Time::HiRes qw(time);
  4         6  
  4         27  
12 4     4   4023 use Module::Load;
  4         4653  
  4         24  
13 4     4   1793 use Object::Dependency;
  4         33027  
  4         13363  
14             require Exporter;
15              
16             our $VERSION = 0.903;
17             our $debug ||= 0;
18             our $status_frequency ||= 2;
19             our $host_canonicalizer ||= 'File::Slurp::Remote::CanonicalHostnames';
20              
21             our @ISA = qw(Exporter);
22             our @EXPORT_OK = qw(is_remote_host canonicalize my_hostname);
23              
24             sub configure
25             {
26 0     0 1 0 my ($queue, %params) = shift;
27 0         0 @$queue{keys %params} = values %params;
28             }
29              
30             sub addhost
31             {
32 4     4 1 17 my ($queue, $host, %params) = @_;
33 4         6 my $hr;
34 4 100       16 if ($hr = $queue->{status}{$host}) {
35 2         8 @$hr{keys %params} = values %params;
36             } else {
37 2         21 $hr = $queue->{status}{$host} = {
38             name => $host,
39             jobs_per_host => $queue->{jobs_per_host},
40             in_startmore => 0,
41             %params,
42             running => {},
43             queue => {},
44             };
45             }
46 4         19 $queue->set_readiness($host);
47             }
48              
49             sub set_readiness
50             {
51 448     448 0 3786 my ($queue, $host) = @_;
52 448         1901 my $hr = $queue->{status}{$host};
53 448 50 33     5907 if ($hr->{jobs_per_host} and $hr->{jobs_per_host} > keys %{$hr->{running}}) {
  448 0       3519  
  0         0  
54 448         1908 $queue->{ready_hosts}{$host} = $hr;
55             } elsif (! keys %{$hr->{running}}) {
56 0         0 $queue->{ready_hosts}{$host} = $hr;
57             } else {
58 0         0 delete $queue->{ready_hosts}{$host};
59             }
60             }
61              
62             sub new
63             {
64 2     2 1 8 my ($pkg, %params) = @_;
65 2         13 my $queue = bless {
66             dependency_graph => undef,
67             startmore_in_progress => undef,
68             host_overload => 120,
69             host_is_over => 0,
70             jobnum => 1000,
71             jobs_per_host => 4,
72             queue => {},
73             status => {},
74             ready_hosts => {},
75             hold_all => 0,
76             hosts => [ my_hostname() ],
77             %params,
78             }, $pkg;
79 2         45 $queue->addhost($_) for @{$queue->{hosts}};
  2         36  
80 2         15 lock_keys(%$queue);
81 2         32 return $queue;
82             }
83              
84             sub hold
85             {
86 0     0 1 0 my ($self, $new) = @_;
87 0 0       0 $self->{hold_all} = $new if defined $new;
88 0         0 return $self->{hold_all};
89             }
90              
91             sub add
92             {
93 126     126 1 854 my ($queue, $job, $host) = @_;
94 126 50       663 confess "$job not a ref" unless ref $job;
95 126 50       655 confess "$job is not a job" unless $job->isa('Proc::JobQueue::Job');
96              
97 126 100       631 $job->jobnum($queue->{jobnum}++)
98             unless $job->jobnum;
99 126         502 my $jobnum = $job->jobnum();
100              
101 126 50       316 print STDERR "Adding $jobnum - ".ref($job)." to worklist\n" if $debug > 2;
102 126         163 my $q;
103 126 50       249 if ($host) {
104 0 0       0 confess "no $host" unless $queue->{status}{$host};
105 0         0 $q = $queue->{status}{$host}{queue};
106             } else {
107 126         270 $q = $queue->{queue};
108             }
109              
110 126         1025 $q->{$jobnum} = $job;
111              
112 126         282 $job->{dependency_graph} = $queue->{dependency_graph}; # TODO: do this with a method
113              
114 126         543 $job->queue($queue);
115 126         470 $queue->startmore;
116             }
117              
118             # this looks at the dependency queue. startmore_jobs looks at the
119             # at the jobs queue.
120             sub startmore
121             {
122 304     304 1 694 my ($job_queue) = shift;
123              
124 304 100       1018 if ($job_queue->{startmore_in_progress}) {
125 170 50       603 print STDERR "Re-entry to startmore prevented\n" if $debug;
126 170         265 $job_queue->{startmore_in_progress}++;
127 170         1097 return 0;
128             }
129 134         286 $job_queue->{startmore_in_progress} = 2;
130              
131 134         233 my $dependency_graph = $job_queue->{dependency_graph};
132              
133 134         183 my $stuff_started = 0;
134              
135 134         212 my $jq_done;
136              
137 134 50       332 print STDERR "looking for more depenency graph items to queue up\n" if $debug;
138 134         245 eval {
139 134         1217 $job_queue->checkjobs();
140              
141 134         413 while ($job_queue->{startmore_in_progress} > 1) {
142 171         728 $job_queue->{startmore_in_progress} = 1;
143 171 100       873 if ($dependency_graph) {
144 94         1266 while (my @runnable = $dependency_graph->independent(lock => 1)) {
145 38         1797 $stuff_started++;
146 38         234 for my $task (@runnable) {
147 68 50       200 print "Queuing $task->{desc}\n" if $debug;
148 68 100       732 if ($task->can('run_dependency_task')) {
    50          
149 7 50       93 $job_queue->{startmore_in_progress}++ if $task->run_dependency_task($dependency_graph);
150             } elsif ($task->isa('Proc::JobQueue::Job')) {
151 61         387 $job_queue->add($task, $task->{force_host});
152             } else {
153 0         0 die "don't know how to handle $task";
154             }
155             }
156             }
157             }
158              
159 171         6585 $jq_done = $job_queue->startmore_jobs();
160              
161 171 100       2202 redo if $job_queue->{startmore_in_progress} > 1;
162             }
163             };
164 134 50       521 if ($@) {
165 0         0 $job_queue->suicide();
166             };
167              
168 134         559 $job_queue->{startmore_in_progress} = 0;
169              
170 134 100       1835 return $jq_done unless $dependency_graph;
171              
172 57 100 66     472 if ($jq_done && $dependency_graph->alldone) {
    50 33        
173 2         48 print STDERR "Nothing more to do\n";
174 2         29 $job_queue->unloop();
175 2         13 return 1;
176             } elsif ($jq_done && ! $stuff_started) {
177 0 0       0 if (keys %{$dependency_graph->{stuck}}) {
  0         0  
178 0         0 print STDERR "All runnable jobs are done, remaining dependencies are stuck:\n";
179 0         0 for my $o (values %{$dependency_graph->{stuck}}) {
  0         0  
180 0         0 printf "\t%s\n", $dependency_graph->desc($o);
181             }
182 0         0 $job_queue->unloop();
183 0         0 return 1;
184             } else {
185 0         0 print STDERR "Job queue is empty, but dependency graph doesn't think there is any work to be done!\n";
186 0         0 $dependency_graph->dump_graph();
187             }
188             }
189 55         1278 return 0;
190             }
191              
192             sub startmore_jobs
193             {
194 171     171 0 329 my ($queue) = @_;
195 171 50       529 return 0 if $queue->{hold_all};
196 171 50       429 print "# Looking to start more\n" if $debug > 8;
197 171 50       420 confess "no hosts added" unless keys %{$queue->{status}};
  171         840  
198 171         255 my $stuff = 0;
199 171         207 my $new_host_is_over = 0;
200 171         230 while(1) {
201 297         2239 my $redo = 0;
202 297         1757 HOST:
203 297         895 for my $host (keys %{$queue->{ready_hosts}}) {
204 297 50       716 print STDERR "# checking $host to maybe start more jobs\n" if $debug > 3;
205 297         522 my $hr = $queue->{ready_hosts}{$host};
206             JOB:
207 297   33     3073 while ((! $hr->{jobs_per_host} && ! keys %{$hr->{running}}) || $hr->{jobs_per_host} > (keys %{$hr->{running}} || 0)) {
      100        
      33        
208 297 50       757 print STDERR "# there is room for more on $host\n" if $debug > 4;
209 297         1031 $new_host_is_over++
210 297 50       373 if keys(%{$hr->{queue}}) > $queue->{host_overload};
211 297         391 my @q;
212 297         1633 push (@q, $hr->{queue});
213 297 50 33     3149 push (@q, $queue->{queue})
214             if $hr->{jobs_per_host} && ! $queue->{host_is_over};
215 297         745 for my $q (@q) {
216 594 100       2155 next unless keys %$q;
217 180         208 $stuff = 1;
218 180 50       2506 for my $jobnum (reverse sort { $q->{$a}{priority} <=> $q->{$b}{priority} || $a <=> $b } keys %$q) {
  198         2033  
219 207 50       702 print STDERR "# looking to start $jobnum on $host\n" if $debug > 5;
220 207         1042 my $job = $q->{$jobnum};
221 207 100       1298 unless ($job->runnable) {
222 81 50       973 print STDERR "# can't start $jobnum $job->{desc} on $host: not runnable\n" if $debug > 5;
223 81         292 next;
224             }
225 126         2636 delete $q->{$jobnum};
226 126         1244 $queue->startjob($host, $jobnum, $job);
227 126         209017 $queue->set_readiness($host);
228 126         3434 $redo = 1;
229 126         6186 next HOST;
230             }
231             }
232 171         536 last;
233             }
234             }
235 297 100       2794 last unless $redo;
236             }
237 171         340 $queue->{host_is_over} = $new_host_is_over;
238 171 100       794 return 0 if $stuff;
239 64         2749 return $queue->alldone();
240             }
241              
242             sub suicide
243             {
244 0     0 1 0 print STDERR "DIE DIE DIE DIE DIE (DT2): $@";
245             # exit 1; hangs!
246 0         0 POSIX::_exit(1);
247             }
248              
249             # a hook for EventQueue
250 2     2 0 2 sub unloop { }
251              
252              
253             sub startjob
254             {
255 126     126 1 432 my ($queue, $host, $jobnum, $job) = @_;
256 126 50       303 print STDERR "# starting $jobnum $job->{desc} on $host\n" if $debug > 1;
257 126         432 my $hr = $queue->{status}{$host};
258 126         545 $hr->{running}{$jobnum} = $job;
259 126         585 $job->host($host);
260 126         920 $job->start();
261             }
262              
263              
264             # This routine is re-enterant: it may be called from something it calls.
265             sub checkjobs
266             {
267 192     192 1 395 my ($queue) = @_;
268 192         656 my $found = 0;
269 192         371 for my $host (keys %{$queue->{status}}) {
  192         2212  
270 192 50       650 print STDERR "# checking jobs on $host\n" if $debug > 7;
271 192   50     863 my $hr = $queue->{status}{$host} || die;
272 192         301 for my $jobnum (keys %{$hr->{running}}) {
  192         1026  
273 632         4474 my $job = $hr->{running}{$jobnum};
274 632 100       4097 if ($job) {
275 607 50       6578 print STDERR "# checking $jobnum $job->{desc} on $host\n" if $debug > 8;
276 607 100       2170 $found++
277             if defined $job->checkjob($queue);
278             } else {
279 25 50       395 print STDERR "# job $jobnum is undef!\n" if $debug;
280 25         72 delete $hr->{running}{$jobnum};
281 25         127 $found++;
282             }
283              
284             }
285 192         2681 $queue->set_readiness($host);
286             }
287 192         555 return $found;
288             }
289              
290             sub jobdone
291             {
292 120     120 1 359 my ($job_queue, $job, $do_startmore, @exit_code) = @_;
293 120 100       437 if ($job->{dependency_graph}) {
294 60 50       137 if ($exit_code[0]) {
295 0         0 print STDERR "Things dependent on $job->{desc} will never run: @exit_code\n";
296 0         0 $job->{dependency_graph}->stuck_dependency($job, "exit @exit_code");
297             } else {
298 60         697 $job->{dependency_graph}->remove_dependency($job);
299             }
300 60         5878 $job->{dependency_graph} = undef;
301             # unlock_keys(%$job);
302             # $job->{this_is_finished} = 1;
303             # lock_keys(%$job);
304             }
305 120         746 $job_queue->job_part_finished($job, $do_startmore, @exit_code);
306             }
307              
308             sub job_part_finished
309             {
310 126     126 1 326 my ($job_queue, $job, $do_startmore, @exit_code) = @_;
311 126 50       563 $do_startmore = 1 unless defined $do_startmore;
312              
313 126         858 my $host = $job->host;
314 126         602 my $jobnum = $job->jobnum;
315              
316 126 50       347 print STDERR "# job $jobnum $job->{desc} on $host is done\n" if $debug > 5;
317              
318 126 50       458 my $hr = $job_queue->{status}{$host} or confess;
319 126 50       1208 delete $hr->{running}{$jobnum} or confess;
320              
321 126         1536 $job_queue->set_readiness($host);
322              
323 126 50       644 $job_queue->startmore() if $do_startmore;
324             }
325              
326             sub alldone
327             {
328 64     64 1 242 my ($queue, $skip_status) = @_;
329 64 50 33     212 $queue->status() if $debug && ! $skip_status;
330 64 50       137 return 0 if keys %{$queue->{queue}};
  64         335  
331 64         102 for my $host (keys %{$queue->{status}}) {
  64         389  
332 64         130 my $hr = $queue->{status}{$host};
333 64 50       201 return 0 unless $queue->{ready_hosts}{$host};
334 64 50       96 return 0 if keys %{$hr->{queue}};
  64         197  
335 64 100       92 return 0 if keys %{$hr->{running}};
  64         551  
336 5 50       78 next unless $hr->{jobs_per_host} > 0;
337             }
338 5         24 return 1;
339             }
340              
341             my $last_dump = time;
342              
343             sub status
344             {
345 0     0 1 0 my ($queue) = @_;
346 0 0       0 return if time < $last_dump + $status_frequency;
347 0         0 $last_dump = time;
348 0         0 print STDERR "Queue Status\n";
349 0         0 printf STDERR "\titems in main queue: %d, alldone=%d\n", scalar(keys %{$queue->{queue}}), $queue->alldone(1);
  0         0  
350 0 0       0 print STDERR "\tHost overload condition is true\n" if $queue->{host_is_over};
351 0         0 for my $host (sort keys %{$queue->{status}}) {
  0         0  
352 0         0 my $hr = $queue->{status}{$host};
353 0         0 printf STDERR "\titems in queue for %s: %d, items running: %s, host is %sready\n",
354             $host,
355 0         0 scalar(keys(%{$hr->{queue}})),
356 0 0       0 scalar(keys(%{$hr->{running}})),
357             ($queue->{ready_hosts}{$host} ? "" : "not ");
358 0         0 for my $job (values %{$hr->{running}}) {
  0         0  
359 0         0 print STDERR "\t\tRunning: $job->{jobnum} $job->{desc}\n";
360             }
361             }
362 0         0 my $dg = $queue->{dependency_graph};
363 0         0 printf "Dependency Graph items: %d independent (%d locked %d active), %d total, alldone=%s\n",
364 0         0 scalar(keys(%{$dg->{independent}})),
365 0         0 scalar(grep { $_->{dg_lock} } values %{$dg->{independent}}),
  0         0  
366 0         0 scalar(grep { $_->{dg_active} } values %{$dg->{independent}}),
  0         0  
367 0 0       0 scalar(keys(%{$dg->{addrmap}})),
368             $dg->alldone
369             if $dg;
370             }
371              
372             my $canonicalizer;
373             sub get_canonicalizer
374             {
375 162 100   162 0 1688 return $canonicalizer if $canonicalizer;
376 2         11 load($host_canonicalizer);
377 2         1421 $canonicalizer = $host_canonicalizer->new();
378             }
379              
380             sub canonicalize
381             {
382 160     160 0 481 my ($host) = @_;
383 160         337 return get_canonicalizer()->canonicalize($host);
384             }
385              
386             my $my_hostname;
387             sub my_hostname
388             {
389 202 100   202 0 3048 return $my_hostname if $my_hostname;
390 2         8 $my_hostname = get_canonicalizer()->myname();
391             }
392              
393             sub is_remote_host
394             {
395 160     160 0 509 my ($host) = @_;
396 160         387 return my_hostname() ne canonicalize($host);
397             }
398              
399             sub graph
400             {
401 7     7 0 11 my $queue = shift;
402 7 50       52 if (@_) {
    100          
403 0 0       0 die "a dependency graph was already set" if $queue->{dependency_graph};
404 0         0 $queue->{dependency_graph} = shift;
405             } elsif (! $queue->{dependency_graph}) {
406 1         10 $queue->{dependency_graph} = Object::Dependency->new();
407             }
408 7         83 return $queue->{dependency_graph};
409             }
410              
411              
412             1;
413              
414             __END__