File Coverage

blib/lib/TheSchwartz/Job.pm
Criterion Covered Total %
statement 50 176 28.4
branch 12 52 23.0
condition 6 24 25.0
subroutine 11 28 39.2
pod 19 19 100.0
total 98 299 32.7


line stmt bran cond sub pod time code
1             # $Id$
2              
3             package TheSchwartz::Job;
4 24     24   140 use strict;
  24         41  
  24         641  
5 24     24   118 use base qw( Data::ObjectDriver::BaseObject );
  24         45  
  24         1450  
6              
7 24     24   122 use Carp qw( croak );
  24         38  
  24         1074  
8 24     24   12610 use Storable ();
  24         63043  
  24         583  
9 24     24   8409 use TheSchwartz::Error;
  24         50  
  24         544  
10 24     24   7699 use TheSchwartz::ExitStatus;
  24         47  
  24         494  
11 24     24   7406 use TheSchwartz::JobHandle;
  24         56  
  24         122  
12              
13             __PACKAGE__->install_properties(
14             { columns => [
15             qw(jobid funcid arg uniqkey insert_time
16             run_after grabbed_until priority coalesce)
17             ],
18             datasource => 'job',
19             column_defs => { arg => 'blob' },
20             primary_key => 'jobid',
21             }
22             );
23              
24             __PACKAGE__->add_trigger(
25             pre_save => sub {
26             my ($job) = @_;
27             my $arg = $job->arg
28             or return;
29             if ( ref($arg) ) {
30             $job->arg( Storable::nfreeze($arg) );
31             }
32             }
33             );
34              
35             __PACKAGE__->add_trigger(
36             post_load => sub {
37             my ($job) = @_;
38             my $arg = $job->arg
39             or return;
40             $job->arg( _cond_thaw( $job->arg ) );
41             }
42             );
43              
44             sub new_from_array {
45 6     6 1 2533 my $class = shift;
46 6         13 my (@arg) = @_;
47 6 100       283 croak "usage: new_from_array(funcname, arg)" unless @arg == 2;
48 3         10 return $class->new(
49             funcname => $arg[0],
50             arg => $arg[1],
51             );
52             }
53              
54             sub new {
55 9     9 1 4985 my $class = shift;
56 9         23 my (%param) = @_;
57 9         28 my $job = $class->SUPER::new;
58 9 100       83 if ( my $arg = $param{arg} ) {
59 8 100       24 if ( ref($arg) eq 'SCALAR' ) {
    100          
60 2         5 $param{arg} = Storable::thaw($$arg);
61             }
62             elsif ( !ref($arg) ) {
63              
64             # if a regular scalar, test to see if it's a storable or not.
65 2         5 $param{arg} = _cond_thaw($arg);
66             }
67             }
68 9   66     63 $param{run_after} ||= time;
69 9   50     27 $param{grabbed_until} ||= 0;
70 9         20 for my $key ( keys %param ) {
71 37         269 $job->$key( $param{$key} );
72             }
73 8         62 return $job;
74             }
75              
76             sub funcname {
77 8     8 1 10 my $job = shift;
78 8 50       13 if (@_) {
79 8         16 $job->{__funcname} = shift;
80             }
81              
82             # lazily load,
83 8 50       14 if ( !$job->{__funcname} ) {
84 0         0 my $handle = $job->handle;
85 0         0 my $client = $handle->client;
86 0         0 my $driver = $client->driver_for( $handle->dsn_hashed );
87 0 0       0 my $funcname
88             = $client->funcid_to_name( $driver, $handle->dsn_hashed,
89             $job->funcid )
90             or die "Failed to lookup funcname of job $job";
91 0         0 return $job->{__funcname} = $funcname;
92             }
93 8         12 return $job->{__funcname};
94             }
95              
96             sub handle {
97 0     0 1 0 my $job = shift;
98 0 0       0 if (@_) {
99 0         0 $job->{__handle} = $_[0];
100             }
101 0         0 return $job->{__handle};
102             }
103              
104             sub driver {
105 0     0 1 0 my $job = shift;
106 0 0       0 unless ( exists $job->{__driver} ) {
107 0         0 my $handle = $job->handle;
108 0         0 $job->{__driver} = $handle->client->driver_for( $handle->dsn_hashed );
109             }
110 0         0 return $job->{__driver};
111             }
112              
113             sub add_failure {
114 0     0 1 0 my $job = shift;
115 0         0 my ($msg) = @_;
116 0         0 my $error = TheSchwartz::Error->new;
117 0         0 $error->error_time( time() );
118 0         0 $error->jobid( $job->jobid );
119 0         0 $error->funcid( $job->funcid );
120 0   0     0 $error->message( $msg || '' );
121              
122 0         0 my $driver = $job->driver;
123 0         0 $driver->insert($error);
124              
125             # and let's lazily clean some errors while we're here.
126 0         0 my $unixtime = $driver->dbd->sql_for_unixtime;
127 0   0     0 my $maxage = $TheSchwartz::T_ERRORS_MAX_AGE || ( 86400 * 7 );
128 0 0       0 $driver->remove(
129             'TheSchwartz::Error',
130             { error_time => \"< $unixtime - $maxage", },
131             { nofetch => 1,
132             limit => $driver->dbd->can_delete_with_limit ? 1000 : undef,
133             }
134             );
135              
136 0         0 return $error;
137             }
138              
139 0     0 1 0 sub exit_status { shift->handle->exit_status }
140 0     0 1 0 sub failure_log { shift->handle->failure_log }
141 0     0 1 0 sub failures { shift->handle->failures }
142              
143             sub set_exit_status {
144 0     0 1 0 my $job = shift;
145 0         0 my ($exit) = @_;
146 0         0 my $class = $job->funcname;
147 0 0       0 my $secs = $class->keep_exit_status_for or return;
148 0         0 my $status = TheSchwartz::ExitStatus->new;
149 0         0 $status->jobid( $job->jobid );
150 0         0 $status->funcid( $job->funcid );
151 0         0 $status->completion_time(time);
152 0         0 $status->delete_after( $status->completion_time + $secs );
153 0         0 $status->status($exit);
154              
155 0         0 my $driver = $job->driver;
156 0         0 $driver->insert($status);
157              
158             # and let's lazily clean some exitstatus while we're here. but
159             # rather than doing this query all the time, we do it 1/nth of the
160             # time, and deleting up to n*10 queries while we're at it.
161             # default n is 10% of the time, doing 100 deletes.
162 0   0     0 my $clean_thres = $TheSchwartz::T_EXITSTATUS_CLEAN_THRES || 0.10;
163 0 0       0 if ( rand() < $clean_thres ) {
164 0         0 my $unixtime = $driver->dbd->sql_for_unixtime;
165 0 0       0 $driver->remove(
166             'TheSchwartz::ExitStatus',
167             { delete_after => \"< $unixtime", },
168             { nofetch => 1,
169             limit => $driver->dbd->can_delete_with_limit
170             ? int( 1 / $clean_thres * 100 )
171             : undef,
172             }
173             );
174             }
175              
176 0         0 return $status;
177             }
178              
179             sub was_declined {
180 0     0 1 0 my $job = shift;
181 0 0       0 if (@_) {
182 0         0 $job->{__was_declined} = shift;
183             }
184 0         0 return $job->{__was_declined};
185             }
186              
187             sub did_something {
188 0     0 1 0 my $job = shift;
189 0 0       0 if (@_) {
190 0         0 $job->{__did_something} = shift;
191             }
192 0         0 return $job->{__did_something};
193             }
194              
195             sub debug {
196 0     0 1 0 my ( $job, $msg ) = @_;
197 0         0 $job->handle->client->debug( $msg, $job );
198             }
199              
200             sub completed {
201 0     0 1 0 my $job = shift;
202 0         0 $job->debug("job completed");
203 0 0       0 if ( $job->did_something ) {
204 0         0 $job->debug("can't call 'completed' on already finished job");
205 0         0 return 0;
206             }
207 0         0 $job->set_exit_status(0);
208 0         0 $job->driver->remove($job);
209 0         0 $job->did_something(1);
210             }
211              
212             sub permanent_failure {
213 0     0 1 0 my ( $job, $msg, $ex_status ) = @_;
214 0 0       0 if ( $job->did_something ) {
215 0         0 $job->debug("can't call 'permanent_failure' on already finished job");
216 0         0 return 0;
217             }
218 0         0 $job->_failed( $msg, $ex_status, 0 );
219             }
220              
221             sub declined {
222 0     0 1 0 my $job = shift;
223 0         0 my $run_after = shift;
224              
225 0 0       0 if ( $job->did_something ) {
226 0         0 $job->debug("can't call 'declined' on already finished job");
227 0         0 return 0;
228             }
229              
230 0         0 $job->was_declined(1);
231              
232 0 0       0 if ($run_after) {
233 0         0 $job->run_after($run_after);
234 0         0 $job->grabbed_until(0);
235 0         0 $job->driver->update($job);
236 0         0 $job->debug(
237             "job declined. retry will be considered after lease is up at "
238             . $job->run_after );
239             }
240             else {
241 0         0 $job->debug(
242             "job declined. retry will be considered after lease is up at "
243             . $job->grabbed_until );
244             }
245              
246             # we do nothing regarding the job's status
247             }
248              
249             sub failed {
250 0     0 1 0 my ( $job, $msg, $ex_status ) = @_;
251 0 0       0 if ( $job->did_something ) {
252 0         0 $job->debug("can't call 'failed' on already finished job");
253 0         0 return 0;
254             }
255              
256             ## If this job class specifies that jobs should be retried,
257             ## update the run_after if necessary, but keep the job around.
258              
259 0         0 my $class = $job->funcname;
260 0         0 my $failures = $job->failures
261             + 1; # include this one, since we haven't ->add_failure yet
262 0         0 my $max_retries = $class->max_retries($job);
263              
264 0         0 $job->debug(
265             "job failed. considering retry. is max_retries of $max_retries >= failures of $failures?"
266             );
267 0         0 $job->_failed( $msg, $ex_status, $max_retries >= $failures, $failures );
268             }
269              
270             sub _failed {
271 0     0   0 my ( $job, $msg, $exit_status, $_retry, $failures ) = @_;
272 0   0     0 $job->debug( "job failed: " . ( $msg || "" ) );
273              
274             ## Mark the failure in the error table.
275 0         0 $job->add_failure($msg);
276              
277 0 0       0 if ($_retry) {
278 0         0 my $class = $job->funcname;
279 0 0       0 if ( my $delay = $class->retry_delay($failures) ) {
280 0         0 $job->run_after( time() + $delay );
281             }
282 0         0 $job->grabbed_until(0);
283 0         0 $job->driver->update($job);
284             }
285             else {
286 0   0     0 $job->set_exit_status( $exit_status || 1 );
287 0         0 $job->driver->remove($job);
288             }
289 0         0 $job->did_something(1);
290             }
291              
292             sub replace_with {
293 0     0 1 0 my $job = shift;
294 0         0 my (@jobs) = @_;
295              
296 0 0       0 if ( $job->did_something ) {
297 0         0 $job->debug("can't call 'replace_with' on already finished job");
298 0         0 return 0;
299             }
300              
301             # Note: we don't set 'did_something' here because completed does it down below.
302              
303             ## The new jobs @jobs should be inserted into the same database as $job,
304             ## which they're replacing. So get a driver for the database that $job
305             ## belongs to.
306 0         0 my $handle = $job->handle;
307 0         0 my $client = $handle->client;
308 0         0 my $hashdsn = $handle->dsn_hashed;
309 0         0 my $driver = $job->driver;
310              
311 0         0 $job->debug( "replacing job with " . ( scalar @jobs ) . " other jobs" );
312              
313             ## Start a transaction.
314 0         0 $driver->begin_work;
315              
316             ## Insert the new jobs.
317 0         0 for my $j (@jobs) {
318 0         0 $client->insert_job_to_driver( $j, $driver, $hashdsn );
319             }
320              
321             ## Mark the original job as completed successfully.
322 0         0 $job->completed;
323              
324             # for testing
325 0 0       0 if ($TheSchwartz::Job::_T_REPLACE_WITH_FAIL) {
326 0         0 $driver->rollback;
327 0         0 die "commit failed for driver: due to testing\n";
328             }
329              
330             ## Looks like it's all ok, so commit.
331 0         0 $driver->commit;
332             }
333              
334             sub set_as_current {
335 0     0 1 0 my $job = shift;
336 0         0 my $client = $job->handle->client;
337 0         0 $client->set_current_job($job);
338             }
339              
340             sub _cond_thaw {
341 2     2   3 my $data = shift;
342              
343 2         3 my $magic = eval { Storable::read_magic($data); };
  2         4  
344 2 50 33     105 if ( $magic
      33        
      33        
345             && $magic->{major}
346             && $magic->{major} >= 2
347             && $magic->{major} <= 5 )
348             {
349 2         4 my $thawed = eval { Storable::thaw($data) };
  2         3  
350 2 50       46 if ($@) {
351              
352             # false alarm... looked like a Storable, but wasn't.
353 0         0 return $data;
354             }
355 2         7 return $thawed;
356             }
357             else {
358 0           return $data;
359             }
360             }
361              
362             1;
363              
364             __END__