File Coverage

blib/lib/Minion/Backend/Sereal.pm
Criterion Covered Total %
statement 233 236 98.7
branch 65 74 87.8
condition 114 135 84.4
subroutine 48 48 100.0
pod 21 21 100.0
total 481 514 93.5


line stmt bran cond sub pod time code
1             package Minion::Backend::Sereal;
2 8     8   30896 use Minion::Backend -base;
  8         4878  
  8         67  
3              
4             our $VERSION = 7.011;
5              
6 8     8   1927 use Sys::Hostname 'hostname';
  8         1812  
  8         389  
7 8     8   45 use Time::HiRes qw(time usleep);
  8         18  
  8         93  
8              
9             # Attributes
10              
11             has 'file';
12              
13             # Constructor
14              
15 7     7 1 233 sub new { shift->SUPER::new(file => shift) }
16              
17             # Methods
18              
19             sub broadcast {
20 7   100 7 1 1125 my ($self, $command, $args, $ids) = (shift, shift, shift // [], shift // []);
      100        
21              
22 7         23 my $guard = $self->_guard->_write;
23 7         22 my $inboxes = $guard->_inboxes;
24 7         30 my $workers = $guard->_workers;
25 7 50       38 @$ids = @$ids ? map exists($workers->{$_}), @$ids
    100          
26             : keys %$workers unless @$ids;
27              
28 7   100     26 push @{$inboxes->{$_} //= []}, [$command, @$args] for @$ids;
  10         46  
29              
30 7         30 return !!@$ids;
31             }
32              
33             sub dequeue {
34 239     239 1 73757 my ($self, $id, $wait, $options) = @_;
35 239   66     1132 return ($self->_try($id, $options) or do {
36             usleep $wait * 1_000_000;
37             $self->_try($id, $options);
38             });
39             }
40              
41             sub enqueue {
42 175   100 175 1 203859 my ($self, $task, $args, $options) = (shift, shift, shift // [], shift // {});
      100        
43              
44 175         778 my $guard = $self->_guard->_write;
45              
46             my $job = {
47             args => $args,
48             attempts => $options->{attempts} // 1,
49             created => time,
50             delayed => time + ($options->{delay} // 0),
51             id => $guard->_job_id,
52             notes => $options->{notes} // {},
53             parents => $options->{parents} // [],
54             priority => $options->{priority} // 0,
55 175   100     2932 queue => $options->{queue} // 'default',
      100        
      100        
      100        
      100        
      100        
56             retries => 0,
57             state => 'inactive',
58             task => $task
59             };
60 175         710 $guard->_jobs->{$job->{id}} = $job;
61              
62 175         799 return $job->{id};
63             }
64              
65 63     63 1 8591192 sub fail_job { shift->_update(1, @_) }
66              
67 141     141 1 455883 sub finish_job { shift->_update(0, @_) }
68              
69             sub job_info {
70 437     437 1 219288 my ($self, $id) = @_;
71 437         2574 my $guard = $self->_guard;
72 437 100       1811 return undef unless my $job = $guard->_jobs->{$id};
73 412         2007 $job->{children} = $guard->_children($id);
74 412         1614 return $job;
75             }
76              
77             sub list_jobs {
78 35     35 1 247375 my ($self, $offset, $limit, $options) = @_;
79              
80 35         175 my $guard = $self->_guard;
81 100         430 my @jobs = sort { $b->{created} <=> $a->{created} }
82             grep +( (not defined $options->{queue} or $_->{queue} eq $options->{queue})
83             and (not defined $options->{state} or $_->{state} eq $options->{state})
84             and (not defined $options->{task} or $_->{task} eq $options->{task})
85 35   100     120 ), values %{$guard->_jobs};
  35         180  
86              
87 35   33     370 return [map +($_->{children} = $guard->_children($_->{id}) and $_), grep defined, @jobs[$offset .. ($offset + $limit - 1)]];
88             }
89              
90             sub list_workers {
91 20     20 1 47315 my ($self, $offset, $limit) = @_;
92 20         115 my $guard = $self->_guard;
93 40         195 my @workers = map { $self->_worker_info($guard, $_->{id}) }
94 20         80 sort { $b->{started} <=> $a->{started} } values %{$guard->_workers};
  20         175  
  20         120  
95 20         315 return [grep {defined} @workers[$offset .. ($offset + $limit - 1)]];
  65         315  
96             }
97              
98             sub lock {
99 75   100 75 1 1625 my ($self, $name, $duration, $options) = (shift, shift, shift, shift // {});
100 75   100     450 my $limit = $options->{limit} || 1;
101              
102 75         355 my $guard = $self->_guard->_write;
103 75   100     385 my $locks = $guard->_locks->{$name} //= [];
104              
105             # Delete expired locks
106 75         475 my $now = time;
107 75   100     855 @$locks = grep +($now < ($_ // 0)), @$locks;
108              
109             # Check capacity
110 75 100       440 return undef unless @$locks < $limit;
111 50 100       235 return 1 unless $duration > 0;
112              
113             # Add lock, maintaining order
114 40         170 my $this_expires = $now + $duration;
115              
116 40 50 50     610 push(@$locks, $this_expires) and return 1
      100        
117             if ($locks->[$#$locks] // 0) < $this_expires;
118              
119 0   0     0 @$locks = sort { ($a // 0) <=> ($b // 0) } (@$locks, $this_expires);
  0   0     0  
120 0         0 return 1;
121             }
122              
123             sub note {
124 20     20 1 305 my ($self, $id, $key, $value) = @_;
125 20         70 my $guard = $self->_guard;
126 20 100       95 return undef unless my $job = $guard->_write->_jobs->{$id};
127 10         65 $job->{notes}{$key} = $value;
128 10         55 return 1;
129             }
130              
131             sub receive {
132 5     5 1 149 my ($self, $id) = @_;
133 5         15 my $guard = $self->_guard->_write;
134 5         20 my $inboxes = $guard->_inboxes;
135 5   100     29 my $inbox = $inboxes->{$id} // [];
136 5         16 $inboxes->{$id} = [];
137 5         19 return $inbox;
138             }
139              
140             sub register_worker {
141 152   50 152 1 1218225 my ($self, $id, $options) = (shift, shift, shift // {});
142 152         692 my $guard = $self->_guard->_write;
143 152 100       934 my $worker = $id ? $guard->_workers->{$id} : undef;
144 152 100       745 unless ($worker) {
145 116         900 $worker = {host => hostname, id => $guard->_id, pid => $$, started => time};
146 116         561 $guard->_workers->{$worker->{id}} = $worker;
147             }
148 152   50     1287 @$worker{qw(notified status)} = (time, $options->{status} // {});
149 152         845 return $worker->{id};
150             }
151              
152             sub remove_job {
153 31     31 1 7763 my ($self, $id) = @_;
154 31         139 my $guard = $self->_guard;
155 31 50       179 delete $guard->_write->_jobs->{$id}
156             if my $removed = !!$guard->_job($id, qw(failed finished inactive));
157 31         245 return $removed;
158             }
159              
160             sub repair {
161 32     32 1 873 my $self = shift;
162 32         152 my $minion = $self->minion;
163              
164             # Workers without heartbeat
165 32         236 my $guard = $self->_guard->_write;
166 32         118 my $workers = $guard->_workers;
167 32         189 my $jobs = $guard->_jobs;
168 32         281 my $after = time - $minion->missing_after;
169 32   66     602 $_->{notified} < $after and delete $workers->{$_->{id}} for values %$workers;
170              
171             # Old jobs without unfinished dependents
172 32         218 $after = time - $minion->remove_after;
173 32         339 for my $job (values %$jobs) {
174 172 100 100     1002 next unless $job->{state} eq 'finished' and $job->{finished} <= $after;
175             delete $jobs->{$job->{id}} unless grep +($jobs->{$_}{state} ne 'finished'),
176 31 100       106 @{$guard->_children($job->{id})};
  31         135  
177             }
178              
179             # Jobs with missing worker (can be retried)
180             my @abandoned = map [@$_{qw(id retries)}],
181 32   100     447 grep +($_->{state} eq 'active' and not exists $workers->{$_->{worker}}),
182             values %$jobs;
183 32         613 undef $guard;
184 32         179 $self->fail_job(@$_, 'Worker went away') for @abandoned;
185              
186 32         618 return;
187             }
188              
189 13     13 1 14510 sub reset { $_[0]->_guard->_save({} => $_[0]{file}) }
190              
191             sub retry_job {
192 65   100 65 1 27301 my ($self, $id, $retries, $options) = (shift, shift, shift, shift // {});
193              
194 65         282 my $guard = $self->_guard;
195             return undef
196 65 100       333 unless my $job = $guard->_job($id, qw(active failed finished inactive));
197 55 100       252 return undef unless $job->{retries} == $retries;
198 53         243 $guard->_write;
199 53         144 ++$job->{retries};
200 53 100       261 $job->{delayed} = time + $options->{delay} if $options->{delay};
201 53   66     387 exists $options->{$_} and $job->{$_} = $options->{$_} for qw(priority queue);
202 53         348 @$job{qw(retried state)} = (time, 'inactive');
203 53         289 delete @$job{qw(finished started worker)};
204              
205 53         246 return 1;
206             }
207              
208             sub stats {
209 68     68 1 11126 my $self = shift;
210              
211 68         250 my ($active, $delayed) = (0, 0);
212 68         196 my (%seen, %states);
213 68         259 my $guard = $self->_guard;
214 68         235 for my $job (values %{$guard->_jobs}) {
  68         387  
215 249         908 ++$states{$job->{state}};
216 249 100 100     951 ++$active if $job->{state} eq 'active' and not $seen{$job->{worker}}++;
217             ++$delayed if $job->{state} eq 'inactive'
218 249 100 100     1038 and (time < $job->{delayed} or @{$job->{parents}});
      100        
219             }
220              
221             return {
222             active_workers => $active,
223 68         271 inactive_workers => keys(%{$guard->_workers}) - $active,
224             active_jobs => $states{active} // 0,
225             delayed_jobs => $delayed,
226             enqueued_jobs => $guard->_job_count,
227             failed_jobs => $states{failed} // 0,
228             finished_jobs => $states{finished} // 0,
229 68   100     214 inactive_jobs => $states{inactive} // 0
      100        
      100        
      100        
230             };
231             }
232              
233             sub unlock {
234 55     55 1 1090 my ($self, $name) = @_;
235              
236 55         1155 my $guard = $self->_guard->_write;
237 55   50     265 my $locks = $guard->_locks->{$name} //= [];
238 55         245 my $length = @$locks;
239 55         270 my $now = time;
240              
241 55         190 my $i = 0;
242 55   100     745 ++$i while $i < $length and ($locks->[$i] // 0) <= $now;
      100        
243 55 100       460 return undef if $i >= $length;
244              
245 35         130 $locks->[$i] = undef;
246 35         200 return 1;
247             }
248              
249             sub unregister_worker {
250 107     107 1 54474 my ($self, $id) = @_;
251 107         525 my $guard = $self->_guard->_write;
252 107         570 delete $guard->_inboxes->{$id};
253 107         732 delete $guard->_workers->{$id};
254             }
255              
256 60     60 1 6850 sub worker_info { $_[0]->_worker_info($_[0]->_guard, $_[1]) }
257              
258 1874     1874   31040 sub _guard { Minion::Backend::Sereal::_Guard->new(backend => shift) }
259              
260             sub _try {
261 276     276   899 my ($self, $id, $options) = @_;
262 276         1259 my $tasks = $self->minion->tasks;
263 276   100     3312 my %queues = map +($_ => 1), @{$options->{queues} // ['default']};
  276         3561  
264              
265 276         1373 my $now = time;
266 276         925 my $guard = $self->_guard;
267 276         1133 my $jobs = $guard->_jobs;
268             my @ready = sort { $b->{priority} <=> $a->{priority}
269 68 50       518 || $a->{created} <=> $b->{created} }
270             grep +($_->{state} eq 'inactive' and $queues{$_->{queue}}
271 276   100     9085 and $tasks->{$_->{task}} and $_->{delayed} <= $now),
272             values %$jobs;
273              
274 276         785 my $job;
275 276         825 CANDIDATE: for my $candidate (@ready) {
276             $job = $candidate and last CANDIDATE
277 208 100 50     443 unless my @parents = @{$candidate->{parents} // []};
  208   50     1709  
278 9         31 for my $parent (@parents) {
279             next CANDIDATE if exists $jobs->{$parent}
280 15 100 100     150 and grep +($jobs->{$parent}{state} eq $_), qw(active failed inactive)
281             }
282 3         11 $job = $candidate;
283             }
284              
285 276 100       1249 return undef unless $job;
286 202         929 $guard->_write;
287 202         1385 @$job{qw(started state worker)} = (time, 'active', $id);
288 202         886 return $job;
289             }
290              
291             sub _update {
292 204     204   1000 my ($self, $fail, $id, $retries, $result) = @_;
293              
294 204         1478 my $guard = $self->_guard;
295 204 100       1213 return undef unless my $job = $guard->_job($id, 'active');
296 159 100       697 return undef unless $job->{retries} == $retries;
297              
298 157         712 $guard->_write;
299 157         1200 @$job{qw(finished result)} = (time, $result);
300 157 100       668 $job->{state} = $fail ? 'failed' : 'finished';
301 157         709 undef $guard;
302              
303 157 100 100     1947 return 1 unless $fail and $job->{attempts} > $retries + 1;
304 5         70 my $delay = $self->minion->backoff->($retries);
305 5         178 return $self->retry_job($id, $retries, {delay => $delay});
306             }
307              
308             sub _worker_info {
309 100     100   405 my ($self, $guard, $id) = @_;
310              
311 100 100 100     645 return undef unless $id && (my $worker = $guard->_workers->{$id});
312             my @jobs = map $_->{id},
313             grep +($_->{state} eq 'active' and $_->{worker} eq $id),
314 85   66     300 values %{$guard->_jobs};
  85         305  
315              
316 85         1095 return {%$worker, jobs => \@jobs};
317             }
318              
319             package
320             Minion::Backend::Sereal::_Guard;
321 8     8   19183 use Mojo::Base -base;
  8         19  
  8         61  
322              
323 8     8   946 use Fcntl ':flock';
  8         23  
  8         965  
324 8     8   50 use Digest::MD5 'md5_hex';
  8         22  
  8         353  
325 8     8   49 use Sereal::Decoder 'sereal_decode_with_object';
  8         44  
  8         479  
326 8     8   46 use Sereal::Encoder 'sereal_encode_with_object';
  8         22  
  8         6415  
327              
328             sub DESTROY {
329 1874 100   1874   14538 $_[0]->_save($_[0]->_data => $_[0]{backend}{file}) if $_[0]{write};
330 1874         65663 flock $_[0]{lock}, LOCK_UN;
331             }
332              
333             sub new {
334 1874     1874   8878 my $self = shift->SUPER::new(@_);
335 1874         23666 my $path = $self->{backend}{file};
336 1874 100       40871 $self->_save({} => $path) unless -f $path;
337 1874         95601 open $self->{lock}, '>', "$path.lock";
338 1874         14041 flock $self->{lock}, LOCK_EX;
339 1874         9154 return $self;
340             }
341              
342             sub _children {
343 503     503   1667 my ($self, $id) = @_;
344 503         1515 my $children = [];
345 503         1081 for my $job (values %{$self->_jobs}) {
  503         1585  
346 4649 100 50     8981 push @$children, $job->{id} if grep +($_ eq $id), @{$job->{parents} // []};
  4649         19193  
347             }
348 503         2753 return $children;
349             }
350              
351 4354   66 4354   31449 sub _data { $_[0]{data} //= $_[0]->_load($_[0]{backend}{file}) }
352              
353             sub _id {
354 116     116   1093 my $id;
355 116         312 do { $id = md5_hex(time . rand 999) } while $_[0]->_workers->{$id};
  116         2159  
356 116         1503 return $id;
357             }
358              
359 119   100 119   479 sub _inboxes { $_[0]->_data->{inboxes} //= {} }
360              
361             sub _job {
362 300     300   1272 my ($self, $id) = (shift, shift);
363 300 100       1565 return undef unless my $job = $self->_jobs->{$id};
364 290 100       3322 return grep(($job->{state} eq $_), @_) ? $job : undef;
365             }
366              
367 68   100 68   427 sub _job_count { $_[0]->_data->{job_count} //= 0 }
368              
369             sub _job_id {
370 175     175   487 my $id;
371 175         399 do { $id = md5_hex(time . rand 999) } while $_[0]->_jobs->{$id};
  175         3603  
372 175         828 ++$_[0]->_data->{job_count};
373 175         4261 return $id;
374             }
375              
376 2162   100 2162   7025 sub _jobs { $_[0]->_data->{jobs} //= {} }
377              
378             sub _load {
379 1851     1851   5887 my ($self, $path) = @_;
380 1851   66     8455 my $decoder = $self->{backend}{_guard_decoder} //= Sereal::Decoder->new;
381              
382             # Borrowed from Mojo::File v7.33
383 1851 50       53304 CORE::open my $file, '<', $path or die qq{Failed to open file ($path): $!};
384 1851         7617 my ($payload, $ret) = ('', undef);
385 1851         15521 while ($ret = sysread $file, my $buffer, 131072, 0) { $payload .= $buffer }
  1851         15883  
386 1851 50       6948 die qq{Failed to read file ($path): $!} unless defined $ret;
387              
388 1851         152478 return sereal_decode_with_object $decoder, $payload;
389             }
390              
391 135   100 135   1105 sub _locks { shift->_data->{locks} //= {} }
392              
393             sub _save {
394 1111     1111   3393 my ($self, $content, $path) = @_;
395 1111   66     4182 my $encoder = $self->{backend}{_guard_encoder} //= Sereal::Encoder->new;
396 1111         32533 my $payload = sereal_encode_with_object $encoder, $content;
397              
398             # Borrowed from Mojo::File v7.33
399 1111 50       653748 CORE::open my $file, '>', $path or die qq{Failed to open file ($path): $!};
400 1111 50 50     32344 (syswrite($file, $payload) // -1) == length $payload
401             or die qq{Failed to write file ($path): $!};
402 1111         15648 return;
403             }
404              
405 602   100 602   2396 sub _workers { $_[0]->_data->{workers} //= {} }
406              
407 1091 50   1091   6732 sub _write { ++$_[0]{write} && return $_[0] }
408              
409             1;
410             __END__