File Coverage

blib/lib/Minion/Backend/Storable.pm
Criterion Covered Total %
statement 203 203 100.0
branch 59 66 89.3
condition 87 100 87.0
subroutine 42 42 100.0
pod 18 18 100.0
total 409 429 95.3


line stmt bran cond sub pod time code
1             package Minion::Backend::Storable;
2 8     8   211233 use Minion::Backend -base;
  8         4993  
  8         66  
3              
4             our $VERSION = 6.051;
5              
6 8     8   2287 use Sys::Hostname 'hostname';
  8         2740  
  8         389  
7 8     8   56 use Time::HiRes qw(time usleep);
  8         18  
  8         76  
8              
9             # Attributes
10              
11             has 'file';
12              
13             # Methods
14              
15             sub broadcast {
16 7   100 7 1 1133 my ($self, $command, $args, $ids) = (shift, shift, shift || [], shift || []);
      100        
17              
18 7         24 my $guard = $self->_guard->_write;
19 7         24 my $inboxes = $guard->_inboxes;
20 7         1144 my $workers = $guard->_workers;
21 7 50       40 @$ids = @$ids ? map exists($workers->{$_}), @$ids
    100          
22             : keys %$workers unless @$ids;
23              
24 7   100     22 push @{$inboxes->{$_} ||= []}, [$command, @$args] for @$ids;
  10         54  
25              
26 7         31 return !!@$ids;
27             }
28              
29             sub dequeue {
30 237     237 1 32893 my ($self, $id, $wait, $options) = @_;
31 237 100       888 usleep $wait * 1_000_000 unless my $job = $self->_try($id, $options);
32 237   66     1627 return $job || $self->_try($id, $options);
33             }
34              
35             sub enqueue {
36 172   100 172 1 137956 my ($self, $task, $args, $options) = (shift, shift, shift || [], shift || {});
      100        
37              
38 172         548 my $guard = $self->_guard->_write;
39              
40             my $job = {
41             args => $args,
42             attempts => $options->{attempts} // 1,
43             created => time,
44             delayed => time + ($options->{delay} // 0),
45             id => $guard->_job_id,
46             parents => $options->{parents} || [],
47             priority => $options->{priority} // 0,
48 172   100     2124 queue => $options->{queue} // 'default',
      100        
      100        
      100        
      100        
49             retries => 0,
50             state => 'inactive',
51             task => $task
52             };
53 172         607 $guard->_jobs->{$job->{id}} = $job;
54              
55 172         678 return $job->{id};
56             }
57              
58 63     63 1 6497361 sub fail_job { shift->_update(1, @_) }
59              
60 138     138 1 317866 sub finish_job { shift->_update(0, @_) }
61              
62             sub job_info {
63 438     438 1 126233 my ($self, $id) = @_;
64 438         1339 my $guard = $self->_guard;
65 438         1474 my $jobs = $guard->_jobs;
66 438 100       58904 my $job = $jobs->{$id} or return undef;
67 413         1198 $job->{children} = $guard->_children($id);
68 413         1349 return $job;
69             }
70              
71             sub list_jobs {
72 35     35 1 89145 my ($self, $offset, $limit, $options) = @_;
73              
74 35         120 my $guard = $self->_guard;
75 100         2760 my @jobs = sort { $b->{created} <=> $a->{created} }
76             grep +( (not exists $options->{queue} or $_->{queue} eq $options->{queue})
77             and (not exists $options->{state} or $_->{state} eq $options->{state})
78             and (not exists $options->{task} or $_->{task} eq $options->{task})
79 35   100     90 ), values %{$guard->_jobs};
  35         110  
80              
81 35   33     2155 return [map +($_->{children} = $guard->_children($_->{id}) and $_), grep defined, @jobs[$offset .. ($offset + $limit - 1)]];
82             }
83              
84             sub list_workers {
85 20     20 1 19035 my ($self, $offset, $limit) = @_;
86 20         65 my $guard = $self->_guard;
87 40         140 my @workers = map { $self->_worker_info($guard, $_->{id}) }
88 20         65 sort { $b->{started} <=> $a->{started} } values %{$guard->_workers};
  20         2150  
  20         85  
89 20         90 return [grep {defined} @workers[$offset .. ($offset + $limit - 1)]];
  65         225  
90             }
91              
92 7     7 1 191 sub new { shift->SUPER::new(file => shift) }
93              
94             sub receive {
95 5     5 1 154 my ($self, $id) = @_;
96 5         14 my $guard = $self->_guard->_write;
97 5         20 my $inboxes = $guard->_inboxes;
98 5   100     824 my $inbox = $inboxes->{$id} || [];
99 5         13 $inboxes->{$id} = [];
100 5         18 return $inbox;
101             }
102              
103             sub register_worker {
104 151   50 151 1 975397 my ($self, $id, $options) = (shift, shift, shift || {});
105 151         534 my $guard = $self->_guard->_write;
106 151 100       683 my $worker = $id ? $guard->_workers->{$id} : undef;
107 151 100       4577 unless ($worker) {
108 115         660 $worker = {host => hostname, id => $guard->_id, pid => $$, started => time};
109 115         392 $guard->_workers->{$worker->{id}} = $worker;
110             }
111 151   50     927 @$worker{qw(notified status)} = (time, $options->{status} || {});
112 151         636 return $worker->{id};
113             }
114              
115             sub remove_job {
116 30     30 1 4100 my ($self, $id) = @_;
117 30         85 my $guard = $self->_guard;
118 30 50       135 delete $guard->_write->_jobs->{$id}
119             if my $removed = !!$guard->_job($id, qw(failed finished inactive));
120 30         155 return $removed;
121             }
122              
123             sub repair {
124 33     33 1 1179 my $self = shift;
125 33         116 my $minion = $self->minion;
126              
127             # Workers without heartbeat
128 33         144 my $guard = $self->_guard->_write;
129 33         98 my $workers = $guard->_workers;
130 33         3445 my $jobs = $guard->_jobs;
131 33         166 my $after = time - $minion->missing_after;
132 33   66     342 $_->{notified} < $after and delete $workers->{$_->{id}} for values %$workers;
133              
134             # Jobs with missing parents (cannot be retried)
135 33         126 for my $job (values %$jobs) {
136 183 100 66     817 next unless $job->{parents} and $job->{state} eq 'inactive';
137 1         6 my $missing;
138 1         3 for my $p (@{$job->{parents}}) {
  1         4  
139 1 50 0     5 ++$missing and last unless exists $jobs->{$job->{id}};
140             }
141 1         7 @$job{qw(finished result state)} = (time, 'Parent went away', 'failed');
142             }
143              
144             # Old jobs without unfinished dependents
145 33         160 $after = time - $minion->remove_after;
146 33         223 for my $job (values %$jobs) {
147 183 100 100     696 next unless $job->{state} eq 'finished' and $job->{finished} <= $after;
148 31         88 my $id = $job->{id};
149             delete $jobs->{$id} unless grep +($jobs->{$_}{state} ne 'finished'),
150 31 100       60 @{$guard->_children($id)};
  31         119  
151             }
152              
153             # Jobs with missing worker (can be retried)
154             my @abandoned = map [@$_{qw(id retries)}],
155 33   100     306 grep +($_->{state} eq 'active' and not exists $workers->{$_->{worker}}),
156             values %$jobs;
157 33         108 undef $guard;
158 33         198 $self->fail_job(@$_, 'Worker went away') for @abandoned;
159              
160 33         330 return;
161             }
162              
163 13     13 1 10668 sub reset { shift->_guard->_spurt({}) }
164              
165             sub retry_job {
166 65   100 65 1 15672 my ($self, $id, $retries, $options) = (shift, shift, shift, shift || {});
167              
168 65         199 my $guard = $self->_guard;
169             return undef
170 65 100       230 unless my $job = $guard->_job($id, qw(active failed finished inactive));
171 55 100       331 return undef unless $job->{retries} == $retries;
172 53         180 $guard->_write;
173 53         111 ++$job->{retries};
174 53 100       207 $job->{delayed} = time + $options->{delay} if $options->{delay};
175 53   66     307 exists $options->{$_} and $job->{$_} = $options->{$_} for qw(priority queue);
176 53         308 @$job{qw(retried state)} = (time, 'inactive');
177 53         229 delete @$job{qw(finished started worker)};
178              
179 53         186 return 1;
180             }
181              
182             sub stats {
183 66     66 1 5622 my $self = shift;
184              
185 66         169 my ($active, $delayed) = (0, 0);
186 66         134 my (%seen, %states);
187 66         185 my $guard = $self->_guard;
188 66         157 for my $job (values %{$guard->_jobs}) {
  66         195  
189 222         6193 ++$states{$job->{state}};
190 222 100 100     714 ++$active if $job->{state} eq 'active' and not $seen{$job->{worker}}++;
191             ++$delayed if $job->{state} eq 'inactive'
192 222 100 100     746 and (time < $job->{delayed} or @{$job->{parents}});
      100        
193             }
194              
195             return {
196             active_workers => $active,
197 66         221 inactive_workers => keys(%{$guard->_workers}) - $active,
198             active_jobs => $states{active} // 0,
199             delayed_jobs => $delayed,
200             enqueued_jobs => $guard->_job_count,
201             failed_jobs => $states{failed} // 0,
202             finished_jobs => $states{finished} // 0,
203 66   100     891 inactive_jobs => $states{inactive} // 0
      100        
      100        
      100        
204             };
205             }
206              
207             sub unregister_worker {
208 106     106 1 30143 my ($self, $id) = @_;
209 106         321 my $guard = $self->_guard->_write;
210 106         402 delete $guard->_inboxes->{$id};
211 106         14181 delete $guard->_workers->{$id};
212             }
213              
214 60     60 1 4225 sub worker_info { $_[0]->_worker_info($_[0]->_guard, $_[1]) }
215              
216 1709     1709   21729 sub _guard { Minion::Backend::Storable::_Guard->new(backend => shift) }
217              
218             sub _try {
219 275     275   715 my ($self, $id, $options) = @_;
220 275         1055 my $tasks = $self->minion->tasks;
221 275 100       2589 my %queues = map +($_ => 1), @{$options->{queues} || ['default']};
  275         2820  
222              
223 275         1242 my $now = time;
224 275         785 my $guard = $self->_guard;
225 275         929 my $jobs = $guard->_jobs;
226             my @ready = sort {
227 65 50       416 $b->{priority} <=> $a->{priority} || $a->{created} <=> $b->{created} }
228             grep +($_->{state} eq 'inactive' and $queues{$_->{queue}}
229             and $tasks->{$_->{task}} and $_->{delayed} < $now),
230 275   100     37637 values %{$jobs};
  275         6513  
231              
232 275         586 my $job;
233 275         682 CANDIDATE: for my $candidate (@ready) {
234             $job = $candidate and last
235 207 50 50     382 unless my @parents = @{$candidate->{parents} || []};
  207 100       1143  
236 9   100     75 ($jobs->{$_}{state} // '') ne 'finished' and next CANDIDATE for @parents;
      100        
237 1         3 $job = $candidate;
238             }
239 275 100       974 return undef unless $job;
240 199         799 $guard->_write;
241 199         1005 @$job{qw(started state worker)} = (time, 'active', $id);
242 199         727 return $job;
243             }
244              
245             sub _update {
246 201     201   768 my ($self, $fail, $id, $retries, $result) = @_;
247              
248 201         782 my $guard = $self->_guard;
249 201 100       819 return undef unless my $job = $guard->_job($id, 'active');
250 156 100       562 return undef unless $job->{retries} == $retries;
251              
252 154         557 $guard->_write;
253 154         779 @$job{qw(finished result)} = (time, $result);
254 154 100       684 $job->{state} = $fail ? 'failed' : 'finished';
255 154         548 undef $guard;
256              
257 154 100 100     1476 return 1 unless $fail and $job->{attempts} > $retries + 1;
258 5         38 my $delay = $self->minion->backoff->($retries);
259 5         113 return $self->retry_job($id, $retries, {delay => $delay});
260             }
261              
262             sub _worker_info {
263 100     100   295 my ($self, $guard, $id) = @_;
264              
265 100 100 100     475 return undef unless $id && (my $worker = $guard->_workers->{$id});
266             my @jobs = map $_->{id},
267             grep +($_->{state} eq 'active' and $_->{worker} eq $id),
268 85   66     3895 values %{$guard->_jobs};
  85         215  
269              
270 85         680 return {%$worker, jobs => \@jobs};
271             }
272              
273             package Minion::Backend::Storable::_Guard;
274 8     8   17899 use Mojo::Base -base;
  8         19  
  8         62  
275              
276 8     8   946 use Fcntl ':flock';
  8         18  
  8         826  
277 8     8   55 use Digest::MD5 'md5_hex';
  8         28  
  8         315  
278 8     8   75 use Storable qw(retrieve store);
  8         50  
  8         5096  
279              
280             sub DESTROY {
281 1709     1709   11355 my $self = shift;
282 1709 100       5471 $self->_spurt($self->_data) if $self->{write};
283 1709         2701517 flock $self->{lock}, LOCK_UN;
284             }
285              
286             sub new {
287 1709     1709   6911 my $self = shift->SUPER::new(@_);
288 1709 100       16627 $self->_spurt({}) unless -f (my $file = $self->{backend}->file);
289 1709         98434 open $self->{lock}, '>', "$file.lock";
290 1709         9887 flock $self->{lock}, LOCK_EX;
291 1709         6669 return $self;
292             }
293              
294             sub _children {
295 504     504   1223 my ($self, $id) = @_;
296 504         1178 my $children = [];
297 504         943 for my $job (values %{$self->_jobs}) {
  504         1126  
298 4635 50       7417 push @$children, $job->{id} if grep +($_ eq $id), @{$job->{parents} || []};
  4635 100       14732  
299             }
300 504         2281 return $children;
301             }
302              
303 4017   66 4017   21431 sub _data { $_[0]{data} ||= retrieve($_[0]{backend}->file) }
304              
305             sub _id {
306 115     115   912 my $self = shift;
307 115         251 my $id;
308 115         225 do { $id = md5_hex(time . rand 999) } while $self->_workers->{$id};
  115         1730  
309 115         15471 return $id;
310             }
311              
312 118   100 118   331 sub _inboxes { shift->_data->{inboxes} ||= {} }
313              
314             sub _job {
315 296     296   876 my ($self, $id) = (shift, shift);
316 296 100       873 return undef unless my $job = $self->_jobs->{$id};
317 286 100       48001 return(grep(($job->{state} eq $_), @_) ? $job : undef);
318             }
319              
320             sub _job_id {
321 172     172   370 my $self = shift;
322 172         309 my $id;
323 172         383 do { $id = md5_hex(time . rand 999) } while $self->_jobs->{$id};
  172         2453  
324 172         24129 ++$self->_data->{job_count};
325 172         2850 return $id;
326             }
327              
328 66   100 66   200 sub _job_count { shift->_data->{job_count} //= 0 }
329              
330 2131   100 2131   6367 sub _jobs { shift->_data->{jobs} ||= {} }
331              
332 950     950   3366 sub _spurt { store($_[1] => $_[0]{backend}->file) }
333              
334 598   100 598   1763 sub _workers { shift->_data->{workers} ||= {} }
335              
336 930 50   930   4232 sub _write { ++$_[0]{write} && return $_[0] }
337              
338             1;
339             __END__