File Coverage

blib/lib/Minion/Backend/Storable.pm
Criterion Covered Total %
statement 221 224 98.6
branch 61 66 92.4
condition 109 127 85.8
subroutine 47 47 100.0
pod 21 21 100.0
total 459 485 94.6


line stmt bran cond sub pod time code
1             package Minion::Backend::Storable;
2 7     7   516710 use Minion::Backend -base;
  7         5216  
  7         67  
3              
4             our $VERSION = 7.012;
5              
6 7     7   1870 use Sys::Hostname 'hostname';
  7         1942  
  7         380  
7 7     7   42 use Time::HiRes qw(time usleep);
  7         15  
  7         80  
8              
9             # Attributes
10              
11             has 'file';
12              
13             # Constructor
14              
15 6     6 1 211 sub new { shift->SUPER::new(file => shift) }
16              
17             # Methods
18              
19             sub broadcast {
20 7   100 7 1 1112 my ($self, $command, $args, $ids) = (shift, shift, shift // [], shift // []);
      100        
21              
22 7         28 my $guard = $self->_guard->_write;
23 7         22 my $inboxes = $guard->_inboxes;
24 7         1105 my $workers = $guard->_workers;
25 7 50       37 @$ids = @$ids ? map exists($workers->{$_}), @$ids
    100          
26             : keys %$workers unless @$ids;
27              
28 7   100     19 push @{$inboxes->{$_} //= []}, [$command, @$args] for @$ids;
  10         48  
29              
30 7         27 return !!@$ids;
31             }
32              
33             sub dequeue {
34 229     229 1 54808 my ($self, $id, $wait, $options) = @_;
35 229   66     997 return ($self->_try($id, $options) or do {
36             usleep $wait * 1_000_000;
37             $self->_try($id, $options);
38             });
39             }
40              
41             sub enqueue {
42 170   100 170 1 101900 my ($self, $task, $args, $options) = (shift, shift, shift // [], shift // {});
      100        
43              
44 170         635 my $guard = $self->_guard->_write;
45              
46             my $job = {
47             args => $args,
48             attempts => $options->{attempts} // 1,
49             created => time,
50             delayed => time + ($options->{delay} // 0),
51             id => $guard->_job_id,
52             notes => $options->{notes} // {},
53             parents => $options->{parents} // [],
54             priority => $options->{priority} // 0,
55 170   100     2636 queue => $options->{queue} // 'default',
      100        
      100        
      100        
      100        
      100        
56             retries => 0,
57             state => 'inactive',
58             task => $task
59             };
60 170         764 $guard->_jobs->{$job->{id}} = $job;
61              
62 170         808 return $job->{id};
63             }
64              
65 63     63 1 7281203 sub fail_job { shift->_update(1, @_) }
66              
67 135     135 1 353771 sub finish_job { shift->_update(0, @_) }
68              
69             sub job_info {
70 431     431 1 178150 my ($self, $id) = @_;
71 431         1452 my $guard = $self->_guard;
72 431 100       1672 return undef unless my $job = $guard->_jobs->{$id};
73 406         65056 $job->{children} = $guard->_children($id);
74 406         1389 return $job;
75             }
76              
77             sub list_jobs {
78 37     37 1 173834 my ($self, $offset, $limit, $options) = @_;
79              
80 37         171 my $guard = $self->_guard;
81 85         3780 my @jobs = sort { $b->{created} <=> $a->{created} }
82             grep +( (not defined $options->{queue} or $_->{queue} eq $options->{queue})
83             and (not defined $options->{state} or $_->{state} eq $options->{state})
84             and (not defined $options->{task} or $_->{task} eq $options->{task})
85 37   100     121 ), values %{$guard->_jobs};
  37         148  
86              
87 37   33     3104 return [map +($_->{children} = $guard->_children($_->{id}) and $_), grep defined, @jobs[$offset .. ($offset + $limit - 1)]];
88             }
89              
90             sub list_workers {
91 20     20 1 31765 my ($self, $offset, $limit) = @_;
92 20         95 my $guard = $self->_guard;
93 40         125 my @workers = map { $self->_worker_info($guard, $_->{id}) }
94 20         50 sort { $b->{started} <=> $a->{started} } values %{$guard->_workers};
  20         2175  
  20         65  
95 20         90 return [grep {defined} @workers[$offset .. ($offset + $limit - 1)]];
  65         215  
96             }
97              
98             sub lock {
99 75   100 75 1 1055 my ($self, $name, $duration, $options) = (shift, shift, shift, shift // {});
100 75   100     290 my $limit = $options->{limit} || 1;
101              
102 75         195 my $guard = $self->_guard->_write;
103 75   100     255 my $locks = $guard->_locks->{$name} //= [];
104              
105             # Delete expired locks
106 75         8090 my $now = time;
107 75   100     480 @$locks = grep +($now < ($_ // 0)), @$locks;
108              
109             # Check capacity
110 75 100       290 return undef unless @$locks < $limit;
111 50 100       160 return 1 unless $duration > 0;
112              
113             # Add lock, maintaining order
114 40         120 my $this_expires = $now + $duration;
115              
116 40 50 50     420 push(@$locks, $this_expires) and return 1
      100        
117             if ($locks->[$#$locks] // 0) < $this_expires;
118              
119 0   0     0 @$locks = sort { ($a // 0) <=> ($b // 0) } (@$locks, $this_expires);
  0   0     0  
120 0         0 return 1;
121             }
122              
123             sub note {
124 20     20 1 345 my ($self, $id, $key, $value) = @_;
125 20         75 my $guard = $self->_guard;
126 20 100       90 return undef unless my $job = $guard->_write->_jobs->{$id};
127 10         1895 $job->{notes}{$key} = $value;
128 10         50 return 1;
129             }
130              
131             sub receive {
132 5     5 1 143 my ($self, $id) = @_;
133 5         42 my $guard = $self->_guard->_write;
134 5         15 my $inboxes = $guard->_inboxes;
135 5   100     777 my $inbox = $inboxes->{$id} // [];
136 5         19 $inboxes->{$id} = [];
137 5         20 return $inbox;
138             }
139              
140             sub register_worker {
141 142   50 142 1 991489 my ($self, $id, $options) = (shift, shift, shift // {});
142 142         574 my $guard = $self->_guard->_write;
143 142 100       750 my $worker = $id ? $guard->_workers->{$id} : undef;
144 142 100       4834 unless ($worker) {
145 112         924 $worker = {host => hostname, id => $guard->_id, pid => $$, started => time};
146 112         461 $guard->_workers->{$worker->{id}} = $worker;
147             }
148 142   50     2648 @$worker{qw(notified status)} = (time, $options->{status} // {});
149 142         748 return $worker->{id};
150             }
151              
152             sub remove_job {
153 31     31 1 7294 my ($self, $id) = @_;
154 31         129 my $guard = $self->_guard;
155 31 50       156 delete $guard->_write->_jobs->{$id}
156             if my $removed = !!$guard->_job($id, qw(failed finished inactive));
157 31         161 return $removed;
158             }
159              
160             sub repair {
161 32     32 1 809 my $self = shift;
162 32         145 my $minion = $self->minion;
163              
164             # Workers without heartbeat
165 32         230 my $guard = $self->_guard->_write;
166 32         137 my $workers = $guard->_workers;
167 32         3646 my $jobs = $guard->_jobs;
168 32         237 my $after = time - $minion->missing_after;
169 32   66     470 $_->{notified} < $after and delete $workers->{$_->{id}} for values %$workers;
170              
171             # Old jobs without unfinished dependents
172 32         178 $after = time - $minion->remove_after;
173 32         296 for my $job (values %$jobs) {
174 172 100 100     812 next unless $job->{state} eq 'finished' and $job->{finished} <= $after;
175             delete $jobs->{$job->{id}} unless grep +($jobs->{$_}{state} ne 'finished'),
176 31 100       85 @{$guard->_children($job->{id})};
  31         112  
177             }
178              
179             # Jobs with missing worker (can be retried)
180             my @abandoned = map [@$_{qw(id retries)}],
181 32   100     435 grep +($_->{state} eq 'active' and not exists $workers->{$_->{worker}}),
182             values %$jobs;
183 32         162 undef $guard;
184 32         210 $self->fail_job(@$_, 'Worker went away') for @abandoned;
185              
186 32         399 return;
187             }
188              
189 13     13 1 2350 sub reset { $_[0]->_guard->_save({} => $_[0]{file}) }
190              
191             sub retry_job {
192 65   100 65 1 23021 my ($self, $id, $retries, $options) = (shift, shift, shift, shift // {});
193              
194 65         272 my $guard = $self->_guard;
195             return undef
196 65 100       277 unless my $job = $guard->_job($id, qw(active failed finished inactive));
197 55 100       244 return undef unless $job->{retries} == $retries;
198 53         233 $guard->_write;
199 53         119 ++$job->{retries};
200 53 100       230 $job->{delayed} = time + $options->{delay} if $options->{delay};
201 53   66     312 exists $options->{$_} and $job->{$_} = $options->{$_} for qw(priority queue);
202 53         292 @$job{qw(retried state)} = (time, 'inactive');
203 53         223 delete @$job{qw(finished started worker)};
204              
205 53         210 return 1;
206             }
207              
208             sub stats {
209 68     68 1 9491 my $self = shift;
210              
211 68         184 my ($active, $delayed) = (0, 0);
212 68         162 my (%seen, %states);
213 68         206 my $guard = $self->_guard;
214 68         163 for my $job (values %{$guard->_jobs}) {
  68         253  
215 249         6645 ++$states{$job->{state}};
216 249 100 100     798 ++$active if $job->{state} eq 'active' and not $seen{$job->{worker}}++;
217             ++$delayed if $job->{state} eq 'inactive'
218 249 100 100     853 and (time < $job->{delayed} or @{$job->{parents}});
      100        
219             }
220              
221             return {
222             active_workers => $active,
223 68         193 inactive_workers => keys(%{$guard->_workers}) - $active,
224             active_jobs => $states{active} // 0,
225             delayed_jobs => $delayed,
226             enqueued_jobs => $guard->_job_count,
227             failed_jobs => $states{failed} // 0,
228             finished_jobs => $states{finished} // 0,
229 68   100     844 inactive_jobs => $states{inactive} // 0
      100        
      100        
      100        
230             };
231             }
232              
233             sub unlock {
234 55     55 1 515 my ($self, $name) = @_;
235              
236 55         185 my $guard = $self->_guard->_write;
237 55   50     145 my $locks = $guard->_locks->{$name} //= [];
238 55         4835 my $length = @$locks;
239 55         175 my $now = time;
240              
241 55         100 my $i = 0;
242 55   100     525 ++$i while $i < $length and ($locks->[$i] // 0) <= $now;
      100        
243 55 100       180 return undef if $i >= $length;
244              
245 35         95 $locks->[$i] = undef;
246 35         110 return 1;
247             }
248              
249             sub unregister_worker {
250 103     103 1 47068 my ($self, $id) = @_;
251 103         440 my $guard = $self->_guard->_write;
252 103         496 delete $guard->_inboxes->{$id};
253 103         15099 delete $guard->_workers->{$id};
254             }
255              
256 60     60 1 7740 sub worker_info { $_[0]->_worker_info($_[0]->_guard, $_[1]) }
257              
258 1829     1829   13438 sub _guard { Minion::Backend::Storable::_Guard->new(backend => shift) }
259              
260             sub _try {
261 262     262   816 my ($self, $id, $options) = @_;
262 262         1098 my $tasks = $self->minion->tasks;
263 262   100     2720 my %queues = map +($_ => 1), @{$options->{queues} // ['default']};
  262         3232  
264              
265 262         1207 my $now = time;
266 262         960 my $guard = $self->_guard;
267 262         1066 my $jobs = $guard->_jobs;
268             my @ready = sort { $b->{priority} <=> $a->{priority}
269 57 50       611 || $a->{created} <=> $b->{created} }
270             grep +($_->{state} eq 'inactive' and $queues{$_->{queue}}
271 262   100     44361 and $tasks->{$_->{task}} and $_->{delayed} <= $now),
272             values %$jobs;
273              
274 262         835 my $job;
275 262         783 CANDIDATE: for my $candidate (@ready) {
276             $job = $candidate and last CANDIDATE
277 202 100 50     461 unless my @parents = @{$candidate->{parents} // []};
  202   50     1441  
278 9         23 for my $parent (@parents) {
279             next CANDIDATE if exists $jobs->{$parent}
280 15 100 100     99 and grep +($jobs->{$parent}{state} eq $_), qw(active failed inactive)
281             }
282 3         12 $job = $candidate;
283             }
284              
285 262 100       1037 return undef unless $job;
286 196         791 $guard->_write;
287 196         1479 @$job{qw(started state worker)} = (time, 'active', $id);
288 196         799 return $job;
289             }
290              
291             sub _update {
292 198     198   918 my ($self, $fail, $id, $retries, $result) = @_;
293              
294 198         822 my $guard = $self->_guard;
295 198 100       1058 return undef unless my $job = $guard->_job($id, 'active');
296 156 100       735 return undef unless $job->{retries} == $retries;
297              
298 154         933 $guard->_write;
299 154         1257 @$job{qw(finished result)} = (time, $result);
300 154 100       619 $job->{state} = $fail ? 'failed' : 'finished';
301 154         626 undef $guard;
302              
303 154 100 100     1790 return 1 unless $fail and $job->{attempts} > $retries + 1;
304 5         75 my $delay = $self->minion->backoff->($retries);
305 5         183 return $self->retry_job($id, $retries, {delay => $delay});
306             }
307              
308             sub _worker_info {
309 100     100   315 my ($self, $guard, $id) = @_;
310              
311 100 100 100     595 return undef unless $id && (my $worker = $guard->_workers->{$id});
312             my @jobs = map $_->{id},
313             grep +($_->{state} eq 'active' and $_->{worker} eq $id),
314 85   66     3810 values %{$guard->_jobs};
  85         260  
315              
316 85         680 return {%$worker, jobs => \@jobs};
317             }
318              
319             package
320             Minion::Backend::Storable::_Guard;
321 7     7   17895 use Mojo::Base -base;
  7         16  
  7         59  
322              
323 7     7   947 use Fcntl ':flock';
  7         25  
  7         870  
324 7     7   55 use Digest::MD5 'md5_hex';
  7         15  
  7         293  
325 7     7   78 use Storable ();
  7         27  
  7         4299  
326              
327             sub DESTROY {
328 1829     1829   16783 my $self = shift;
329 1829 100       6503 $self->_save($self->_data => $self->{backend}->file) if $self->{write};
330 1829         518209 flock $self->{lock}, LOCK_UN;
331             }
332              
333             sub new {
334 1829     1829   7216 my $self = shift->SUPER::new(@_);
335 1829         18929 my $path = $self->{backend}->file;
336 1829 100       39329 $self->_save({} => $path) unless -f $path;
337 1829         75387 open $self->{lock}, '>', "$path.lock";
338 1829         10316 flock $self->{lock}, LOCK_EX;
339 1829         7309 return $self;
340             }
341              
342             sub _children {
343 499     499   1528 my ($self, $id) = @_;
344 499         1193 my $children = [];
345 499         1053 for my $job (values %{$self->_jobs}) {
  499         1257  
346 4637 100 50     7446 push @$children, $job->{id} if grep +($_ eq $id), @{$job->{parents} // []};
  4637         16486  
347             }
348 499         2238 return $children;
349             }
350              
351 4259   66 4259   24756 sub _data { $_[0]{data} //= $_[0]->_load($_[0]{backend}->file) }
352              
353             sub _id {
354 112     112   950 my $self = shift;
355 112         272 my $id;
356 112         349 do { $id = md5_hex(time . rand 999) } while $self->_workers->{$id};
  112         2202  
357 112         16356 return $id;
358             }
359              
360 115   100 115   474 sub _inboxes { $_[0]->_data->{inboxes} //= {} }
361              
362             sub _job {
363 294     294   906 my ($self, $id) = (shift, shift);
364 294 100       951 return undef unless my $job = $self->_jobs->{$id};
365 284 100       52245 return grep(($job->{state} eq $_), @_) ? $job : undef;
366             }
367              
368 68   100 68   190 sub _job_count { $_[0]->_data->{job_count} //= 0 }
369              
370             sub _job_id {
371 170     170   575 my ($self) = @_;
372 170         374 my $id;
373 170         374 do { $id = md5_hex(time . rand 999) } while $self->_jobs->{$id};
  170         3377  
374 170         24086 ++$self->_data->{job_count};
375 170         3772 return $id;
376             }
377              
378 2124   100 2124   6595 sub _jobs { $_[0]->_data->{jobs} //= {} }
379              
380 1806     1806   13801 sub _load { Storable::retrieve($_[1]) }
381              
382 135   100 135   355 sub _locks { $_[0]->_data->{locks} //= {} }
383              
384 1082     1082   8117 sub _save { Storable::store($_[1] => $_[2]) }
385              
386 584   100 584   1661 sub _workers { $_[0]->_data->{workers} //= {} }
387              
388 1063 50   1063   4625 sub _write { ++$_[0]{write} && return $_[0] }
389              
390             1;
391             __END__