File Coverage

blib/lib/Minion/Backend/SQLite.pm
Criterion Covered Total %
statement 186 187 99.4
branch 63 76 82.8
condition 58 70 82.8
subroutine 31 31 100.0
pod 21 21 100.0
total 359 385 93.2


line stmt bran cond sub pod time code
1             package Minion::Backend::SQLite;
2 9     9   9181 use Mojo::Base 'Minion::Backend';
  9         27  
  9         71  
3              
4 9     9   16972 use Carp 'croak';
  9         27  
  9         397  
5 9     9   46 use List::Util 'min';
  9         18  
  9         484  
6 9     9   4598 use Mojo::SQLite;
  9         806224  
  9         201  
7 9     9   470 use Mojo::Util 'steady_time';
  9         26  
  9         691  
8 9     9   533 use Sys::Hostname 'hostname';
  9         1072  
  9         474  
9 9     9   68 use Time::HiRes 'usleep';
  9         21  
  9         116  
10              
11             our $VERSION = 'v5.0.6';
12              
13             has dequeue_interval => 0.5;
14             has 'sqlite';
15              
16             sub new {
17 9     9 1 417 my $self = shift->SUPER::new(sqlite => Mojo::SQLite->new(@_));
18 9         787 $self->sqlite->auto_migrate(1)->migrations->name('minion')->from_data;
19 9         20279 return $self;
20             }
21              
22             sub broadcast {
23 7   100 7 1 5543 my ($self, $command, $args, $ids) = (shift, shift, shift || [], shift || []);
      100        
24 7         27 my $ids_in = join ',', ('?')x@$ids;
25 7 100       25 return !!$self->sqlite->db->query(
26             q{update minion_workers set inbox =
27             json_set(inbox, '$[' || json_array_length(inbox) || ']', json(?))} .
28             (@$ids ? " where id in ($ids_in)" : ''),
29             {json => [$command, @$args]}, @$ids
30             )->rows;
31             }
32              
33             sub dequeue {
34 499     499 1 358379 my ($self, $id, $wait, $options) = @_;
35 499         1843 my $job = $self->_try($id, $options);
36 499 100       46545 unless ($job) {
37 86         623 my $int = $self->dequeue_interval;
38 86         1060 my $end = steady_time + $wait;
39 86         1325 my $remaining = $wait;
40 86   66     314 usleep(min($int, $remaining) * 1000000)
41             until ($remaining = $end - steady_time) <= 0
42             or $job = $self->_try($id, $options);
43             }
44 499   66     1456630 return $job || $self->_try($id, $options);
45             }
46              
47             sub enqueue {
48 394   100 394 1 636193 my ($self, $task, $args, $options) = (shift, shift, shift || [], shift || {});
      100        
49              
50             return $self->sqlite->db->query(
51             q{insert into minion_jobs
52             (args, attempts, delayed, expires, lax, notes, parents, priority, queue, task)
53             values (?, ?, datetime('now', ? || ' seconds'),
54             case when ? is not null then datetime('now', ? || ' seconds') end,
55             ?, ?, ?, ?, ?, ?)},
56             {json => $args}, $options->{attempts} // 1, $options->{delay} // 0,
57             @$options{qw(expire expire)}, $options->{lax} ? 1 : 0, {json => $options->{notes} || {}},
58             {json => ($options->{parents} || [])}, $options->{priority} // 0,
59 394 100 100     1643 $options->{queue} // 'default', $task
      100        
      100        
      100        
      100        
      100        
60             )->last_insert_id;
61             }
62              
63 130     130 1 50021192 sub fail_job { shift->_update(1, @_) }
64 276     276 1 1404483 sub finish_job { shift->_update(0, @_) }
65              
66             sub history {
67 8     8 1 2768 my $self = shift;
68              
69 8         296 my $db = $self->sqlite->db;
70 8         624 my $steps = $db->query(
71             q{with recursive generate_series(ts) as (
72             select datetime('now','-23 hours')
73             union all
74             select datetime(ts,'+1 hour') from generate_series
75             where datetime(ts,'+1 hour') <= datetime('now')
76             ) select ts, strftime('%s',ts) as epoch,
77             strftime('%d',ts,'localtime') as day,
78             strftime('%H',ts,'localtime') as hour
79             from generate_series order by epoch})->hashes;
80              
81             my $counts = $db->query(
82             q{select strftime('%d',finished,'localtime') as day,
83             strftime('%H',finished,'localtime') as hour,
84             count(case state when 'failed' then 1 end) as failed_jobs,
85             count(case state when 'finished' then 1 end) as finished_jobs
86             from minion_jobs
87 8         10192 where finished > ? group by day, hour}, $steps->first->{ts})->hashes;
88              
89 8         4552 my %daily = map { ("$_->{day}-$_->{hour}" => $_) } @$counts;
  8         208  
90 8         24 my @daily_ordered;
91 8         80 foreach my $step (@$steps) {
92 192   100     808 my $hour_counts = $daily{"$step->{day}-$step->{hour}"} // {};
93             push @daily_ordered, {
94             epoch => $step->{epoch},
95             failed_jobs => $hour_counts->{failed_jobs} // 0,
96 192   100     1096 finished_jobs => $hour_counts->{finished_jobs} // 0,
      100        
97             };
98             }
99              
100 8         200 return {daily => \@daily_ordered};
101             }
102              
103             sub list_jobs {
104 1304     1304 1 3397593 my ($self, $offset, $limit, $options) = @_;
105              
106 1304         2715 my (@where, @where_params);
107 1304 100       4427 if (defined(my $before = $options->{before})) {
108 56         128 push @where, 'id < ?';
109 56         104 push @where_params, $before;
110             }
111 1304 100       3319 if (defined(my $ids = $options->{ids})) {
112 1101         3625 my $ids_in = join ',', ('?')x@$ids;
113 1101 50       4643 push @where, @$ids ? "id in ($ids_in)" : 'id is null';
114 1101         2466 push @where_params, @$ids;
115             }
116 1304 50       3376 if (defined(my $notes = $options->{notes})) {
117 0         0 croak 'Listing jobs by existence of notes is unimplemented';
118             }
119 1304 100       3166 if (defined(my $queues = $options->{queues})) {
120 16         64 my $queues_in = join ',', ('?')x@$queues;
121 16 50       88 push @where, @$queues ? "queue in ($queues_in)" : 'queue is null';
122 16         32 push @where_params, @$queues;
123             }
124 1304 100       3138 if (defined(my $states = $options->{states})) {
125 35         123 my $states_in = join ',', ('?')x@$states;
126 35 50       174 push @where, @$states ? "state in ($states_in)" : 'state is null';
127 35         86 push @where_params, @$states;
128             }
129 1304 100       2837 if (defined(my $tasks = $options->{tasks})) {
130 16         72 my $tasks_in = join ',', ('?')x@$tasks;
131 16 50       80 push @where, @$tasks ? "task in ($tasks_in)" : 'task is null';
132 16         48 push @where_params, @$tasks;
133             }
134 1304         2911 push @where, q{(state != 'inactive' or expires is null or expires > datetime('now'))};
135              
136 1304 50       4938 my $where_str = @where ? 'where ' . join(' and ', @where) : '';
137              
138 1304         4163 my $jobs = $self->sqlite->db->query(
139             qq{select id, args, attempts,
140             (select json_group_array(distinct child.id)
141             from minion_jobs as child, json_each(child.parents) as parent_id
142             where j.id = parent_id.value) as children,
143             strftime('%s',created) as created, strftime('%s',delayed) as delayed,
144             strftime('%s',expires) as expires, strftime('%s',finished) as finished,
145             lax, notes, parents, priority, queue, result,
146             strftime('%s',retried) as retried, retries,
147             strftime('%s',started) as started, state, task,
148             strftime('%s','now') as time, worker
149             from minion_jobs as j
150             $where_str order by id desc limit ? offset ?},
151             @where_params, $limit, $offset
152             )->expand(json => [qw(args children notes parents result)])->hashes->to_array;
153              
154 1304         937818 my $total = $self->sqlite->db->query(qq{select count(*) from minion_jobs as j
155             $where_str}, @where_params)->arrays->first->[0];
156            
157 1304         444951 return {jobs => $jobs, total => $total};
158             }
159              
160             sub list_locks {
161 72     72 1 70832 my ($self, $offset, $limit, $options) = @_;
162            
163 72         152 my (@where, @where_params);
164 72         192 push @where, q{expires > datetime('now')};
165 72 100       336 if (defined(my $names = $options->{names})) {
166 16         64 my $names_in = join ',', ('?')x@$names;
167 16 50       88 push @where, @$names ? "name in ($names_in)" : 'name is null';
168 16         40 push @where_params, @$names;
169             }
170            
171 72         256 my $where_str = 'where ' . join(' and ', @where);
172            
173 72         208 my $locks = $self->sqlite->db->query(
174             qq{select name, strftime('%s',expires) as expires from minion_locks
175             $where_str order by id desc limit ? offset ?},
176             @where_params, $limit, $offset
177             )->hashes->to_array;
178            
179 72         27208 my $total = $self->sqlite->db->query(qq{select count(*) from minion_locks
180             $where_str}, @where_params)->arrays->first->[0];
181            
182 72         24016 return {locks => $locks, total => $total};
183             }
184              
185             sub list_workers {
186 233     233 1 227391 my ($self, $offset, $limit, $options) = @_;
187              
188 233         523 my (@where, @where_params);
189 233 100       942 if (defined(my $before = $options->{before})) {
190 48         160 push @where, 'w.id < ?';
191 48         120 push @where_params, $before;
192             }
193 233 100       693 if (defined(my $ids = $options->{ids})) {
194 113         477 my $ids_in = join ',', ('?')x@$ids;
195 113 50       590 push @where, @$ids ? "w.id in ($ids_in)" : 'w.id is null';
196 113         315 push @where_params, @$ids;
197             }
198              
199 233 100       1030 my $where_str = @where ? 'where ' . join(' and ', @where) : '';
200 233         846 my $workers = $self->sqlite->db->query(
201             qq{select w.id, strftime('%s',w.notified) as notified,
202             group_concat(j.id) as jobs, w.host, w.pid, w.status,
203             strftime('%s',w.started) as started
204             from minion_workers as w
205             left join minion_jobs as j on j.worker = w.id and j.state = 'active'
206             $where_str group by w.id order by w.id desc limit ? offset ?},
207             @where_params, $limit, $offset
208             )->expand(json => 'status')->hashes->to_array;
209 233   100     124734 $_->{jobs} = [split /,/, ($_->{jobs} // '')] for @$workers;
210              
211 233         15839 my $total = $self->sqlite->db->query(qq{select count(*)
212             from minion_workers as w $where_str}, @where_params)->arrays->first->[0];
213              
214 233         68037 return {total => $total, workers => $workers};
215             }
216              
217             sub lock {
218 280   100 280 1 201056 my ($self, $name, $duration, $options) = (shift, shift, shift, shift // {});
219 280         912 my $db = $self->sqlite->db;
220 280         21048 $db->query(q{delete from minion_locks where expires < datetime('now')});
221 280         50032 my $tx = $db->begin('exclusive');
222 280         27320 my $locks = $db->query(q{select count(*) from minion_locks where name = ?},
223             $name)->arrays->first->[0];
224 280 100 100     47536 return !!0 if defined $locks and $locks >= ($options->{limit} || 1);
      66        
225 200 100 66     4936 if (defined $duration and $duration > 0) {
226 152         552 $db->query(q{insert into minion_locks (name, expires)
227             values (?, datetime('now', ? || ' seconds'))}, $name, $duration);
228 152         26760 $tx->commit;
229             }
230 200         18376 return !!1;
231             }
232              
233             sub note {
234 32     32 1 29520 my ($self, $id, $merge) = @_;
235 32         88 my (@set, @set_params, @remove, @remove_params);
236 32         168 foreach my $key (keys %$merge) {
237 40 50       240 croak qq{Invalid note key '$key'; must not contain '.', '[', or ']'}
238             if $key =~ m/[\[\].]/;
239 40 100       136 if (defined $merge->{$key}) {
240 24         160 push @set, q{'$.' || ?}, 'json(?)';
241 24         120 push @set_params, $key, {json => $merge->{$key}};
242             } else {
243 16         240 push @remove, q{'$.' || ?};
244 16         136 push @remove_params, $key;
245             }
246             }
247 32         120 my $json_set = join ', ', @set;
248 32         88 my $json_remove = join ', ', @remove;
249 32         48 my $set_to = 'notes';
250 32 100       144 $set_to = "json_set($set_to, $json_set)" if @set;
251 32 100       104 $set_to = "json_remove($set_to, $json_remove)" if @remove;
252 32         128 return !!$self->sqlite->db->query(
253             qq{update minion_jobs set notes = $set_to where id = ?},
254             @set_params, @remove_params, $id
255             )->rows;
256             }
257              
258             sub receive {
259 9     9 1 6496 my ($self, $id) = @_;
260 9         33 my $db = $self->sqlite->db;
261 9         656 my $tx = $db->begin;
262 9         706 my $array = $db->query(q{select inbox from minion_workers where id = ?}, $id)
263             ->expand(json => 'inbox')->array;
264 9 50       4210 $db->query(q{update minion_workers set inbox = '[]' where id = ?}, $id)
265             if $array;
266 9         2009 $tx->commit;
267 9 50       622 return $array ? $array->[0] : [];
268             }
269              
270             sub register_worker {
271 307   50 307 1 2176694 my ($self, $id, $options) = (shift, shift, shift || {});
272              
273             return $id if $id && $self->sqlite->db->query(
274             q{update minion_workers set notified = datetime('now'), status = ?
275 307 100 50     1493 where id = ?}, {json => $options->{status} // {}}, $id)->rows;
      66        
276              
277             return $self->sqlite->db->query(
278             q{insert into minion_workers (host, pid, status) values (?, ?, ?)},
279 261   50     884 hostname, $$, {json => $options->{status} // {}})->last_insert_id;
280             }
281              
282             sub remove_job {
283             !!shift->sqlite->db->query(
284 86     86 1 59016 q{delete from minion_jobs
285             where id = ? and state in ('inactive', 'failed', 'finished')}, shift
286             )->rows;
287             }
288              
289             sub repair {
290 72     72 1 68638 my $self = shift;
291              
292             # Workers without heartbeat
293 72         243 my $db = $self->sqlite->db;
294 72         420239 my $minion = $self->minion;
295 72         625 $db->query(
296             q{delete from minion_workers
297             where notified < datetime('now', '-' || ? || ' seconds')}, $minion->missing_after
298             );
299              
300             # Old jobs with no unresolved dependencies and expired jobs
301 72         15768 $db->query(
302             q{delete from minion_jobs
303             where (finished <= datetime('now', '-' || ? || ' seconds')
304             and state = 'finished' and id not in (select distinct parent_id.value
305             from minion_jobs as child, json_each(child.parents) as parent_id
306             where child.state <> 'finished'))
307             or (expires <= datetime('now') and state = 'inactive')}, $minion->remove_after);
308              
309             # Jobs with missing worker (can be retried)
310 72         21595 my $fail = $db->query(
311             q{select id, retries from minion_jobs as j
312             where state = 'active' and queue != 'minion_foreground'
313             and not exists (select 1 from minion_workers where id = j.worker)}
314             )->hashes;
315 72     22   19906 $fail->each(sub { $self->fail_job(@$_{qw(id retries)}, 'Worker went away') });
  22         886  
316              
317             # Jobs in queue without workers or not enough workers (cannot be retried and requires admin attention)
318 72         7082 $db->query(
319             q{update minion_jobs set state = 'failed', result = json_quote('Job appears stuck in queue')
320             where state = 'inactive' and delayed < datetime('now', '-' || ? || ' seconds')},
321             $minion->stuck_after);
322             }
323              
324             sub reset {
325 26   50 26 1 22743 my ($self, $options) = (shift, shift // {});
326 26         191 my $db = $self->sqlite->db;
327 26 100       5067 if ($options->{all}) {
    50          
328 18         125 my $tx = $db->begin;
329 18         1460 $db->query('delete from minion_jobs');
330 18         4061 $db->query('delete from minion_locks');
331 18         3506 $db->query('delete from minion_workers');
332 18         3472 $db->query(q{delete from sqlite_sequence
333             where name in ('minion_jobs','minion_locks','minion_workers')});
334 18         4928 $tx->commit;
335             } elsif ($options->{locks}) {
336 8         40 $db->query('delete from minion_locks');
337             }
338             }
339              
340             sub retry_job {
341 132   100 132 1 74664 my ($self, $id, $retries, $options) = (shift, shift, shift, shift || {});
342              
343             my $parents = defined $options->{parents}
344 132 100       704 ? {json => $options->{parents}} : undef;
345             return !!$self->sqlite->db->query(
346             q{update minion_jobs
347             set attempts = coalesce(?, attempts),
348             delayed = datetime('now', ? || ' seconds'),
349             expires = case when ? is not null then datetime('now', ? || ' seconds') else expires end,
350             lax = coalesce(?, lax), parents = coalesce(?, parents), priority = coalesce(?, priority),
351             queue = coalesce(?, queue), retried = datetime('now'),
352             retries = retries + 1, state = 'inactive'
353             where id = ? and retries = ?},
354             $options->{attempts}, $options->{delay} // 0, @$options{qw(expire expire)},
355             exists $options->{lax} ? $options->{lax} ? 1 : 0 : undef,
356 132 100 100     502 $parents, @$options{qw(priority queue)}, $id, $retries
    100          
357             )->rows;
358             }
359              
360             sub stats {
361 117     117 1 83269 my $self = shift;
362              
363 117         385 my $stats = $self->sqlite->db->query(
364             q{select
365             (select count(*) from minion_jobs where state = 'inactive' and (expires is null or expires > datetime('now')))
366             as inactive_jobs,
367             (select count(*) from minion_jobs where state = 'active') as active_jobs,
368             (select count(*) from minion_jobs where state = 'failed') as failed_jobs,
369             (select count(*) from minion_jobs where state = 'finished') as finished_jobs,
370             (select count(*) from minion_jobs where state = 'inactive' and delayed > datetime('now')) as delayed_jobs,
371             (select count(*) from minion_locks where expires > datetime('now')) as active_locks,
372             (select count(distinct(worker)) from minion_jobs where state = 'active') as active_workers,
373             ifnull((select seq from sqlite_sequence where name = 'minion_jobs'), 0) as enqueued_jobs,
374             (select count(*) from minion_workers) as inactive_workers,
375             null as uptime}
376             )->hash;
377 117         39312 $stats->{inactive_workers} -= $stats->{active_workers};
378              
379 117         7841 return $stats;
380             }
381              
382             sub unlock {
383             !!shift->sqlite->db->query(
384 152     152 1 104608 q{delete from minion_locks where id = (
385             select id from minion_locks
386             where expires > datetime('now') and name = ?
387             order by expires limit 1)}, shift
388             )->rows;
389             }
390              
391             sub unregister_worker {
392 238     238 1 2205886 shift->sqlite->db->query('delete from minion_workers where id = ?', shift);
393             }
394              
395             sub _try {
396 617     617   13525985 my ($self, $id, $options) = @_;
397              
398 617         2150 my $db = $self->sqlite->db;
399 617   100     54674 my $queues = $options->{queues} || ['default'];
400 617         1647 my $tasks = [keys %{$self->minion->tasks}];
  617         3240  
401 617 50 33     11359 return undef unless @$queues and @$tasks;
402 617         2438 my $queues_in = join ',', ('?')x@$queues;
403 617         1827 my $tasks_in = join ',', ('?')x@$tasks;
404            
405 617         3015 my $tx = $db->begin;
406             my $res = $db->query(
407             qq{select id from minion_jobs as j
408             where delayed <= datetime('now') and id = coalesce(?, id)
409             and (json_array_length(parents) = 0 or not exists (
410             select 1 from minion_jobs as parent, json_each(j.parents) as parent_id
411             where parent.id = parent_id.value and case parent.state
412             when 'active' then 1
413             when 'failed' then not j.lax
414             when 'inactive' then (parent.expires is null or parent.expires > datetime('now'))
415             end
416             )) and priority >= coalesce(?, priority) and queue in ($queues_in) and state = 'inactive'
417             and task in ($tasks_in) and (expires is null or expires > datetime('now'))
418             order by priority desc, id
419 617         50218 limit 1}, $options->{id}, $options->{min_priority}, @$queues, @$tasks
420             );
421 617   100     172684 my $job_id = ($res->arrays->first // [])->[0] // return undef;
      100        
422 413         19624 $db->query(
423             q{update minion_jobs
424             set started = datetime('now'), state = 'active', worker = ?
425             where id = ?}, $id, $job_id
426             );
427 413         78863 $tx->commit;
428            
429 413   50     41523 my $info = $db->query(
430             'select id, args, retries, task from minion_jobs where id = ?', $job_id
431             )->expand(json => 'args')->hash // return undef;
432            
433 413         127973 return $info;
434             }
435              
436             sub _update {
437 406     406   1569 my ($self, $fail, $id, $retries, $result) = @_;
438              
439 406         2858 my $db = $self->sqlite->db;
440 406 100       80919 return undef unless $db->query(
    100          
441             q{update minion_jobs
442             set finished = datetime('now'), result = ?, state = ?
443             where id = ? and retries = ? and state = 'active'},
444             {json => $result}, $fail ? 'failed' : 'finished', $id, $retries
445             )->rows > 0;
446            
447 336         156363 my $row = $db->query('select attempts from minion_jobs where id = ?', $id)->array;
448 336 100       73426 return $fail ? $self->auto_retry_job($id, $retries, $row->[0]) : 1;
449             }
450              
451             1;
452              
453             =encoding utf8
454              
455             =head1 NAME
456              
457             Minion::Backend::SQLite - SQLite backend for Minion job queue
458              
459             =head1 SYNOPSIS
460              
461             use Minion::Backend::SQLite;
462             my $backend = Minion::Backend::SQLite->new('sqlite:test.db');
463              
464             # Minion
465             use Minion;
466             my $minion = Minion->new(SQLite => 'sqlite:test.db');
467              
468             # Mojolicious (via Mojolicious::Plugin::Minion)
469             $self->plugin(Minion => { SQLite => 'sqlite:test.db' });
470              
471             # Mojolicious::Lite (via Mojolicious::Plugin::Minion)
472             plugin Minion => { SQLite => 'sqlite:test.db' };
473              
474             # Share the database connection cache
475             helper sqlite => sub { state $sqlite = Mojo::SQLite->new('sqlite:test.db') };
476             plugin Minion => { SQLite => app->sqlite };
477              
478             =head1 DESCRIPTION
479              
480             L is a backend for L based on L.
481             All necessary tables will be created automatically with a set of migrations
482             named C. If no connection string or C<:temp:> is provided, the database
483             will be created in a temporary directory.
484              
485             =head1 ATTRIBUTES
486              
487             L inherits all attributes from L and
488             implements the following new ones.
489              
490             =head2 dequeue_interval
491              
492             my $seconds = $backend->dequeue_interval;
493             $backend = $backend->dequeue_interval($seconds);
494              
495             Interval in seconds between L attempts. Defaults to C<0.5>.
496              
497             =head2 sqlite
498              
499             my $sqlite = $backend->sqlite;
500             $backend = $backend->sqlite(Mojo::SQLite->new);
501              
502             L object used to store all data.
503              
504             =head1 METHODS
505              
506             L inherits all methods from L and
507             implements the following new ones.
508              
509             =head2 new
510              
511             my $backend = Minion::Backend::SQLite->new;
512             my $backend = Minion::Backend::SQLite->new(':temp:');
513             my $backend = Minion::Backend::SQLite->new('sqlite:test.db');
514             my $backend = Minion::Backend::SQLite->new->tap(sub { $_->sqlite->from_filename('C:\\foo\\bar.db') });
515             my $backend = Minion::Backend::SQLite->new(Mojo::SQLite->new);
516              
517             Construct a new L object.
518              
519             =head2 broadcast
520              
521             my $bool = $backend->broadcast('some_command');
522             my $bool = $backend->broadcast('some_command', [@args]);
523             my $bool = $backend->broadcast('some_command', [@args], [$id1, $id2, $id3]);
524              
525             Broadcast remote control command to one or more workers.
526              
527             =head2 dequeue
528              
529             my $job_info = $backend->dequeue($worker_id, 0.5);
530             my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']});
531              
532             Wait a given amount of time in seconds for a job, dequeue it and transition
533             from C to C state, or return C if queues were empty.
534             Jobs will be checked for in intervals defined by L until
535             the timeout is reached.
536              
537             These options are currently available:
538              
539             =over 2
540              
541             =item id
542              
543             id => '10023'
544              
545             Dequeue a specific job.
546              
547             =item min_priority
548              
549             min_priority => 3
550              
551             Do not dequeue jobs with a lower priority.
552              
553             =item queues
554              
555             queues => ['important']
556              
557             One or more queues to dequeue jobs from, defaults to C.
558              
559             =back
560              
561             These fields are currently available:
562              
563             =over 2
564              
565             =item args
566              
567             args => ['foo', 'bar']
568              
569             Job arguments.
570              
571             =item id
572              
573             id => '10023'
574              
575             Job ID.
576              
577             =item retries
578              
579             retries => 3
580              
581             Number of times job has been retried.
582              
583             =item task
584              
585             task => 'foo'
586              
587             Task name.
588              
589             =back
590              
591             =head2 enqueue
592              
593             my $job_id = $backend->enqueue('foo');
594             my $job_id = $backend->enqueue(foo => [@args]);
595             my $job_id = $backend->enqueue(foo => [@args] => {priority => 1});
596              
597             Enqueue a new job with C state.
598              
599             These options are currently available:
600              
601             =over 2
602              
603             =item attempts
604              
605             attempts => 25
606              
607             Number of times performing this job will be attempted, with a delay based on
608             L after the first attempt, defaults to C<1>.
609              
610             =item delay
611              
612             delay => 10
613              
614             Delay job for this many seconds (from now).
615              
616             =item expire
617              
618             expire => 300
619              
620             Job is valid for this many seconds (from now) before it expires. Note that this
621             option is B and might change without warning!
622              
623             =item lax
624              
625             lax => 1
626              
627             Existing jobs this job depends on may also have transitioned to the C
628             state to allow for it to be processed, defaults to C. Note that this
629             option is B and might change without warning!
630              
631             =item notes
632              
633             notes => {foo => 'bar', baz => [1, 2, 3]}
634              
635             Hash reference with arbitrary metadata for this job.
636              
637             =item parents
638              
639             parents => [$id1, $id2, $id3]
640              
641             One or more existing jobs this job depends on, and that need to have
642             transitioned to the state C before it can be processed.
643              
644             =item priority
645              
646             priority => 5
647              
648             Job priority, defaults to C<0>. Jobs with a higher priority get performed first.
649              
650             =item queue
651              
652             queue => 'important'
653              
654             Queue to put job in, defaults to C.
655              
656             =back
657              
658             =head2 fail_job
659              
660             my $bool = $backend->fail_job($job_id, $retries);
661             my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!');
662             my $bool = $backend->fail_job(
663             $job_id, $retries, {msg => 'Something went wrong!'});
664              
665             Transition from C to C state with or without a result, and if
666             there are attempts remaining, transition back to C with an
667             exponentially increasing delay based on L.
668              
669             =head2 finish_job
670              
671             my $bool = $backend->finish_job($job_id, $retries);
672             my $bool = $backend->finish_job($job_id, $retries, 'All went well!');
673             my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'});
674              
675             Transition from C to C state with or without a result.
676              
677             =head2 history
678              
679             my $history = $backend->history;
680              
681             Get history information for job queue.
682              
683             These fields are currently available:
684              
685             =over 2
686              
687             =item daily
688              
689             daily => [{epoch => 12345, finished_jobs => 95, failed_jobs => 2}, ...]
690              
691             Hourly counts for processed jobs from the past day.
692              
693             =back
694              
695             =head2 list_jobs
696              
697             my $results = $backend->list_jobs($offset, $limit);
698             my $results = $backend->list_jobs($offset, $limit, {states => ['inactive']});
699              
700             Returns the information about jobs in batches.
701              
702             # Get the total number of results (without limit)
703             my $num = $backend->list_jobs(0, 100, {queues => ['important']})->{total};
704              
705             # Check job state
706             my $results = $backend->list_jobs(0, 1, {ids => [$job_id]});
707             my $state = $results->{jobs}[0]{state};
708              
709             # Get job result
710             my $results = $backend->list_jobs(0, 1, {ids => [$job_id]});
711             my $result = $results->{jobs}[0]{result};
712              
713             These options are currently available:
714              
715             =over 2
716              
717             =item before
718              
719             before => 23
720              
721             List only jobs before this id.
722              
723             =item ids
724              
725             ids => ['23', '24']
726              
727             List only jobs with these ids.
728              
729             =item queues
730              
731             queues => ['important', 'unimportant']
732              
733             List only jobs in these queues.
734              
735             =item states
736              
737             states => ['inactive', 'active']
738              
739             List only jobs in these states.
740              
741             =item tasks
742              
743             tasks => ['foo', 'bar']
744              
745             List only jobs for these tasks.
746              
747             =back
748              
749             These fields are currently available:
750              
751             =over 2
752              
753             =item args
754              
755             args => ['foo', 'bar']
756              
757             Job arguments.
758              
759             =item attempts
760              
761             attempts => 25
762              
763             Number of times performing this job will be attempted.
764              
765             =item children
766              
767             children => ['10026', '10027', '10028']
768              
769             Jobs depending on this job.
770              
771             =item created
772              
773             created => 784111777
774              
775             Epoch time job was created.
776              
777             =item delayed
778              
779             delayed => 784111777
780              
781             Epoch time job was delayed to.
782              
783             =item expires
784              
785             expires => 784111777
786              
787             Epoch time job is valid until before it expires.
788              
789             =item finished
790              
791             finished => 784111777
792              
793             Epoch time job was finished.
794              
795             =item id
796              
797             id => 10025
798              
799             Job id.
800              
801             =item lax
802              
803             lax => 0
804              
805             Existing jobs this job depends on may also have failed to allow for it to be
806             processed.
807              
808             =item notes
809              
810             notes => {foo => 'bar', baz => [1, 2, 3]}
811              
812             Hash reference with arbitrary metadata for this job.
813              
814             =item parents
815              
816             parents => ['10023', '10024', '10025']
817              
818             Jobs this job depends on.
819              
820             =item priority
821              
822             priority => 3
823              
824             Job priority.
825              
826             =item queue
827              
828             queue => 'important'
829              
830             Queue name.
831              
832             =item result
833              
834             result => 'All went well!'
835              
836             Job result.
837              
838             =item retried
839              
840             retried => 784111777
841              
842             Epoch time job has been retried.
843              
844             =item retries
845              
846             retries => 3
847              
848             Number of times job has been retried.
849              
850             =item started
851              
852             started => 784111777
853              
854             Epoch time job was started.
855              
856             =item state
857              
858             state => 'inactive'
859              
860             Current job state, usually C, C, C or C.
861              
862             =item task
863              
864             task => 'foo'
865              
866             Task name.
867              
868             =item time
869              
870             time => 78411177
871              
872             Current time.
873              
874             =item worker
875              
876             worker => '154'
877              
878             Id of worker that is processing the job.
879              
880             =back
881              
882             =head2 list_locks
883              
884             my $results = $backend->list_locks($offset, $limit);
885             my $results = $backend->list_locks($offset, $limit, {names => ['foo']});
886              
887             Returns information about locks in batches.
888              
889             # Get the total number of results (without limit)
890             my $num = $backend->list_locks(0, 100, {names => ['bar']})->{total};
891              
892             # Check expiration time
893             my $results = $backend->list_locks(0, 1, {names => ['foo']});
894             my $expires = $results->{locks}[0]{expires};
895              
896             These options are currently available:
897              
898             =over 2
899              
900             =item names
901              
902             names => ['foo', 'bar']
903              
904             List only locks with these names.
905              
906             =back
907              
908             These fields are currently available:
909              
910             =over 2
911              
912             =item expires
913              
914             expires => 784111777
915              
916             Epoch time this lock will expire.
917              
918             =item name
919              
920             name => 'foo'
921              
922             Lock name.
923              
924             =back
925              
926             =head2 list_workers
927              
928             my $results = $backend->list_workers($offset, $limit);
929             my $results = $backend->list_workers($offset, $limit, {ids => [23]});
930              
931             Returns information about workers in batches.
932              
933             # Get the total number of results (without limit)
934             my $num = $backend->list_workers(0, 100)->{total};
935              
936             # Check worker host
937             my $results = $backend->list_workers(0, 1, {ids => [$worker_id]});
938             my $host = $results->{workers}[0]{host};
939              
940             These options are currently available:
941              
942             =over 2
943              
944             =item before
945              
946             before => 23
947              
948             List only workers before this id.
949              
950             =item ids
951              
952             ids => ['23', '24']
953              
954             List only workers with these ids.
955              
956             =back
957              
958             These fields are currently available:
959              
960             =over 2
961              
962             =item id
963              
964             id => 22
965              
966             Worker id.
967              
968             =item host
969              
970             host => 'localhost'
971              
972             Worker host.
973              
974             =item jobs
975              
976             jobs => ['10023', '10024', '10025', '10029']
977              
978             Ids of jobs the worker is currently processing.
979              
980             =item notified
981              
982             notified => 784111777
983              
984             Epoch time worker sent the last heartbeat.
985              
986             =item pid
987              
988             pid => 12345
989              
990             Process id of worker.
991              
992             =item started
993              
994             started => 784111777
995              
996             Epoch time worker was started.
997              
998             =item status
999              
1000             status => {queues => ['default', 'important']}
1001              
1002             Hash reference with whatever status information the worker would like to share.
1003              
1004             =back
1005              
1006             =head2 lock
1007              
1008             my $bool = $backend->lock('foo', 3600);
1009             my $bool = $backend->lock('foo', 3600, {limit => 20});
1010              
1011             Try to acquire a named lock that will expire automatically after the given
1012             amount of time in seconds. An expiration time of C<0> can be used to check if a
1013             named lock already exists without creating one.
1014              
1015             These options are currently available:
1016              
1017             =over 2
1018              
1019             =item limit
1020              
1021             limit => 20
1022              
1023             Number of shared locks with the same name that can be active at the same time,
1024             defaults to C<1>.
1025              
1026             =back
1027              
1028             =head2 note
1029              
1030             my $bool = $backend->note($job_id, {mojo => 'rocks', minion => 'too'});
1031              
1032             Change one or more metadata fields for a job. Setting a value to C will
1033             remove the field. It is currently an error to attempt to set a metadata field
1034             with a name containing the characters C<.>, C<[>, or C<]>.
1035              
1036             =head2 receive
1037              
1038             my $commands = $backend->receive($worker_id);
1039              
1040             Receive remote control commands for worker.
1041              
1042             =head2 register_worker
1043              
1044             my $worker_id = $backend->register_worker;
1045             my $worker_id = $backend->register_worker($worker_id);
1046             my $worker_id = $backend->register_worker(
1047             $worker_id, {status => {queues => ['default', 'important']}});
1048              
1049             Register worker or send heartbeat to show that this worker is still alive.
1050              
1051             These options are currently available:
1052              
1053             =over 2
1054              
1055             =item status
1056              
1057             status => {queues => ['default', 'important']}
1058              
1059             Hash reference with whatever status information the worker would like to share.
1060              
1061             =back
1062              
1063             =head2 remove_job
1064              
1065             my $bool = $backend->remove_job($job_id);
1066              
1067             Remove C, C or C job from queue.
1068              
1069             =head2 repair
1070              
1071             $backend->repair;
1072              
1073             Repair worker registry and job queue if necessary.
1074              
1075             =head2 reset
1076              
1077             $backend->reset({all => 1});
1078              
1079             Reset job queue.
1080              
1081             These options are currently available:
1082              
1083             =over 2
1084              
1085             =item all
1086              
1087             all => 1
1088              
1089             Reset everything.
1090              
1091             =item locks
1092              
1093             locks => 1
1094              
1095             Reset only locks.
1096              
1097             =back
1098              
1099             =head2 retry_job
1100              
1101             my $bool = $backend->retry_job($job_id, $retries);
1102             my $bool = $backend->retry_job($job_id, $retries, {delay => 10});
1103              
1104             Transition job back to C state, already C jobs may also be
1105             retried to change options.
1106              
1107             These options are currently available:
1108              
1109             =over 2
1110              
1111             =item attempts
1112              
1113             attempts => 25
1114              
1115             Number of times performing this job will be attempted.
1116              
1117             =item delay
1118              
1119             delay => 10
1120              
1121             Delay job for this many seconds (from now).
1122              
1123             =item expire
1124              
1125             expire => 300
1126              
1127             Job is valid for this many seconds (from now) before it expires. Note that this
1128             option is B and might change without warning!
1129              
1130             =item lax
1131              
1132             lax => 1
1133              
1134             Existing jobs this job depends on may also have transitioned to the C
1135             state to allow for it to be processed, defaults to C. Note that this
1136             option is B and might change without warning!
1137              
1138             =item parents
1139              
1140             parents => [$id1, $id2, $id3]
1141              
1142             Jobs this job depends on.
1143              
1144             =item priority
1145              
1146             priority => 5
1147              
1148             Job priority.
1149              
1150             =item queue
1151              
1152             queue => 'important'
1153              
1154             Queue to put job in.
1155              
1156             =back
1157              
1158             =head2 stats
1159              
1160             my $stats = $backend->stats;
1161              
1162             Get statistics for the job queue.
1163              
1164             These fields are currently available:
1165              
1166             =over 2
1167              
1168             =item active_jobs
1169              
1170             active_jobs => 100
1171              
1172             Number of jobs in C state.
1173              
1174             =item active_locks
1175              
1176             active_locks => 100
1177              
1178             Number of active named locks.
1179              
1180             =item active_workers
1181              
1182             active_workers => 100
1183              
1184             Number of workers that are currently processing a job.
1185              
1186             =item delayed_jobs
1187              
1188             delayed_jobs => 100
1189              
1190             Number of jobs in C state that are scheduled to run at specific time
1191             in the future.
1192              
1193             =item enqueued_jobs
1194              
1195             enqueued_jobs => 100000
1196              
1197             Rough estimate of how many jobs have ever been enqueued.
1198              
1199             =item failed_jobs
1200              
1201             failed_jobs => 100
1202              
1203             Number of jobs in C state.
1204              
1205             =item finished_jobs
1206              
1207             finished_jobs => 100
1208              
1209             Number of jobs in C state.
1210              
1211             =item inactive_jobs
1212              
1213             inactive_jobs => 100
1214              
1215             Number of jobs in C state.
1216              
1217             =item inactive_workers
1218              
1219             inactive_workers => 100
1220              
1221             Number of workers that are currently not processing a job.
1222              
1223             =item uptime
1224              
1225             uptime => undef
1226              
1227             Uptime in seconds. Always undefined for SQLite.
1228              
1229             =back
1230              
1231             =head2 unlock
1232              
1233             my $bool = $backend->unlock('foo');
1234              
1235             Release a named lock.
1236              
1237             =head2 unregister_worker
1238              
1239             $backend->unregister_worker($worker_id);
1240              
1241             Unregister worker.
1242              
1243             =head1 BUGS
1244              
1245             Report any issues on the public bugtracker.
1246              
1247             =head1 AUTHOR
1248              
1249             Dan Book
1250              
1251             =head1 COPYRIGHT AND LICENSE
1252              
1253             This software is Copyright (c) 2015 by Dan Book.
1254              
1255             This is free software, licensed under:
1256              
1257             The Artistic License 2.0 (GPL Compatible)
1258              
1259             =head1 SEE ALSO
1260              
1261             L, L
1262              
1263             =cut
1264              
1265             __DATA__