File Coverage

blib/lib/Minion/Backend/Sereal.pm
Criterion Covered Total %
statement 233 236 98.7
branch 65 74 87.8
condition 114 135 84.4
subroutine 48 48 100.0
pod 21 21 100.0
total 481 514 93.5


line stmt bran cond sub pod time code
1             package Minion::Backend::Sereal;
2 8     8   26018 use Minion::Backend -base;
  8         4219  
  8         58  
3              
4             our $VERSION = 7.012;
5              
6 8     8   1681 use Sys::Hostname 'hostname';
  8         1625  
  8         358  
7 8     8   48 use Time::HiRes qw(time usleep);
  8         11  
  8         69  
8              
9             # Attributes
10              
11             has 'file';
12              
13             # Constructor
14              
15 7     7 1 174 sub new { shift->SUPER::new(file => shift) }
16              
17             # Methods
18              
19             sub broadcast {
20 7   100 7 1 810 my ($self, $command, $args, $ids) = (shift, shift, shift // [], shift // []);
      100        
21              
22 7         18 my $guard = $self->_guard->_write;
23 7         19 my $inboxes = $guard->_inboxes;
24 7         23 my $workers = $guard->_workers;
25 7 50       36 @$ids = @$ids ? map exists($workers->{$_}), @$ids
    100          
26             : keys %$workers unless @$ids;
27              
28 7   100     21 push @{$inboxes->{$_} //= []}, [$command, @$args] for @$ids;
  10         48  
29              
30 7         27 return !!@$ids;
31             }
32              
33             sub dequeue {
34 239     239 1 50356 my ($self, $id, $wait, $options) = @_;
35 239   66     760 return ($self->_try($id, $options) or do {
36             usleep $wait * 1_000_000;
37             $self->_try($id, $options);
38             });
39             }
40              
41             sub enqueue {
42 175   100 175 1 150326 my ($self, $task, $args, $options) = (shift, shift, shift // [], shift // {});
      100        
43              
44 175         575 my $guard = $self->_guard->_write;
45              
46             my $job = {
47             args => $args,
48             attempts => $options->{attempts} // 1,
49             created => time,
50             delayed => time + ($options->{delay} // 0),
51             id => $guard->_job_id,
52             notes => $options->{notes} // {},
53             parents => $options->{parents} // [],
54             priority => $options->{priority} // 0,
55 175   100     2133 queue => $options->{queue} // 'default',
      100        
      100        
      100        
      100        
      100        
56             retries => 0,
57             state => 'inactive',
58             task => $task
59             };
60 175         576 $guard->_jobs->{$job->{id}} = $job;
61              
62 175         643 return $job->{id};
63             }
64              
65 63     63 1 7197089 sub fail_job { shift->_update(1, @_) }
66              
67 141     141 1 307761 sub finish_job { shift->_update(0, @_) }
68              
69             sub job_info {
70 437     437 1 170003 my ($self, $id) = @_;
71 437         1305 my $guard = $self->_guard;
72 437 100       1317 return undef unless my $job = $guard->_jobs->{$id};
73 412         1636 $job->{children} = $guard->_children($id);
74 412         1387 return $job;
75             }
76              
77             sub list_jobs {
78 35     35 1 147905 my ($self, $offset, $limit, $options) = @_;
79              
80 35         115 my $guard = $self->_guard;
81 100         240 my @jobs = sort { $b->{created} <=> $a->{created} }
82             grep +( (not defined $options->{queue} or $_->{queue} eq $options->{queue})
83             and (not defined $options->{state} or $_->{state} eq $options->{state})
84             and (not defined $options->{task} or $_->{task} eq $options->{task})
85 35   100     80 ), values %{$guard->_jobs};
  35         100  
86              
87 35   33     230 return [map +($_->{children} = $guard->_children($_->{id}) and $_), grep defined, @jobs[$offset .. ($offset + $limit - 1)]];
88             }
89              
90             sub list_workers {
91 20     20 1 27765 my ($self, $offset, $limit) = @_;
92 20         65 my $guard = $self->_guard;
93 40         130 my @workers = map { $self->_worker_info($guard, $_->{id}) }
94 20         60 sort { $b->{started} <=> $a->{started} } values %{$guard->_workers};
  20         100  
  20         55  
95 20         65 return [grep {defined} @workers[$offset .. ($offset + $limit - 1)]];
  65         175  
96             }
97              
98             sub lock {
99 75   100 75 1 955 my ($self, $name, $duration, $options) = (shift, shift, shift, shift // {});
100 75   100     260 my $limit = $options->{limit} || 1;
101              
102 75         165 my $guard = $self->_guard->_write;
103 75   100     250 my $locks = $guard->_locks->{$name} //= [];
104              
105             # Delete expired locks
106 75         280 my $now = time;
107 75   100     385 @$locks = grep +($now < ($_ // 0)), @$locks;
108              
109             # Check capacity
110 75 100       220 return undef unless @$locks < $limit;
111 50 100       140 return 1 unless $duration > 0;
112              
113             # Add lock, maintaining order
114 40         95 my $this_expires = $now + $duration;
115              
116 40 50 50     305 push(@$locks, $this_expires) and return 1
      100        
117             if ($locks->[$#$locks] // 0) < $this_expires;
118              
119 0   0     0 @$locks = sort { ($a // 0) <=> ($b // 0) } (@$locks, $this_expires);
  0   0     0  
120 0         0 return 1;
121             }
122              
123             sub note {
124 20     20 1 240 my ($self, $id, $key, $value) = @_;
125 20         50 my $guard = $self->_guard;
126 20 100       80 return undef unless my $job = $guard->_write->_jobs->{$id};
127 10         45 $job->{notes}{$key} = $value;
128 10         40 return 1;
129             }
130              
131             sub receive {
132 5     5 1 129 my ($self, $id) = @_;
133 5         13 my $guard = $self->_guard->_write;
134 5         13 my $inboxes = $guard->_inboxes;
135 5   100     25 my $inbox = $inboxes->{$id} // [];
136 5         12 $inboxes->{$id} = [];
137 5         17 return $inbox;
138             }
139              
140             sub register_worker {
141 152   50 152 1 936100 my ($self, $id, $options) = (shift, shift, shift // {});
142 152         476 my $guard = $self->_guard->_write;
143 152 100       641 my $worker = $id ? $guard->_workers->{$id} : undef;
144 152 100       604 unless ($worker) {
145 116         730 $worker = {host => hostname, id => $guard->_id, pid => $$, started => time};
146 116         394 $guard->_workers->{$worker->{id}} = $worker;
147             }
148 152   50     895 @$worker{qw(notified status)} = (time, $options->{status} // {});
149 152         721 return $worker->{id};
150             }
151              
152             sub remove_job {
153 31     31 1 6919 my ($self, $id) = @_;
154 31         104 my $guard = $self->_guard;
155 31 50       121 delete $guard->_write->_jobs->{$id}
156             if my $removed = !!$guard->_job($id, qw(failed finished inactive));
157 31         189 return $removed;
158             }
159              
160             sub repair {
161 32     32 1 625 my $self = shift;
162 32         115 my $minion = $self->minion;
163              
164             # Workers without heartbeat
165 32         191 my $guard = $self->_guard->_write;
166 32         104 my $workers = $guard->_workers;
167 32         108 my $jobs = $guard->_jobs;
168 32         180 my $after = time - $minion->missing_after;
169 32   66     376 $_->{notified} < $after and delete $workers->{$_->{id}} for values %$workers;
170              
171             # Old jobs without unfinished dependents
172 32         128 $after = time - $minion->remove_after;
173 32         210 for my $job (values %$jobs) {
174 172 100 100     869 next unless $job->{state} eq 'finished' and $job->{finished} <= $after;
175             delete $jobs->{$job->{id}} unless grep +($jobs->{$_}{state} ne 'finished'),
176 31 100       59 @{$guard->_children($job->{id})};
  31         87  
177             }
178              
179             # Jobs with missing worker (can be retried)
180             my @abandoned = map [@$_{qw(id retries)}],
181 32   100     329 grep +($_->{state} eq 'active' and not exists $workers->{$_->{worker}}),
182             values %$jobs;
183 32         98 undef $guard;
184 32         129 $self->fail_job(@$_, 'Worker went away') for @abandoned;
185              
186 32         309 return;
187             }
188              
189 13     13 1 9831 sub reset { $_[0]->_guard->_save({} => $_[0]{file}) }
190              
191             sub retry_job {
192 65   100 65 1 27070 my ($self, $id, $retries, $options) = (shift, shift, shift, shift // {});
193              
194 65         228 my $guard = $self->_guard;
195             return undef
196 65 100       275 unless my $job = $guard->_job($id, qw(active failed finished inactive));
197 55 100       207 return undef unless $job->{retries} == $retries;
198 53         204 $guard->_write;
199 53         106 ++$job->{retries};
200 53 100       221 $job->{delayed} = time + $options->{delay} if $options->{delay};
201 53   66     332 exists $options->{$_} and $job->{$_} = $options->{$_} for qw(priority queue);
202 53         343 @$job{qw(retried state)} = (time, 'inactive');
203 53         218 delete @$job{qw(finished started worker)};
204              
205 53         209 return 1;
206             }
207              
208             sub stats {
209 68     68 1 7756 my $self = shift;
210              
211 68         173 my ($active, $delayed) = (0, 0);
212 68         153 my (%seen, %states);
213 68         178 my $guard = $self->_guard;
214 68         158 for my $job (values %{$guard->_jobs}) {
  68         249  
215 249         672 ++$states{$job->{state}};
216 249 100 100     781 ++$active if $job->{state} eq 'active' and not $seen{$job->{worker}}++;
217             ++$delayed if $job->{state} eq 'inactive'
218 249 100 100     820 and (time < $job->{delayed} or @{$job->{parents}});
      100        
219             }
220              
221             return {
222             active_workers => $active,
223 68         197 inactive_workers => keys(%{$guard->_workers}) - $active,
224             active_jobs => $states{active} // 0,
225             delayed_jobs => $delayed,
226             enqueued_jobs => $guard->_job_count,
227             failed_jobs => $states{failed} // 0,
228             finished_jobs => $states{finished} // 0,
229 68   100     162 inactive_jobs => $states{inactive} // 0
      100        
      100        
      100        
230             };
231             }
232              
233             sub unlock {
234 55     55 1 475 my ($self, $name) = @_;
235              
236 55         145 my $guard = $self->_guard->_write;
237 55   50     135 my $locks = $guard->_locks->{$name} //= [];
238 55         130 my $length = @$locks;
239 55         170 my $now = time;
240              
241 55         85 my $i = 0;
242 55   100     585 ++$i while $i < $length and ($locks->[$i] // 0) <= $now;
      100        
243 55 100       330 return undef if $i >= $length;
244              
245 35         75 $locks->[$i] = undef;
246 35         120 return 1;
247             }
248              
249             sub unregister_worker {
250 107     107 1 43061 my ($self, $id) = @_;
251 107         451 my $guard = $self->_guard->_write;
252 107         372 delete $guard->_inboxes->{$id};
253 107         505 delete $guard->_workers->{$id};
254             }
255              
256 60     60 1 5630 sub worker_info { $_[0]->_worker_info($_[0]->_guard, $_[1]) }
257              
258 1874     1874   22505 sub _guard { Minion::Backend::Sereal::_Guard->new(backend => shift) }
259              
260             sub _try {
261 276     276   751 my ($self, $id, $options) = @_;
262 276         875 my $tasks = $self->minion->tasks;
263 276   100     2323 my %queues = map +($_ => 1), @{$options->{queues} // ['default']};
  276         2719  
264              
265 276         1023 my $now = time;
266 276         692 my $guard = $self->_guard;
267 276         850 my $jobs = $guard->_jobs;
268             my @ready = sort { $b->{priority} <=> $a->{priority}
269 65 50       373 || $a->{created} <=> $b->{created} }
270             grep +($_->{state} eq 'inactive' and $queues{$_->{queue}}
271 276   100     6696 and $tasks->{$_->{task}} and $_->{delayed} <= $now),
272             values %$jobs;
273              
274 276         560 my $job;
275 276         666 CANDIDATE: for my $candidate (@ready) {
276             $job = $candidate and last CANDIDATE
277 208 100 50     399 unless my @parents = @{$candidate->{parents} // []};
  208   50     1245  
278 9         21 for my $parent (@parents) {
279             next CANDIDATE if exists $jobs->{$parent}
280 15 100 100     106 and grep +($jobs->{$parent}{state} eq $_), qw(active failed inactive)
281             }
282 3         9 $job = $candidate;
283             }
284              
285 276 100       944 return undef unless $job;
286 202         684 $guard->_write;
287 202         1119 @$job{qw(started state worker)} = (time, 'active', $id);
288 202         754 return $job;
289             }
290              
291             sub _update {
292 204     204   755 my ($self, $fail, $id, $retries, $result) = @_;
293              
294 204         600 my $guard = $self->_guard;
295 204 100       799 return undef unless my $job = $guard->_job($id, 'active');
296 159 100       570 return undef unless $job->{retries} == $retries;
297              
298 157         556 $guard->_write;
299 157         953 @$job{qw(finished result)} = (time, $result);
300 157 100       592 $job->{state} = $fail ? 'failed' : 'finished';
301 157         686 undef $guard;
302              
303 157 100 100     1476 return 1 unless $fail and $job->{attempts} > $retries + 1;
304 5         52 my $delay = $self->minion->backoff->($retries);
305 5         138 return $self->retry_job($id, $retries, {delay => $delay});
306             }
307              
308             sub _worker_info {
309 100     100   250 my ($self, $guard, $id) = @_;
310              
311 100 100 100     420 return undef unless $id && (my $worker = $guard->_workers->{$id});
312             my @jobs = map $_->{id},
313             grep +($_->{state} eq 'active' and $_->{worker} eq $id),
314 85   66     185 values %{$guard->_jobs};
  85         205  
315              
316 85         660 return {%$worker, jobs => \@jobs};
317             }
318              
319             package
320             Minion::Backend::Sereal::_Guard;
321 8     8   17996 use Mojo::Base -base;
  8         18  
  8         52  
322              
323 8     8   948 use Fcntl ':flock';
  8         22  
  8         804  
324 8     8   49 use Digest::MD5 'md5_hex';
  8         18  
  8         318  
325 8     8   41 use Sereal::Decoder 'sereal_decode_with_object';
  8         39  
  8         412  
326 8     8   42 use Sereal::Encoder 'sereal_encode_with_object';
  8         17  
  8         6113  
327              
328             sub DESTROY {
329 1874 100   1874   12387 $_[0]->_save($_[0]->_data => $_[0]{backend}{file}) if $_[0]{write};
330 1874         46819 flock $_[0]{lock}, LOCK_UN;
331             }
332              
333             sub new {
334 1874     1874   7025 my $self = shift->SUPER::new(@_);
335 1874         16540 my $path = $self->{backend}{file};
336 1874 100       29253 $self->_save({} => $path) unless -f $path;
337 1874         68066 open $self->{lock}, '>', "$path.lock";
338 1874         9628 flock $self->{lock}, LOCK_EX;
339 1874         6707 return $self;
340             }
341              
342             sub _children {
343 503     503   1205 my ($self, $id) = @_;
344 503         1085 my $children = [];
345 503         838 for my $job (values %{$self->_jobs}) {
  503         1065  
346 4649 100 50     7543 push @$children, $job->{id} if grep +($_ eq $id), @{$job->{parents} // []};
  4649         14852  
347             }
348 503         2031 return $children;
349             }
350              
351 4354   66 4354   20832 sub _data { $_[0]{data} //= $_[0]->_load($_[0]{backend}{file}) }
352              
353             sub _id {
354 116     116   781 my $id;
355 116         211 do { $id = md5_hex(time . rand 999) } while $_[0]->_workers->{$id};
  116         1704  
356 116         1180 return $id;
357             }
358              
359 119   100 119   311 sub _inboxes { $_[0]->_data->{inboxes} //= {} }
360              
361             sub _job {
362 300     300   857 my ($self, $id) = (shift, shift);
363 300 100       912 return undef unless my $job = $self->_jobs->{$id};
364 290 100       2403 return grep(($job->{state} eq $_), @_) ? $job : undef;
365             }
366              
367 68   100 68   181 sub _job_count { $_[0]->_data->{job_count} //= 0 }
368              
369             sub _job_id {
370 175     175   330 my $id;
371 175         354 do { $id = md5_hex(time . rand 999) } while $_[0]->_jobs->{$id};
  175         2470  
372 175         690 ++$_[0]->_data->{job_count};
373 175         3181 return $id;
374             }
375              
376 2162   100 2162   5256 sub _jobs { $_[0]->_data->{jobs} //= {} }
377              
378             sub _load {
379 1851     1851   4287 my ($self, $path) = @_;
380 1851   66     5727 my $decoder = $self->{backend}{_guard_decoder} //= Sereal::Decoder->new;
381              
382             # Borrowed from Mojo::File v7.33
383 1851 50       38235 CORE::open my $file, '<', $path or die qq{Failed to open file ($path): $!};
384 1851         5663 my ($payload, $ret) = ('', undef);
385 1851         11220 while ($ret = sysread $file, my $buffer, 131072, 0) { $payload .= $buffer }
  1851         10340  
386 1851 50       4954 die qq{Failed to read file ($path): $!} unless defined $ret;
387              
388 1851         104807 return sereal_decode_with_object $decoder, $payload;
389             }
390              
391 135   100 135   305 sub _locks { shift->_data->{locks} //= {} }
392              
393             sub _save {
394 1111     1111   2643 my ($self, $content, $path) = @_;
395 1111   66     3341 my $encoder = $self->{backend}{_guard_encoder} //= Sereal::Encoder->new;
396 1111         23370 my $payload = sereal_encode_with_object $encoder, $content;
397              
398             # Borrowed from Mojo::File v7.33
399 1111 50       52891 CORE::open my $file, '>', $path or die qq{Failed to open file ($path): $!};
400 1111 50 50     23304 (syswrite($file, $payload) // -1) == length $payload
401             or die qq{Failed to write file ($path): $!};
402 1111         11430 return;
403             }
404              
405 602   100 602   1486 sub _workers { $_[0]->_data->{workers} //= {} }
406              
407 1091 50   1091   4659 sub _write { ++$_[0]{write} && return $_[0] }
408              
409             1;
410             __END__