File Coverage

blib/lib/Event/ExecFlow/Job/Group.pm
Criterion Covered Total %
statement 159 356 44.6
branch 46 110 41.8
condition 28 59 47.4
subroutine 28 54 51.8
pod 0 49 0.0
total 261 628 41.5


line stmt bran cond sub pod time code
1             package Event::ExecFlow::Job::Group;
2              
3 2     2   14 use base qw( Event::ExecFlow::Job );
  2         6  
  2         249  
4              
5 2     2   13 use strict;
  2         5  
  2         86  
6 2     2   13 use Scalar::Util qw(weaken);
  2         3  
  2         22849  
7              
8 23     23 0 261 sub get_type { "group" }
9              
10 136     136 0 614 sub get_jobs { shift->{jobs} }
11 0     0 0 0 sub get_fail_with_members { shift->{fail_with_members} }
12 0     0 0 0 sub get_stop_on_failure { shift->{stop_on_failure} }
13 19     19 0 158 sub get_parallel { shift->{parallel} }
14 158     158 0 1071 sub get_scheduler { shift->{scheduler} }
15 26     26 0 144 sub get_member_finished_callbacks { shift->{member_finished_callbacks} }
16              
17 3     3 0 6 sub set_jobs { shift->{jobs} = $_[1] }
18 3     3 0 5 sub set_fail_with_members { shift->{fail_with_members} = $_[1] }
19 3     3 0 7 sub set_stop_on_failure { shift->{stop_on_failure} = $_[1] }
20 3     3 0 5 sub set_parallel { shift->{parallel} = $_[1] }
21 3     3 0 6 sub set_member_finished_callbacks { shift->{member_finished_callbacks} = $_[1] }
22              
23             sub new {
24 3     3 0 29 my $class = shift;
25 3         15 my %par = @_;
26 3         8 my ($jobs, $fail_with_members, $stop_on_failure) =
27             @par{'jobs','fail_with_members','stop_on_failure'};
28 3         5 my ($parallel, $scheduler, $member_finished_callbacks) =
29             @par{'parallel','scheduler','member_finished_callbacks'};
30              
31 3 50       9 $jobs = [] unless defined $jobs;
32 3 50       7 $fail_with_members = 1 unless defined $fail_with_members;
33 3 50       6 $stop_on_failure = 1 unless defined $stop_on_failure;
34              
35 3         16 my $self = $class->SUPER::new(@_);
36              
37 3         5 for my $cb ( $member_finished_callbacks ) {
38 3   33     27 $cb ||= Event::ExecFlow::Callbacks->new;
39 3 50       19 $cb = Event::ExecFlow::Callbacks->new($cb) if ref $cb eq 'CODE';
40             }
41              
42 3         8 $self->set_jobs($jobs);
43 3         7 $self->set_fail_with_members($fail_with_members);
44 3         7 $self->set_stop_on_failure($stop_on_failure);
45 3         6 $self->set_parallel($parallel);
46 3         8 $self->set_scheduler($scheduler);
47 3         7 $self->set_member_finished_callbacks($member_finished_callbacks);
48              
49 3         13 return $self;
50             }
51              
52             sub set_frontend {
53 3     3 0 6 my $self = shift;
54 3         5 my ($frontend) = @_;
55            
56 3         15 $self->SUPER::set_frontend($frontend);
57              
58 3         3 $_->set_frontend($frontend) for @{$self->get_jobs};
  3         12  
59            
60 3         13 return $frontend;
61             }
62              
63             sub set_scheduler {
64 5     5 0 6 my $self = shift;
65 5         6 my ($scheduler) = @_;
66            
67 5         6 $self->{scheduler} = $scheduler;
68            
69 5         6 foreach my $job ( @{$self->get_jobs} ) {
  5         12  
70 23 100       106 $job->set_scheduler($scheduler)
71             if $job->get_type eq 'group';
72             }
73            
74 5         10 return $scheduler;
75             }
76              
77             sub get_exec_type {
78 0     0 0 0 my $self = shift;
79 0         0 my $job = $self->get_next_job;
80 0 0       0 return "sync" if not $job;
81 0         0 return $job->get_exec_type;
82             }
83              
84             sub get_diskspace_consumed {
85 0     0 0 0 my $self = shift;
86            
87 0         0 my $sum = $self->SUPER::get_diskspace_consumed;
88            
89 0         0 $sum += $_->get_diskspace_consumed for @{$self->get_jobs};
  0         0  
90            
91 0         0 return $sum;
92             }
93              
94             sub get_diskspace_freed {
95 0     0 0 0 my $self = shift;
96            
97 0         0 my $sum = $self->SUPER::get_diskspace_freed;
98            
99 0         0 $sum += $_->get_diskspace_freed for @{$self->get_jobs};
  0         0  
100            
101 0         0 return $sum;
102             }
103              
104             sub init {
105 3     3 0 10 my $self = shift;
106              
107 3         20 $self->SUPER::init();
108              
109 3         6 foreach my $job ( @{$self->get_jobs} ) {
  3         7  
110 13         63 $job->set_group($self);
111 13         121 weaken($job->{group});
112 13         33 $self->add_child_post_callback($job);
113             }
114              
115 3         21 $self->set_progress_max($self->get_job_cnt);
116              
117 3         9 1;
118             }
119              
120             sub reset_non_finished_jobs {
121 0     0 0 0 my $self = shift;
122            
123 0 0       0 if ( $self->get_state ne 'finished' ) {
124 0         0 $self->set_state("waiting");
125 0         0 $self->set_cancelled(0);
126 0         0 $self->set_error_message();
127 0         0 $self->get_frontend->report_job_progress($self);
128             }
129            
130 0         0 foreach my $job ( @{$self->get_jobs} ) {
  0         0  
131 0 0       0 if ( $job->get_state ne 'finished' ) {
132 0         0 $job->set_state("waiting");
133 0         0 $job->set_cancelled(0);
134 0         0 $job->set_error_message();
135 0         0 $self->get_frontend->report_job_progress($job);
136             }
137 0 0       0 $job->reset_non_finished_jobs if $job->get_type eq 'group';
138             }
139              
140 0         0 1;
141             }
142              
143             sub get_job_cnt {
144 5     5 0 14 my $self = shift;
145              
146 5         6 my $cnt = 0;
147 5         7 foreach my $job ( @{$self->get_jobs} ) {
  5         10  
148 23         64 $cnt += $job->get_job_cnt;
149             }
150            
151 5         23 return $cnt;
152             }
153              
154             sub init_progress_state {
155 0     0 0 0 my $self = shift;
156            
157 0         0 my $progress_cnt = 0;
158 0         0 foreach my $job ( @{$self->get_jobs} ) {
  0         0  
159 0 0       0 if ( $job->get_type eq 'group' ) {
160 0         0 $job->init_progress_state;
161 0         0 $progress_cnt += $job->get_progress_cnt;
162             }
163             else {
164 0 0 0     0 ++$progress_cnt if $job->get_state eq 'finished' ||
165             $job->get_state eq 'error';
166             }
167             }
168              
169 0         0 $self->set_progress_cnt($progress_cnt);
170 0         0 $self->set_progress_max($self->get_job_cnt);
171              
172 0 0       0 $self->set_state("finished")
173             if $self->get_progress_cnt == $self->get_progress_max;
174              
175 0         0 1;
176             }
177              
178             sub set_group_in_all_childs {
179 0     0 0 0 my $self = shift;
180              
181 0         0 foreach my $job ( @{$self->get_jobs} ) {
  0         0  
182 0 0       0 if ( $job->get_type eq 'group' ) {
183 0         0 $job->set_group($self);
184 0         0 weaken($job->{group});
185 0         0 $job->set_group_in_all_childs;
186             }
187             else {
188 0         0 $job->set_group($self);
189 0         0 weaken($job->{group});
190             }
191             }
192              
193 0         0 1;
194             }
195              
196             sub increase_progress_max {
197 0     0 0 0 my $self = shift;
198 0         0 my ($add) = @_;
199              
200 0         0 my $job = $self;
201 0         0 while ( $job ) {
202 0         0 $job->set_progress_max($job->get_progress_max + $add);
203 0         0 $job = $job->get_group;
204             }
205              
206 0         0 1;
207             }
208              
209             sub decrease_progress_max {
210 0     0 0 0 my $self = shift;
211 0         0 my ($del) = @_;
212              
213 0         0 my $job = $self;
214 0         0 while ( $job ) {
215 0         0 $job->set_progress_max($job->get_progress_max - $del);
216 0         0 $job = $job->get_group;
217             }
218            
219 0         0 1;
220             }
221              
222             sub increase_progress_cnt {
223 0     0 0 0 my $self = shift;
224 0         0 my ($add) = @_;
225              
226 0         0 my $job = $self;
227 0         0 while ( $job ) {
228 0         0 $job->set_progress_cnt($job->get_progress_cnt + $add);
229 0         0 $job = $job->get_group;
230             }
231              
232 0         0 1;
233             }
234              
235             sub decrease_progress_cnt {
236 0     0 0 0 my $self = shift;
237 0         0 my ($del) = @_;
238              
239 0         0 my $job = $self;
240 0         0 while ( $job ) {
241 0         0 $job->set_progress_cnt($job->get_progress_cnt - $del);
242 0         0 $job = $job->get_group;
243             }
244            
245 0         0 1;
246             }
247              
248             sub add_job {
249 0     0 0 0 my $self = shift;
250 0         0 my ($job) = @_;
251            
252 0         0 push @{$self->get_jobs}, $job;
  0         0  
253            
254 0         0 $job->set_frontend($self->get_frontend);
255 0         0 $job->set_group($self);
256 0         0 weaken($job->{group});
257              
258 0         0 my $job_cnt = $job->get_job_cnt;
259 0 0       0 $self->increase_progress_max($job_cnt) if $job_cnt != 0;
260              
261 0 0 0     0 if ( $self->get_state eq 'finished' ||
262             $self->get_state eq 'error' ) {
263 0         0 $self->set_state("waiting");
264             }
265              
266 0         0 $self->add_child_post_callback($job);
267              
268 0         0 $self->get_frontend->report_job_added($job);
269              
270 0         0 1;
271             }
272              
273             sub remove_job {
274 0     0 0 0 my $self = shift;
275 0         0 my ($job) = @_;
276            
277 0         0 my $jobs = $self->get_jobs;
278            
279 0         0 my $i;
280 0         0 for ( $i=0; $i < @{$jobs}; ++$i ) {
  0         0  
281 0 0       0 last if $jobs->[$i] eq $job;
282             }
283            
284 0         0 die "Job with ID ".$job->get_id." no member of this group"
285 0 0       0 if $i == @{$jobs};
286              
287 0         0 splice @{$jobs}, $i, 1;
  0         0  
288              
289 0         0 my $job_cnt = $job->get_job_cnt;
290 0 0       0 $self->decrease_progress_max($job_cnt) if $job_cnt != 0;
291              
292 0         0 $self->get_frontend->report_job_removed($job);
293              
294 0         0 1;
295             }
296              
297             sub get_job_by_name {
298 8     8 0 83 my $self = shift;
299 8         40 my ($job_name) = @_;
300            
301 8         17 foreach my $job ( @{$self->get_jobs} ) {
  8         24  
302 8 50       58 return $job if $job->get_name eq $job_name;
303             }
304            
305 0         0 die "Job '$job_name' not member of group '".$self->get_name."'";
306             }
307              
308             sub execute {
309 33     33 0 58 my $self = shift;
310 33         66 my %par = @_;
311 33         60 my ($skip) = $par{'skip'};
312            
313 33 100       93 $skip = "" if ! defined $skip;
314              
315 33         47 my $blocked_job;
316 33         38 while ( 1 ) {
317 46 100 66     354 if ( $self->get_cancelled
      33        
      66        
318             || $self->all_jobs_finished
319             || ( $self->get_error_message &&
320             $self->get_stop_on_failure ) ) {
321 3         46 $self->execution_finished;
322 3 50 33     19 if ( $self->get_scheduler &&
323             $self->get_scheduler->is_exclusive ) {
324 0         0 $self->get_scheduler->run;
325             }
326 3         14 return;
327             }
328              
329 43 50 33     152 return if $self->get_scheduler &&
330             $self->get_scheduler->is_exclusive;
331            
332 43         213 my $job = $self->get_next_job(blocked=>$blocked_job);
333 43 50 66     237 next if defined $job && "$job" eq "$skip";
334              
335 43 100       128 if ( !$job ) {
336 23         316 $self->try_reschedule_jobs(skip => $skip);
337 23         39 last;
338             }
339              
340 20 50       53 if ( $self->get_scheduler ) {
341 20         53 my $state = $self->get_scheduler->schedule_job($job);
342 20 100       101 return if $state eq 'sched-blocked';
343 13 50       30 if ( $state eq 'job-blocked' ) {
344 0         0 $blocked_job = $job;
345 0         0 next;
346             }
347 13 50       38 die "Illegal scheduler state '$state'"
348             unless $state eq 'ok';
349             }
350              
351 13         53 $self->start_child_job($job);
352              
353 13 50       133 last if !$self->get_parallel;
354             }
355            
356 23         77 1;
357             }
358              
359             sub try_reschedule_jobs {
360 23     23 0 35 my $self = shift;
361 23         85 my %par = @_;
362 23         52 my ($skip) = $par{'skip'};
363              
364 23         32 my $executed = 0;
365 23         44 foreach my $job ( @{$self->get_jobs} ) {
  23         55  
366 91 100       288 next if "$job" eq "$skip";
367              
368             # Parallel execution groups which are running now
369             # probably can execute more job, so give it a try.
370 81 100 100     745 if ( $job->get_type eq 'group' &&
      66        
371             $job->get_state eq 'running' &&
372             $job->get_parallel ) {
373 6         33 $job->execute;
374 6         62 $executed = 1;
375             }
376             }
377            
378 23 100 100     4425 if ( !$executed && $self->get_group ) {
379 11         42 $self->get_group->execute(skip => $self);
380             }
381            
382 23         101 1;
383             }
384              
385             sub cancel {
386 0     0 0 0 my $self = shift;
387            
388 0         0 $self->set_cancelled(1);
389 0   0     0 $_->get_state eq 'running' && $_->cancel for @{$self->get_jobs};
  0         0  
390            
391 0         0 1;
392             }
393              
394             sub pause_job {
395 0     0 0 0 my $self = shift;
396            
397 0   0     0 $_->get_state eq 'running' && $_->pause for @{$self->get_jobs};
  0         0  
398            
399 0         0 1;
400             }
401              
402             sub reset {
403 0     0 0 0 my $self = shift;
404            
405 0         0 foreach my $job ( @{$self->get_jobs} ) {
  0         0  
406 0 0       0 if ( $job->reset ) {
407 0         0 $self->decrease_progress_cnt($job->get_job_cnt);
408             }
409             }
410            
411 0         0 $self->get_frontend->report_job_progress($self);
412              
413 0 0       0 return $self->SUPER::reset() if $self->get_progress_cnt == 0;
414              
415 0         0 0;
416             }
417              
418             sub add_child_post_callback {
419 13     13 0 16 my $self = shift;
420 13         17 my ($job) = @_;
421            
422 13 50       38 if ( $job->{_post_callbacks_added} ) {
423 0         0 return;
424 0         0 require Carp;
425 0         0 Carp::confess($job->get_info.": callbacks added twice!");
426             }
427 13         28 $job->{_post_callbacks_added} = 1;
428            
429             $job->get_post_callbacks->add( sub {
430 13     13   25 my ($job) = @_;
431 13         59 $self->child_job_finished($job);
432 13         90 1;
433 13         101 });
434              
435 13         34 1;
436             }
437              
438             sub start_child_job {
439 13     13 0 24 my $self = shift;
440 13         19 my ($job) = @_;
441            
442 13 50       32 $Event::ExecFlow::DEBUG && print "Group->start_child_job(".$job->get_info.")\n";
443              
444 13 50       60 $self->set_progress_cnt(0) unless defined $self->get_progress_cnt;
445 13         80 $self->get_frontend->report_job_progress($self);
446              
447 13         239 $job->start;
448              
449 13         109 1;
450             }
451              
452             sub child_job_finished {
453 13     13 0 34 my $self = shift;
454 13         20 my ($job) = @_;
455            
456 13 50       31 $Event::ExecFlow::DEBUG && print "Group->child_job_finished(".$job->get_info.")\n";
457              
458 13 50       46 $self->get_member_finished_callbacks->execute()
459             if $self->get_member_finished_callbacks;
460              
461 13 50 33     49 if ( $job->get_error_message && !$job->get_cancelled ) {
462 0 0       0 if ( $self->get_fail_with_members ) {
463 0         0 $self->set_state("error");
464 0         0 $self->add_job_error_message($job);
465 0         0 $self->get_frontend->report_job_error($self);
466             }
467             }
468              
469 13 50       36 if ( $self->get_scheduler ) {
470 13         32 $self->get_scheduler->job_finished($job);
471             }
472              
473 13         67 $self->execute;
474              
475 13         48 1;
476             }
477              
478             sub add_job_error_message {
479 0     0 0 0 my $self = shift;
480 0         0 my ($job) = @_;
481              
482 0   0     0 my $error_message = $self->get_error_message || "";
483              
484 0         0 $error_message .=
485             "Job '".$job->get_info."' ".
486             "failed with error message:\n".
487             $job->get_error_message."\n".
488             ("-"x80)."\n";
489              
490 0         0 $self->set_error_message($error_message);
491              
492 0         0 1;
493             }
494              
495             sub get_first_job {
496 0     0 0 0 my $self = shift;
497 0         0 return $self->get_jobs->[0];
498             }
499              
500             sub get_next_job {
501 43     43 0 63 my $self = shift;
502 43         179 my %par = @_;
503 43         74 my ($blocked) = $par{'blocked'};
504              
505 43 50       144 $blocked = "" if ! defined $blocked;
506              
507 43         46 my $next_job;
508 43         59 foreach my $job ( @{$self->get_jobs} ) {
  43         481  
509 149 50 33     1665 next if defined $job && "$job" eq "$blocked";
510 149 50       553 $Event::ExecFlow::DEBUG && print "Group(".$self->get_info.")->get_next_job: check ".$job->get_info."=>".$job->get_state."\n";
511 149 100 100     406 if ( $job->get_state eq 'waiting' &&
512             $self->dependencies_ok($job) ) {
513 20         25 $next_job = $job;
514 20         42 last;
515             }
516             }
517            
518 43 0       120 $Event::ExecFlow::DEBUG && print "Group(".$self->get_info.")->get_next_job=".
    50          
519             ($next_job ? $next_job->get_info : "NOJOB")."\n";
520            
521 43         110 return $next_job;
522             }
523              
524             sub dependencies_ok {
525 27     27 0 52 my $self = shift;
526 27         36 my ($job) = @_;
527              
528 27         42 foreach my $dep_job_name ( @{$job->get_depends_on} ) {
  27         6172  
529 8         55 my $dep_job = $self->get_job_by_name($dep_job_name);
530 8 50       28 $Event::ExecFlow::DEBUG && print "Job(".$job->get_info.")->dependencies_ok: check ".$dep_job->get_info." =>".$dep_job->get_state."\n";
531 8 100       27 return if $dep_job->get_state ne 'finished';
532             }
533              
534 20         84 return 1;
535             }
536              
537             sub all_jobs_finished {
538 46     46 0 72 my $self = shift;
539              
540 46         58 foreach my $job ( @{$self->get_jobs} ) {
  46         148  
541 91 100 66     643 return 0 if $job->get_state eq 'waiting' ||
      100        
542             $job->get_state eq 'error' ||
543             $job->get_state eq 'running';
544             }
545            
546 3         24 return 1;
547             }
548              
549             sub get_max_diskspace_consumed {
550 0     0 0   my $self = shift;
551 0           my ($currently_consumed, $max_consumed) = @_;
552              
553 0           foreach my $job ( @{$self->get_jobs} ) {
  0            
554 0           ($currently_consumed, $max_consumed) =
555             $job->get_max_diskspace_consumed
556             ($currently_consumed, $max_consumed);
557             }
558              
559 0           return ($currently_consumed, $max_consumed);
560             }
561              
562             sub backup_state {
563 0     0 0   my $self = shift;
564            
565 0           my $data_href = $self->SUPER::backup_state();
566            
567 0           delete $data_href->{jobs};
568 0           delete $data_href->{scheduler};
569 0           delete $data_href->{member_finished_callbacks};
570              
571 0           my $jobs = $self->get_jobs;
572 0           foreach my $job ( @{$jobs} ) {
  0            
573 0           push @{$data_href->{jobs}},
  0            
574             $job->backup_state;
575             }
576            
577 0           return $data_href;
578             }
579              
580             sub restore_state {
581 0     0 0   my $self = shift;
582 0           my ($data_href) = @_;
583              
584 0           my $jobs = $self->get_jobs;
585            
586 0           $self->SUPER::restore_state($data_href);
587            
588 0           my $job_states = delete $self->{jobs};
589              
590 0           my $i = 0;
591 0           foreach my $job ( @{$jobs} ) {
  0            
592 0           $job->restore_state($job_states->[$i]);
593 0           ++$i;
594             }
595            
596 0           $self->set_jobs($jobs);
597              
598 0           1;
599             }
600              
601             sub add_stash_to_all_jobs {
602 0     0 0   my $self = shift;
603 0           my ($add_stash) = @_;
604              
605 0           $self->add_stash($add_stash);
606            
607 0           foreach my $job ( @{$self->get_jobs} ) {
  0            
608 0 0         if ( $job->get_type eq 'group' ) {
609 0           $job->add_stash_to_all_jobs($add_stash);
610             }
611             else {
612 0           $job->add_stash($add_stash);
613             }
614             }
615             }
616              
617             sub traverse_all_jobs {
618 0     0 0   my $self = shift;
619 0           my ($code) = @_;
620              
621 0           foreach my $job ( @{$self->get_jobs} ) {
  0            
622 0           $code->($job);
623 0 0         if ( $job->get_type eq 'group' ) {
624 0           $job->traverse_all_jobs($code);
625             }
626             }
627              
628 0           1;
629             }
630              
631             sub get_job_with_id {
632 0     0 0   my $self = shift;
633 0           my ($job_id) = @_;
634            
635 0           my $job;
636             $self->traverse_all_jobs(sub{
637 0 0   0     $job = $_[0] if $_[0]->get_id eq $job_id;
638 0           });
639              
640 0           return $job;
641             }
642              
643             1;
644              
645             __END__