File Coverage

blib/lib/TheSchwartz/Moosified/Job.pm
Criterion Covered Total %
statement 15 139 10.7
branch 0 38 0.0
condition 0 17 0.0
subroutine 5 22 22.7
pod 0 11 0.0
total 20 227 8.8


line stmt bran cond sub pod time code
1             package TheSchwartz::Moosified::Job;
2              
3 2     2   8 use Moose;
  2         1  
  2         11  
4 2     2   8460 use Storable ();
  2         3  
  2         35  
5 2     2   6 use TheSchwartz::Moosified::Utils qw/sql_for_unixtime run_in_txn/;
  2         3  
  2         77  
6 2     2   652 use TheSchwartz::Moosified::JobHandle;
  2         3  
  2         2370  
7              
8             has 'jobid' => ( is => 'rw', isa => 'Int' );
9             has 'funcid' => ( is => 'rw', isa => 'Int' );
10             has 'arg' => ( is => 'rw', isa => 'Any' );
11             has 'uniqkey' => ( is => 'rw', isa => 'Maybe[Str]' );
12             has 'insert_time' => ( is => 'rw', isa => 'Maybe[Int]' );
13             has 'run_after' => ( is => 'rw', isa => 'Int', default => sub { time } );
14             has 'grabbed_until' => ( is => 'rw', isa => 'Int', default => 0 );
15             has 'priority' => ( is => 'rw', isa => 'Maybe[Int]' );
16             has 'coalesce' => ( is => 'rw', isa => 'Maybe[Str]' );
17              
18             has 'funcname' => (
19             is => 'rw',
20             isa => 'Str',
21             lazy_build => 1,
22             );
23              
24             has 'handle' => (
25             is => 'rw',
26             isa => 'TheSchwartz::Moosified::JobHandle',
27             handles => [qw(
28             exit_status
29             failure_log
30             failures
31             client
32             dbh
33             )],
34             );
35              
36             has 'did_something' => ( is => 'rw', isa => 'Bool', default => 0 );
37              
38             sub BUILD {
39 0     0 0   my ($self, $params) = @_;
40            
41 0 0         if (my $arg = $params->{arg}) {
42 0 0         if (ref($arg) eq 'SCALAR') {
    0          
43 0           $params->{arg} = Storable::thaw($$arg);
44             } elsif (!ref($arg)) {
45             # if a regular scalar, test to see if it's a storable or not.
46 0           $params->{arg} = _cond_thaw($arg);
47             }
48 0           $self->arg( $params->{arg} );
49             }
50             }
51              
52             sub _build_funcname {
53 0     0     my $self = shift;
54 0 0         my $funcname = $self->client->funcid_to_name($self->dbh, $self->funcid)
55             or die "Failed to lookup funcname of job $self";
56 0           return $funcname;
57             }
58              
59             sub debug {
60 0     0 0   my ($self, $msg) = @_;
61            
62 0           $self->client->debug($msg, $self);
63             }
64              
65             sub as_hashref {
66 0     0 0   my $self = shift;
67              
68 0           my %data;
69 0           for my $col (qw( jobid funcid arg uniqkey insert_time run_after grabbed_until priority coalesce )) {
70 0 0         $data{$col} = $self->$col if $self->can($col);
71             }
72              
73 0           return \%data;
74             }
75              
76             sub add_failure {
77 0     0 0   my $job = shift;
78 0           my $msg = shift;
79 0 0         $msg = '' unless defined $msg;
80            
81 0           my $table_error = $job->handle->client->prefix . 'error';
82 0 0         if (my $len = $job->handle->client->error_length) {
83 0           $msg = substr($msg,0,$len);
84             }
85 0           my $sql = qq~INSERT INTO $table_error (error_time, jobid, message, funcid) VALUES (?, ?, ?, ?)~;
86 0           my $dbh = $job->dbh;
87 0           my $sth = $dbh->prepare($sql);
88 0           $sth->execute(time(), $job->jobid, $msg, $job->funcid);
89              
90             # and let's lazily clean some errors while we're here.
91 0   0       my $maxage = $TheSchwartz::Moosified::T_ERRORS_MAX_AGE || (86400*7);
92 0           my $dtime = time() - $maxage;
93 0           $dbh->do(qq~DELETE FROM $table_error WHERE error_time < $dtime~);
94              
95 0           return 1;
96             }
97              
98             sub completed {
99 0     0 0   my $job = shift;
100            
101 0           $job->debug("job completed");
102 0 0         if ($job->did_something) {
103 0           $job->debug("can't call 'completed' on already finished job");
104 0           return 0;
105             }
106 0           $job->did_something(1);
107             return run_in_txn {
108 0     0     $job->set_exit_status(0);
109 0           $job->remove();
110 0           } $job->dbh;
111             }
112              
113             sub remove {
114 0     0 0   my $job = shift;
115            
116 0           my $jobid = $job->jobid;
117 0           my $table_job = $job->handle->client->prefix . 'job';
118 0           $job->dbh->do(qq~DELETE FROM $table_job WHERE jobid = $jobid~);
119             }
120              
121             sub set_exit_status {
122 0     0 0   my $job = shift;
123 0           my($exit) = @_;
124 0           my $class = $job->funcname;
125 0 0         my $secs = $class->keep_exit_status_for or return;
126              
127 0           my $t = time();
128 0           my $jobid = $job->jobid;
129 0           my $funcid = $job->funcid;
130 0           my @status = ($exit, $t, $t + $secs);
131 0           my $dbh = $job->dbh;
132 0           my $table_exitstatus = $job->handle->client->prefix . 'exitstatus';
133 0           my $needs_update = 0;
134             {
135 0           my $sth = $dbh->prepare(qq{
  0            
136             INSERT INTO $table_exitstatus
137             (funcid, status, completion_time, delete_after, jobid)
138             SELECT ?, ?, ?, ?, ?
139             WHERE NOT EXISTS (
140             SELECT 1 FROM $table_exitstatus WHERE jobid = ?
141             )
142             });
143 0           $sth->execute($funcid, @status, $jobid, $jobid);
144 0           $needs_update = ($sth->rows == 0);
145             }
146 0 0         if ($needs_update) {
147             # only update if this status is newest
148 0           my $sth = $dbh->prepare(qq{
149             UPDATE $table_exitstatus
150             SET status=?, completion_time=?, delete_after=?
151             WHERE jobid = ? AND completion_time < ?
152             });
153 0           $sth->execute(@status, $jobid, $t);
154             }
155              
156             # and let's lazily clean some exitstatus while we're here. but
157             # rather than doing this query all the time, we do it 1/nth of the
158             # time, and deleting up to n*10 queries while we're at it.
159             # default n is 10% of the time, doing 100 deletes.
160 0   0       my $clean_thres = $TheSchwartz::Moosified::T_EXITSTATUS_CLEAN_THRES || 0.10;
161 0 0         if (rand() < $clean_thres) {
162 0           my $unixtime = sql_for_unixtime($dbh);
163 0           $dbh->do(qq~DELETE FROM $table_exitstatus WHERE delete_after < $unixtime~);
164             }
165              
166 0           return 1;
167             }
168              
169             sub permanent_failure {
170 0     0 0   my ($job, $msg, $ex_status) = @_;
171 0 0         if ($job->did_something) {
172 0           $job->debug("can't call 'permanent_failure' on already finished job");
173 0           return 0;
174             }
175 0           $job->_failed($msg, $ex_status, 0);
176             }
177              
178             sub failed {
179 0     0 0   my ($job, $msg, $ex_status) = @_;
180 0 0         if ($job->did_something) {
181 0           $job->debug("can't call 'failed' on already finished job");
182 0           return 0;
183             }
184              
185             ## If this job class specifies that jobs should be retried,
186             ## update the run_after if necessary, but keep the job around.
187              
188 0           my $class = $job->funcname;
189 0           my $failures = $job->failures + 1; # include this one, since we haven't ->add_failure yet
190 0           my $max_retries = $class->max_retries($job);
191              
192 0           $job->debug("job failed. considering retry. is max_retries of $max_retries >= failures of $failures?");
193 0           $job->_failed($msg, $ex_status, $max_retries >= $failures, $failures);
194             }
195              
196             sub _failed {
197 0     0     my ($job, $msg, $exit_status, $_retry, $failures) = @_;
198 0           $job->did_something(1);
199 0   0       $job->debug("job failed: " . ($msg || "<no message>"));
200              
201             run_in_txn {
202             ## Mark the failure in the error table.
203 0     0     $job->add_failure($msg);
204              
205 0 0         if ($_retry) {
206 0           my $table_job = $job->handle->client->prefix . 'job';
207 0           my $class = $job->funcname;
208 0           my @bind;
209 0           my $sql = qq{UPDATE $table_job SET };
210 0 0         if (my $delay = $class->retry_delay($failures)) {
211 0           my $run_after = time() + $delay;
212 0           $job->run_after($run_after);
213 0           push @bind, $run_after;
214 0           $sql .= qq{run_after = ?, };
215             }
216 0           $sql .= q{grabbed_until = 0 WHERE jobid = ?};
217 0           push @bind, $job->jobid;
218 0           $job->dbh->do($sql, {}, @bind);
219             } else {
220 0   0       $job->set_exit_status($exit_status || 1);
221 0           $job->remove();
222             }
223 0           } $job->dbh;
224             }
225              
226             sub replace_with {
227 0     0 0   my $job = shift;
228 0           my(@jobs) = @_;
229              
230 0 0         if ($job->did_something) {
231 0           $job->debug("can't call 'replace_with' on already finished job");
232 0           return 0;
233             }
234             # Note: we don't set 'did_something' here because completed does it down below.
235              
236 0           $job->debug("replacing job with " . (scalar @jobs) . " other jobs");
237              
238             ## The new jobs @jobs should be inserted into the same database as $job,
239             ## which they're replacing.
240 0           for my $new_job (@jobs) {
241 0 0         next unless ref $new_job->arg;
242 0           $new_job->arg( Storable::nfreeze( $new_job->arg ) );
243             }
244              
245             run_in_txn {
246             ## Mark the original job as completed successfully.
247 0     0     $job->completed;
248              
249             ## Insert the new jobs.
250 0           $job->client->_try_insert($_, $job->dbh) for @jobs;
251 0           } $job->dbh;
252             }
253              
254             sub set_as_current {
255 0     0 0   my $job = shift;
256 0           $job->client->current_job($job);
257             }
258              
259             sub _cond_thaw {
260 0     0     my $data = shift;
261              
262 0           my $magic = eval { Storable::read_magic($data); };
  0            
263 0 0 0       if ($magic && $magic->{major} && $magic->{major} >= 2 && $magic->{major} <= 5) {
      0        
      0        
264 0           my $thawed = eval { Storable::thaw($data) };
  0            
265 0 0         if ($@) {
266             # false alarm... looked like a Storable, but wasn't.
267 0           return $data;
268             }
269 0           return $thawed;
270             } else {
271 0           return $data;
272             }
273             }
274              
275 2     2   14 no Moose;
  2         3  
  2         10  
276             1;
277             __END__