File Coverage

blib/lib/Proc/JobQueue/Job.pm
Criterion Covered Total %
statement 98 126 77.7
branch 43 80 53.7
condition 3 9 33.3
subroutine 19 22 86.3
pod 9 13 69.2
total 172 250 68.8


line stmt bran cond sub pod time code
1              
2             package Proc::JobQueue::Job;
3              
4 4     4   24 use strict;
  4         8  
  4         146  
5 4     4   22 use warnings;
  4         8  
  4         180  
6 4     4   25 use Hash::Util qw(lock_keys);
  4         7  
  4         40  
7 4     4   234 use Carp qw(confess);
  4         9  
  4         176  
8 4     4   22 use Tie::Function::Examples qw(%q_shell);
  4         8  
  4         391  
9 4     4   3657 use Callback;
  4         7058  
  4         216  
10 4     4   3610 use Proc::Background;
  4         56397  
  4         219  
11 4     4   44 use Proc::JobQueue qw(is_remote_host);
  4         8  
  4         241  
12 4     4   24 use Scalar::Util qw(weaken);
  4         68  
  4         6491  
13              
14             our $debug = $Proc::JobQueue::debug;
15              
16             sub new
17             {
18 130     130 0 1308 my ($pkg, %params) = @_;
19 130         3033 my $job = bless {
20             desc => '',
21             priority => 100,
22             procbg => undef,
23             run => undef,
24             command => undef,
25             queue => undef,
26             jobnum => undef,
27             postcb => undef,
28             generate_command => undef,
29             callback => undef,
30             host => undef,
31             jobnum => undef,
32             on_failure => undef,
33             errors => undef,
34             status => 'queued',
35             dependency_graph => undef,
36             force_host => undef,
37             %params
38             }, $pkg;
39 130         1180 lock_keys(%$job);
40 130 50       2500 if ($job->{queue}) {
41 0         0 $job->{queue}->add($job);
42             }
43 130 50 66     314 unless ($job->can_command || $job->can_callback) {
44 0         0 confess "$pkg job needs a command or a callback";
45             }
46 130 100       452 unless ($job->{desc}) {
47 10 50       29 if ($job->{command}) {
48 0         0 $job->{desc} = $job->{command};
49             } else {
50 10         45 $job->{desc} = "$job"; # stringify
51             }
52             }
53 130         827 return $job;
54             }
55              
56             sub can_command
57             {
58 185     185 0 707 my ($job) = @_;
59 185 100       455 return 1 if $job->{command};
60 125 50       395 return 1 if $job->{generate_command};
61 125 100       811 return 1 if $job->can('command');
62 5         65 return 0;
63             }
64              
65             sub can_callback
66             {
67 5     5 0 20 my ($job) = @_;
68 5 50       12 return 1 if $job->{callback};
69 5 50       32 return 1 if $job->can('startup');
70 0         0 return 0;
71             }
72              
73             sub start
74             {
75 186     186 1 1264 my ($job) = @_;
76              
77 186         555 my $host = $job->{host};
78 186         352 my $queue = $job->{queue};
79 186         378 my $jobnum = $job->{jobnum};
80              
81 186         1671 $job->{status} = 'started';
82              
83 186   33     4424 my $command = $job->{command}
84             || ($job->{generate_command} && $job->{generate_command}->($job))
85             || ($job->can('command') && $job->command())
86             ;
87              
88 186 100       7175 if ($command) {
    50          
    50          
89 120 50       966 $job->{desc} = $command
90             unless $job->{desc};
91 120 50       1090 if (is_remote_host($host)) {
92 0         0 $command = "ssh $host -o BatchMode=yes -o StrictHostKeyChecking=no $q_shell{$command}";
93             }
94 120 50       1098 $job->{run} = $command
95             unless $job->{run};
96 120         6759 print "+ $command\n";
97 120         1039 $job->{procbg} = Proc::Background->new($command);
98             } elsif ($job->{callback}) {
99 0 0       0 if (ref($job->{callback}) eq 'Callback') {
100 0         0 $job->{callback}->call($job);
101             } else {
102 0         0 $job->{callback}->($job, $host, $jobnum, $queue);
103             }
104             } elsif ($job->can('startup')) {
105 66         274 $job->startup($job, $host, $jobnum, $queue);
106             } else {
107 0         0 die "don't know how to start $job";
108             }
109             }
110              
111             sub host
112             {
113 537     537 1 2729 my ($job, $host) = @_;
114 537 100       1994 $job->{host} = $host if defined $host;
115 537         2125 return $job->{host};
116             }
117              
118             sub jobnum
119             {
120 1532     1532 1 25075 my ($job, $jobnum) = @_;
121 1532 100       4034 $job->{jobnum} = $jobnum if defined $jobnum;
122 1532         6502 return $job->{jobnum};
123             }
124              
125             sub queue
126             {
127 126     126 1 835 my ($job, $queue) = @_;
128 126 50       349 if ($queue) {
129 126         364 $job->{queue} = $queue;
130 126         594 weaken $job->{queue};
131             }
132 126         404 return $job->{queue};
133             }
134              
135             sub runnable
136             {
137 86     86 1 964 return 1;
138             }
139              
140             sub checkjob
141             {
142 607     607 1 5166 my ($job) = @_;
143 607 50       1614 print STDERR "# checking up on $job->{jobnum} $job->{desc} on $job->{host}\n" if $debug > 6;
144 607 50       2458 unless ($job->{procbg}) {
145 0 0       0 print STDERR "# $job->{jobnum} is not a Proc::Background job\n" if $debug > 9;
146 0         0 return undef;
147             }
148 607 100       2612 if ($job->{procbg}->alive) {
149 487 50       16339 print STDERR "# $job->{jobnum} $job->{desc} is still alive\n" if $debug > 6;
150 487         2214 return undef;
151             }
152 120         11375 my $queue = $job->{queue};
153 120         1289 my $e = $job->{procbg}->wait;
154 120         1718 $e >>= 8;
155 120         9288 print "# $job->{desc} on $job->{host} finished\n";
156 120         1042 $job->finished($e);
157 120         1773 return $e;
158             }
159              
160             sub finished
161             {
162 240     240 1 2729 my ($job, @exit_code) = @_;
163 240 100       4234 return if $job->{status} eq 'finished';
164 180         946 $job->{status} = 'finished';
165 180 50       565 die "NO JOBNUM FOR $job" unless $job->{jobnum};
166 180 50       431 print STDERR "# FINISHED $job->{jobnum} $job->{desc} on $job->{host}\n" if $debug > 7;
167 180 50       508 if ($job->{postcb}) {
168 0         0 $_->call($job, @exit_code)
169 0         0 for @{$job->{postcb}};
170 0         0 delete $job->{postcb}; # may clean circular references
171             }
172 180         528 my $queue = $job->{queue};
173 180         546 undef $job->{queue};
174 180 100       424 if ($queue) {
175 120 50       316 if ($job->{jobnum}) {
176 120 50       346 print STDERR "# calling JOBDONE for $job->{jobnum} $job->{desc} ($job->{status})\n" if $debug > 5;
177 120         1273 $queue->jobdone($job, 0, @exit_code); # not re-entrant since startmore == 0
178             } else {
179 0 0       0 print STDERR "# NOT calling JOBDONE for $job->{jobnum} $job->{desc} ($job->{status})\n" if $debug;
180             }
181             }
182 180 50       410 if ($exit_code[0]) {
183 0 0       0 print STDERR "# calling failed(@exit_code) for $job->{jobnum} $job->{desc}\n" if $debug > 6;
184 0         0 $job->failed(@exit_code);
185             } else {
186 180 50       562 print STDERR "# calling success() for $job->{jobnum} $job->{desc}\n" if $debug > 7;
187 180         1517 $job->success();
188 180 50       3059 print STDERR "# done calling success() for $job->{jobnum} $job->{desc}\n" if $debug > 9;
189             }
190 180 100       1271 $queue->startmore if $queue; # can be re-entrant
191             }
192              
193              
194             sub success
195             {
196 0     0 0   my ($job) = @_;
197 0 0         print STDERR "# Empty success on $job->{jobnum} $job->{desc}\n" if $debug > 8;
198             }
199              
200             sub addpostcb
201             {
202 0     0 1   my ($job, $cb1, @more) = @_;
203 0           my $cb = new Callback($cb1, @more);
204 0 0         $job->{postcb} = []
205             unless $job->{postcb};
206 0           push(@{$job->{postcb}}, $cb);
  0            
207             }
208              
209             sub failed
210             {
211 0     0 1   my ($job, @exit_code) = @_;
212 0 0 0       if ($job->{queue} && $job->{queue}{on_failure}) {
213 0           $job->{queue}{on_failure}->($job, @exit_code);
214             } else {
215 0           die "job $job->{desc} failed with @exit_code";
216             }
217             }
218              
219             1;
220              
221             __END__