File Coverage

blib/lib/Minion/Backend/Storable.pm
Criterion Covered Total %
statement 221 224 98.6
branch 61 66 92.4
condition 109 127 85.8
subroutine 47 47 100.0
pod 21 21 100.0
total 459 485 94.6


line stmt bran cond sub pod time code
1             package Minion::Backend::Storable;
2 7     7   549787 use Minion::Backend -base;
  7         7441  
  7         81  
3              
4             our $VERSION = 7.011;
5              
6 7     7   2403 use Sys::Hostname 'hostname';
  7         1884  
  7         493  
7 7     7   74 use Time::HiRes qw(time usleep);
  7         16  
  7         95  
8              
9             # Attributes
10              
11             has 'file';
12              
13             # Constructor
14              
15 6     6 1 183 sub new { shift->SUPER::new(file => shift) }
16              
17             # Methods
18              
19             sub broadcast {
20 7   100 7 1 1187 my ($self, $command, $args, $ids) = (shift, shift, shift // [], shift // []);
      100        
21              
22 7         27 my $guard = $self->_guard->_write;
23 7         26 my $inboxes = $guard->_inboxes;
24 7         1653 my $workers = $guard->_workers;
25 7 50       43 @$ids = @$ids ? map exists($workers->{$_}), @$ids
    100          
26             : keys %$workers unless @$ids;
27              
28 7   100     27 push @{$inboxes->{$_} //= []}, [$command, @$args] for @$ids;
  10         59  
29              
30 7         32 return !!@$ids;
31             }
32              
33             sub dequeue {
34 229     229 1 65625 my ($self, $id, $wait, $options) = @_;
35 229   66     1021 return ($self->_try($id, $options) or do {
36             usleep $wait * 1_000_000;
37             $self->_try($id, $options);
38             });
39             }
40              
41             sub enqueue {
42 170   100 170 1 116369 my ($self, $task, $args, $options) = (shift, shift, shift // [], shift // {});
      100        
43              
44 170         700 my $guard = $self->_guard->_write;
45              
46             my $job = {
47             args => $args,
48             attempts => $options->{attempts} // 1,
49             created => time,
50             delayed => time + ($options->{delay} // 0),
51             id => $guard->_job_id,
52             notes => $options->{notes} // {},
53             parents => $options->{parents} // [],
54             priority => $options->{priority} // 0,
55 170   100     2417 queue => $options->{queue} // 'default',
      100        
      100        
      100        
      100        
      100        
56             retries => 0,
57             state => 'inactive',
58             task => $task
59             };
60 170         605 $guard->_jobs->{$job->{id}} = $job;
61              
62 170         681 return $job->{id};
63             }
64              
65 63     63 1 8268431 sub fail_job { shift->_update(1, @_) }
66              
67 135     135 1 353719 sub finish_job { shift->_update(0, @_) }
68              
69             sub job_info {
70 431     431 1 178892 my ($self, $id) = @_;
71 431         1538 my $guard = $self->_guard;
72 431 100       1668 return undef unless my $job = $guard->_jobs->{$id};
73 406         77859 $job->{children} = $guard->_children($id);
74 406         1504 return $job;
75             }
76              
77             sub list_jobs {
78 37     37 1 154581 my ($self, $offset, $limit, $options) = @_;
79              
80 37         135 my $guard = $self->_guard;
81 90         2720 my @jobs = sort { $b->{created} <=> $a->{created} }
82             grep +( (not defined $options->{queue} or $_->{queue} eq $options->{queue})
83             and (not defined $options->{state} or $_->{state} eq $options->{state})
84             and (not defined $options->{task} or $_->{task} eq $options->{task})
85 37   100     97 ), values %{$guard->_jobs};
  37         134  
86              
87 37   33     2753 return [map +($_->{children} = $guard->_children($_->{id}) and $_), grep defined, @jobs[$offset .. ($offset + $limit - 1)]];
88             }
89              
90             sub list_workers {
91 20     20 1 28635 my ($self, $offset, $limit) = @_;
92 20         80 my $guard = $self->_guard;
93 40         120 my @workers = map { $self->_worker_info($guard, $_->{id}) }
94 20         60 sort { $b->{started} <=> $a->{started} } values %{$guard->_workers};
  20         2225  
  20         90  
95 20         70 return [grep {defined} @workers[$offset .. ($offset + $limit - 1)]];
  65         175  
96             }
97              
98             sub lock {
99 75   100 75 1 895 my ($self, $name, $duration, $options) = (shift, shift, shift, shift // {});
100 75   100     280 my $limit = $options->{limit} || 1;
101              
102 75         180 my $guard = $self->_guard->_write;
103 75   100     205 my $locks = $guard->_locks->{$name} //= [];
104              
105             # Delete expired locks
106 75         7575 my $now = time;
107 75   100     375 @$locks = grep +($now < ($_ // 0)), @$locks;
108              
109             # Check capacity
110 75 100       255 return undef unless @$locks < $limit;
111 50 100       255 return 1 unless $duration > 0;
112              
113             # Add lock, maintaining order
114 40         90 my $this_expires = $now + $duration;
115              
116 40 50 50     360 push(@$locks, $this_expires) and return 1
      100        
117             if ($locks->[$#$locks] // 0) < $this_expires;
118              
119 0   0     0 @$locks = sort { ($a // 0) <=> ($b // 0) } (@$locks, $this_expires);
  0   0     0  
120 0         0 return 1;
121             }
122              
123             sub note {
124 20     20 1 420 my ($self, $id, $key, $value) = @_;
125 20         115 my $guard = $self->_guard;
126 20 100       140 return undef unless my $job = $guard->_write->_jobs->{$id};
127 10         3290 $job->{notes}{$key} = $value;
128 10         70 return 1;
129             }
130              
131             sub receive {
132 5     5 1 171 my ($self, $id) = @_;
133 5         20 my $guard = $self->_guard->_write;
134 5         19 my $inboxes = $guard->_inboxes;
135 5   100     1173 my $inbox = $inboxes->{$id} // [];
136 5         17 $inboxes->{$id} = [];
137 5         21 return $inbox;
138             }
139              
140             sub register_worker {
141 142   50 142 1 1051909 my ($self, $id, $options) = (shift, shift, shift // {});
142 142         853 my $guard = $self->_guard->_write;
143 142 100       731 my $worker = $id ? $guard->_workers->{$id} : undef;
144 142 100       5286 unless ($worker) {
145 112         684 $worker = {host => hostname, id => $guard->_id, pid => $$, started => time};
146 112         473 $guard->_workers->{$worker->{id}} = $worker;
147             }
148 142   50     1027 @$worker{qw(notified status)} = (time, $options->{status} // {});
149 142         1330 return $worker->{id};
150             }
151              
152             sub remove_job {
153 31     31 1 7404 my ($self, $id) = @_;
154 31         115 my $guard = $self->_guard;
155 31 50       165 delete $guard->_write->_jobs->{$id}
156             if my $removed = !!$guard->_job($id, qw(failed finished inactive));
157 31         182 return $removed;
158             }
159              
160             sub repair {
161 32     32 1 627 my $self = shift;
162 32         105 my $minion = $self->minion;
163              
164             # Workers without heartbeat
165 32         182 my $guard = $self->_guard->_write;
166 32         128 my $workers = $guard->_workers;
167 32         4246 my $jobs = $guard->_jobs;
168 32         198 my $after = time - $minion->missing_after;
169 32   66     375 $_->{notified} < $after and delete $workers->{$_->{id}} for values %$workers;
170              
171             # Old jobs without unfinished dependents
172 32         150 $after = time - $minion->remove_after;
173 32         241 for my $job (values %$jobs) {
174 172 100 100     787 next unless $job->{state} eq 'finished' and $job->{finished} <= $after;
175             delete $jobs->{$job->{id}} unless grep +($jobs->{$_}{state} ne 'finished'),
176 31 100       86 @{$guard->_children($job->{id})};
  31         124  
177             }
178              
179             # Jobs with missing worker (can be retried)
180             my @abandoned = map [@$_{qw(id retries)}],
181 32   100     385 grep +($_->{state} eq 'active' and not exists $workers->{$_->{worker}}),
182             values %$jobs;
183 32         181 undef $guard;
184 32         156 $self->fail_job(@$_, 'Worker went away') for @abandoned;
185              
186 32         365 return;
187             }
188              
189 13     13 1 2255 sub reset { $_[0]->_guard->_save({} => $_[0]{file}) }
190              
191             sub retry_job {
192 65   100 65 1 28558 my ($self, $id, $retries, $options) = (shift, shift, shift, shift // {});
193              
194 65         235 my $guard = $self->_guard;
195             return undef
196 65 100       351 unless my $job = $guard->_job($id, qw(active failed finished inactive));
197 55 100       302 return undef unless $job->{retries} == $retries;
198 53         348 $guard->_write;
199 53         133 ++$job->{retries};
200 53 100       260 $job->{delayed} = time + $options->{delay} if $options->{delay};
201 53   66     383 exists $options->{$_} and $job->{$_} = $options->{$_} for qw(priority queue);
202 53         352 @$job{qw(retried state)} = (time, 'inactive');
203 53         267 delete @$job{qw(finished started worker)};
204              
205 53         245 return 1;
206             }
207              
208             sub stats {
209 68     68 1 10285 my $self = shift;
210              
211 68         247 my ($active, $delayed) = (0, 0);
212 68         167 my (%seen, %states);
213 68         250 my $guard = $self->_guard;
214 68         182 for my $job (values %{$guard->_jobs}) {
  68         255  
215 249         8047 ++$states{$job->{state}};
216 249 100 100     975 ++$active if $job->{state} eq 'active' and not $seen{$job->{worker}}++;
217             ++$delayed if $job->{state} eq 'inactive'
218 249 100 100     988 and (time < $job->{delayed} or @{$job->{parents}});
      100        
219             }
220              
221             return {
222             active_workers => $active,
223 68         261 inactive_workers => keys(%{$guard->_workers}) - $active,
224             active_jobs => $states{active} // 0,
225             delayed_jobs => $delayed,
226             enqueued_jobs => $guard->_job_count,
227             failed_jobs => $states{failed} // 0,
228             finished_jobs => $states{finished} // 0,
229 68   100     1118 inactive_jobs => $states{inactive} // 0
      100        
      100        
      100        
230             };
231             }
232              
233             sub unlock {
234 55     55 1 485 my ($self, $name) = @_;
235              
236 55         150 my $guard = $self->_guard->_write;
237 55   50     140 my $locks = $guard->_locks->{$name} //= [];
238 55         4725 my $length = @$locks;
239 55         160 my $now = time;
240              
241 55         105 my $i = 0;
242 55   100     505 ++$i while $i < $length and ($locks->[$i] // 0) <= $now;
      100        
243 55 100       180 return undef if $i >= $length;
244              
245 35         50 $locks->[$i] = undef;
246 35         105 return 1;
247             }
248              
249             sub unregister_worker {
250 103     103 1 44205 my ($self, $id) = @_;
251 103         368 my $guard = $self->_guard->_write;
252 103         513 delete $guard->_inboxes->{$id};
253 103         17596 delete $guard->_workers->{$id};
254             }
255              
256 60     60 1 6955 sub worker_info { $_[0]->_worker_info($_[0]->_guard, $_[1]) }
257              
258 1829     1829   12771 sub _guard { Minion::Backend::Storable::_Guard->new(backend => shift) }
259              
260             sub _try {
261 262     262   838 my ($self, $id, $options) = @_;
262 262         1080 my $tasks = $self->minion->tasks;
263 262   100     2908 my %queues = map +($_ => 1), @{$options->{queues} // ['default']};
  262         3324  
264              
265 262         1270 my $now = time;
266 262         952 my $guard = $self->_guard;
267 262         1237 my $jobs = $guard->_jobs;
268             my @ready = sort { $b->{priority} <=> $a->{priority}
269 64 50       458 || $a->{created} <=> $b->{created} }
270             grep +($_->{state} eq 'inactive' and $queues{$_->{queue}}
271 262   100     53997 and $tasks->{$_->{task}} and $_->{delayed} <= $now),
272             values %$jobs;
273              
274 262         634 my $job;
275 262         872 CANDIDATE: for my $candidate (@ready) {
276             $job = $candidate and last CANDIDATE
277 202 100 50     569 unless my @parents = @{$candidate->{parents} // []};
  202   50     1605  
278 9         94 for my $parent (@parents) {
279             next CANDIDATE if exists $jobs->{$parent}
280 15 100 100     175 and grep +($jobs->{$parent}{state} eq $_), qw(active failed inactive)
281             }
282 3         12 $job = $candidate;
283             }
284              
285 262 100       1060 return undef unless $job;
286 196         971 $guard->_write;
287 196         1413 @$job{qw(started state worker)} = (time, 'active', $id);
288 196         833 return $job;
289             }
290              
291             sub _update {
292 198     198   970 my ($self, $fail, $id, $retries, $result) = @_;
293              
294 198         840 my $guard = $self->_guard;
295 198 100       997 return undef unless my $job = $guard->_job($id, 'active');
296 156 100       769 return undef unless $job->{retries} == $retries;
297              
298 154         660 $guard->_write;
299 154         1178 @$job{qw(finished result)} = (time, $result);
300 154 100       630 $job->{state} = $fail ? 'failed' : 'finished';
301 154         644 undef $guard;
302              
303 154 100 100     2053 return 1 unless $fail and $job->{attempts} > $retries + 1;
304 5         70 my $delay = $self->minion->backoff->($retries);
305 5         275 return $self->retry_job($id, $retries, {delay => $delay});
306             }
307              
308             sub _worker_info {
309 100     100   325 my ($self, $guard, $id) = @_;
310              
311 100 100 100     540 return undef unless $id && (my $worker = $guard->_workers->{$id});
312             my @jobs = map $_->{id},
313             grep +($_->{state} eq 'active' and $_->{worker} eq $id),
314 85   66     4590 values %{$guard->_jobs};
  85         240  
315              
316 85         670 return {%$worker, jobs => \@jobs};
317             }
318              
319             package
320             Minion::Backend::Storable::_Guard;
321 7     7   22635 use Mojo::Base -base;
  7         16  
  7         45  
322              
323 7     7   933 use Fcntl ':flock';
  7         20  
  7         970  
324 7     7   67 use Digest::MD5 'md5_hex';
  7         20  
  7         350  
325 7     7   75 use Storable ();
  7         31  
  7         5041  
326              
327             sub DESTROY {
328 1829     1829   15884 my $self = shift;
329 1829 100       7100 $self->_save($self->_data => $self->{backend}->file) if $self->{write};
330 1829         894283 flock $self->{lock}, LOCK_UN;
331             }
332              
333             sub new {
334 1829     1829   7890 my $self = shift->SUPER::new(@_);
335 1829         21106 my $path = $self->{backend}->file;
336 1829 100       39147 $self->_save({} => $path) unless -f $path;
337 1829         80443 open $self->{lock}, '>', "$path.lock";
338 1829         11541 flock $self->{lock}, LOCK_EX;
339 1829         7901 return $self;
340             }
341              
342             sub _children {
343 499     499   1461 my ($self, $id) = @_;
344 499         1195 my $children = [];
345 499         1188 for my $job (values %{$self->_jobs}) {
  499         2513  
346 4637 100 50     9372 push @$children, $job->{id} if grep +($_ eq $id), @{$job->{parents} // []};
  4637         17656  
347             }
348 499         2431 return $children;
349             }
350              
351 4259   66 4259   26772 sub _data { $_[0]{data} //= $_[0]->_load($_[0]{backend}->file) }
352              
353             sub _id {
354 112     112   923 my $self = shift;
355 112         269 my $id;
356 112         312 do { $id = md5_hex(time . rand 999) } while $self->_workers->{$id};
  112         1899  
357 112         19676 return $id;
358             }
359              
360 115   100 115   366 sub _inboxes { $_[0]->_data->{inboxes} //= {} }
361              
362             sub _job {
363 294     294   1068 my ($self, $id) = (shift, shift);
364 294 100       924 return undef unless my $job = $self->_jobs->{$id};
365 284 100       66210 return grep(($job->{state} eq $_), @_) ? $job : undef;
366             }
367              
368 68   100 68   238 sub _job_count { $_[0]->_data->{job_count} //= 0 }
369              
370             sub _job_id {
371 170     170   536 my ($self) = @_;
372 170         384 my $id;
373 170         456 do { $id = md5_hex(time . rand 999) } while $self->_jobs->{$id};
  170         3118  
374 170         28483 ++$self->_data->{job_count};
375 170         3802 return $id;
376             }
377              
378 2124   100 2124   7179 sub _jobs { $_[0]->_data->{jobs} //= {} }
379              
380 1806     1806   15177 sub _load { Storable::retrieve($_[1]) }
381              
382 135   100 135   305 sub _locks { $_[0]->_data->{locks} //= {} }
383              
384 1082     1082   8648 sub _save { Storable::store($_[1] => $_[2]) }
385              
386 584   100 584   2054 sub _workers { $_[0]->_data->{workers} //= {} }
387              
388 1063 50   1063   5456 sub _write { ++$_[0]{write} && return $_[0] }
389              
390             1;
391             __END__