File Coverage

blib/lib/Minion/Backend/SQLite.pm
Criterion Covered Total %
statement 186 187 99.4
branch 63 76 82.8
condition 58 70 82.8
subroutine 31 31 100.0
pod 21 21 100.0
total 359 385 93.2


line stmt bran cond sub pod time code
1             package Minion::Backend::SQLite;
2 9     9   8018 use Mojo::Base 'Minion::Backend';
  9         17  
  9         62  
3              
4 9     9   15565 use Carp 'croak';
  9         18  
  9         342  
5 9     9   46 use List::Util 'min';
  9         17  
  9         451  
6 9     9   4339 use Mojo::SQLite;
  9         647443  
  9         127  
7 9     9   425 use Mojo::Util 'steady_time';
  9         26  
  9         468  
8 9     9   833 use Sys::Hostname 'hostname';
  9         1047  
  9         422  
9 9     9   49 use Time::HiRes 'usleep';
  9         26  
  9         83  
10              
11             our $VERSION = 'v5.0.5';
12              
13             has dequeue_interval => 0.5;
14             has 'sqlite';
15              
16             sub new {
17 9     9 1 392 my $self = shift->SUPER::new(sqlite => Mojo::SQLite->new(@_));
18 9         638 $self->sqlite->auto_migrate(1)->migrations->name('minion')->from_data;
19 9         16721 return $self;
20             }
21              
22             sub broadcast {
23 7   100 7 1 4065 my ($self, $command, $args, $ids) = (shift, shift, shift || [], shift || []);
      100        
24 7         23 my $ids_in = join ',', ('?')x@$ids;
25 7 100       45 return !!$self->sqlite->db->query(
26             q{update minion_workers set inbox =
27             json_set(inbox, '$[' || json_array_length(inbox) || ']', json(?))} .
28             (@$ids ? " where id in ($ids_in)" : ''),
29             {json => [$command, @$args]}, @$ids
30             )->rows;
31             }
32              
33             sub dequeue {
34 499     499 1 304924 my ($self, $id, $wait, $options) = @_;
35 499         1898 my $job = $self->_try($id, $options);
36 499 100       37996 unless ($job) {
37 86         614 my $int = $self->dequeue_interval;
38 86         1088 my $end = steady_time + $wait;
39 86         1255 my $remaining = $wait;
40 86   66     272 usleep(min($int, $remaining) * 1000000)
41             until ($remaining = $end - steady_time) <= 0
42             or $job = $self->_try($id, $options);
43             }
44 499   66     1467184 return $job || $self->_try($id, $options);
45             }
46              
47             sub enqueue {
48 394   100 394 1 541114 my ($self, $task, $args, $options) = (shift, shift, shift || [], shift || {});
      100        
49              
50             return $self->sqlite->db->query(
51             q{insert into minion_jobs
52             (args, attempts, delayed, expires, lax, notes, parents, priority, queue, task)
53             values (?, ?, datetime('now', ? || ' seconds'),
54             case when ? is not null then datetime('now', ? || ' seconds') end,
55             ?, ?, ?, ?, ?, ?)},
56             {json => $args}, $options->{attempts} // 1, $options->{delay} // 0,
57             @$options{qw(expire expire)}, $options->{lax} ? 1 : 0, {json => $options->{notes} || {}},
58             {json => ($options->{parents} || [])}, $options->{priority} // 0,
59 394 100 100     1503 $options->{queue} // 'default', $task
      100        
      100        
      100        
      100        
      100        
60             )->last_insert_id;
61             }
62              
63 130     130 1 36533207 sub fail_job { shift->_update(1, @_) }
64 276     276 1 1009174 sub finish_job { shift->_update(0, @_) }
65              
66             sub history {
67 8     8 1 2272 my $self = shift;
68              
69 8         256 my $db = $self->sqlite->db;
70 8         520 my $steps = $db->query(
71             q{with recursive generate_series(ts) as (
72             select datetime('now','-23 hours')
73             union all
74             select datetime(ts,'+1 hour') from generate_series
75             where datetime(ts,'+1 hour') <= datetime('now')
76             ) select ts, strftime('%s',ts) as epoch,
77             strftime('%d',ts,'localtime') as day,
78             strftime('%H',ts,'localtime') as hour
79             from generate_series order by epoch})->hashes;
80              
81             my $counts = $db->query(
82             q{select strftime('%d',finished,'localtime') as day,
83             strftime('%H',finished,'localtime') as hour,
84             count(case state when 'failed' then 1 end) as failed_jobs,
85             count(case state when 'finished' then 1 end) as finished_jobs
86             from minion_jobs
87 8         8912 where finished > ? group by day, hour}, $steps->first->{ts})->hashes;
88              
89 8         3816 my %daily = map { ("$_->{day}-$_->{hour}" => $_) } @$counts;
  8         176  
90 8         24 my @daily_ordered;
91 8         72 foreach my $step (@$steps) {
92 192   100     648 my $hour_counts = $daily{"$step->{day}-$step->{hour}"} // {};
93             push @daily_ordered, {
94             epoch => $step->{epoch},
95             failed_jobs => $hour_counts->{failed_jobs} // 0,
96 192   100     1096 finished_jobs => $hour_counts->{finished_jobs} // 0,
      100        
97             };
98             }
99              
100 8         184 return {daily => \@daily_ordered};
101             }
102              
103             sub list_jobs {
104 1304     1304 1 3158852 my ($self, $offset, $limit, $options) = @_;
105              
106 1304         2081 my (@where, @where_params);
107 1304 100       3605 if (defined(my $before = $options->{before})) {
108 56         96 push @where, 'id < ?';
109 56         104 push @where_params, $before;
110             }
111 1304 100       2978 if (defined(my $ids = $options->{ids})) {
112 1101         3368 my $ids_in = join ',', ('?')x@$ids;
113 1101 50       3732 push @where, @$ids ? "id in ($ids_in)" : 'id is null';
114 1101         1697 push @where_params, @$ids;
115             }
116 1304 50       2778 if (defined(my $notes = $options->{notes})) {
117 0         0 croak 'Listing jobs by existence of notes is unimplemented';
118             }
119 1304 100       2574 if (defined(my $queues = $options->{queues})) {
120 16         64 my $queues_in = join ',', ('?')x@$queues;
121 16 50       80 push @where, @$queues ? "queue in ($queues_in)" : 'queue is null';
122 16         24 push @where_params, @$queues;
123             }
124 1304 100       2498 if (defined(my $states = $options->{states})) {
125 35         319 my $states_in = join ',', ('?')x@$states;
126 35 50       160 push @where, @$states ? "state in ($states_in)" : 'state is null';
127 35         69 push @where_params, @$states;
128             }
129 1304 100       2691 if (defined(my $tasks = $options->{tasks})) {
130 16         48 my $tasks_in = join ',', ('?')x@$tasks;
131 16 50       72 push @where, @$tasks ? "task in ($tasks_in)" : 'task is null';
132 16         32 push @where_params, @$tasks;
133             }
134 1304         2436 push @where, q{(state != 'inactive' or expires is null or expires > datetime('now'))};
135              
136 1304 50       4575 my $where_str = @where ? 'where ' . join(' and ', @where) : '';
137              
138 1304         3790 my $jobs = $self->sqlite->db->query(
139             qq{select id, args, attempts,
140             (select json_group_array(distinct child.id)
141             from minion_jobs as child, json_each(child.parents) as parent_id
142             where j.id = parent_id.value) as children,
143             strftime('%s',created) as created, strftime('%s',delayed) as delayed,
144             strftime('%s',expires) as expires, strftime('%s',finished) as finished,
145             lax, notes, parents, priority, queue, result,
146             strftime('%s',retried) as retried, retries,
147             strftime('%s',started) as started, state, task,
148             strftime('%s','now') as time, worker
149             from minion_jobs as j
150             $where_str order by id desc limit ? offset ?},
151             @where_params, $limit, $offset
152             )->expand(json => [qw(args children notes parents result)])->hashes->to_array;
153              
154 1304         795405 my $total = $self->sqlite->db->query(qq{select count(*) from minion_jobs as j
155             $where_str}, @where_params)->arrays->first->[0];
156            
157 1304         361674 return {jobs => $jobs, total => $total};
158             }
159              
160             sub list_locks {
161 72     72 1 59584 my ($self, $offset, $limit, $options) = @_;
162            
163 72         144 my (@where, @where_params);
164 72         144 push @where, q{expires > datetime('now')};
165 72 100       224 if (defined(my $names = $options->{names})) {
166 16         96 my $names_in = join ',', ('?')x@$names;
167 16 50       136 push @where, @$names ? "name in ($names_in)" : 'name is null';
168 16         40 push @where_params, @$names;
169             }
170            
171 72         224 my $where_str = 'where ' . join(' and ', @where);
172            
173 72         192 my $locks = $self->sqlite->db->query(
174             qq{select name, strftime('%s',expires) as expires from minion_locks
175             $where_str order by id desc limit ? offset ?},
176             @where_params, $limit, $offset
177             )->hashes->to_array;
178            
179 72         23968 my $total = $self->sqlite->db->query(qq{select count(*) from minion_locks
180             $where_str}, @where_params)->arrays->first->[0];
181            
182 72         19736 return {locks => $locks, total => $total};
183             }
184              
185             sub list_workers {
186 233     233 1 188849 my ($self, $offset, $limit, $options) = @_;
187              
188 233         474 my (@where, @where_params);
189 233 100       845 if (defined(my $before = $options->{before})) {
190 48         144 push @where, 'w.id < ?';
191 48         88 push @where_params, $before;
192             }
193 233 100       659 if (defined(my $ids = $options->{ids})) {
194 113         404 my $ids_in = join ',', ('?')x@$ids;
195 113 50       421 push @where, @$ids ? "w.id in ($ids_in)" : 'w.id is null';
196 113         258 push @where_params, @$ids;
197             }
198              
199 233 100       820 my $where_str = @where ? 'where ' . join(' and ', @where) : '';
200 233         724 my $workers = $self->sqlite->db->query(
201             qq{select w.id, strftime('%s',w.notified) as notified,
202             group_concat(j.id) as jobs, w.host, w.pid, w.status,
203             strftime('%s',w.started) as started
204             from minion_workers as w
205             left join minion_jobs as j on j.worker = w.id and j.state = 'active'
206             $where_str group by w.id order by w.id desc limit ? offset ?},
207             @where_params, $limit, $offset
208             )->expand(json => 'status')->hashes->to_array;
209 233   100     99844 $_->{jobs} = [split /,/, ($_->{jobs} // '')] for @$workers;
210              
211 233         12535 my $total = $self->sqlite->db->query(qq{select count(*)
212             from minion_workers as w $where_str}, @where_params)->arrays->first->[0];
213              
214 233         53910 return {total => $total, workers => $workers};
215             }
216              
217             sub lock {
218 280   100 280 1 175616 my ($self, $name, $duration, $options) = (shift, shift, shift, shift // {});
219 280         840 my $db = $self->sqlite->db;
220 280         17944 $db->query(q{delete from minion_locks where expires < datetime('now')});
221 280         41432 my $tx = $db->begin('exclusive');
222 280         23520 my $locks = $db->query(q{select count(*) from minion_locks where name = ?},
223             $name)->arrays->first->[0];
224 280 100 100     37560 return !!0 if defined $locks and $locks >= ($options->{limit} || 1);
      66        
225 200 100 66     4104 if (defined $duration and $duration > 0) {
226 152         488 $db->query(q{insert into minion_locks (name, expires)
227             values (?, datetime('now', ? || ' seconds'))}, $name, $duration);
228 152         21304 $tx->commit;
229             }
230 200         14456 return !!1;
231             }
232              
233             sub note {
234 32     32 1 25168 my ($self, $id, $merge) = @_;
235 32         64 my (@set, @set_params, @remove, @remove_params);
236 32         144 foreach my $key (keys %$merge) {
237 40 50       192 croak qq{Invalid note key '$key'; must not contain '.', '[', or ']'}
238             if $key =~ m/[\[\].]/;
239 40 100       120 if (defined $merge->{$key}) {
240 24         120 push @set, q{'$.' || ?}, 'json(?)';
241 24         112 push @set_params, $key, {json => $merge->{$key}};
242             } else {
243 16         152 push @remove, q{'$.' || ?};
244 16         80 push @remove_params, $key;
245             }
246             }
247 32         128 my $json_set = join ', ', @set;
248 32         80 my $json_remove = join ', ', @remove;
249 32         144 my $set_to = 'notes';
250 32 100       120 $set_to = "json_set($set_to, $json_set)" if @set;
251 32 100       96 $set_to = "json_remove($set_to, $json_remove)" if @remove;
252 32         96 return !!$self->sqlite->db->query(
253             qq{update minion_jobs set notes = $set_to where id = ?},
254             @set_params, @remove_params, $id
255             )->rows;
256             }
257              
258             sub receive {
259 9     9 1 4054 my ($self, $id) = @_;
260 9         27 my $db = $self->sqlite->db;
261 9         563 my $tx = $db->begin;
262 9         595 my $array = $db->query(q{select inbox from minion_workers where id = ?}, $id)
263             ->expand(json => 'inbox')->array;
264 9 50       3372 $db->query(q{update minion_workers set inbox = '[]' where id = ?}, $id)
265             if $array;
266 9         1603 $tx->commit;
267 9 50       582 return $array ? $array->[0] : [];
268             }
269              
270             sub register_worker {
271 307   50 307 1 1848717 my ($self, $id, $options) = (shift, shift, shift || {});
272              
273             return $id if $id && $self->sqlite->db->query(
274             q{update minion_workers set notified = datetime('now'), status = ?
275 307 100 50     1596 where id = ?}, {json => $options->{status} // {}}, $id)->rows;
      66        
276              
277             return $self->sqlite->db->query(
278             q{insert into minion_workers (host, pid, status) values (?, ?, ?)},
279 261   50     815 hostname, $$, {json => $options->{status} // {}})->last_insert_id;
280             }
281              
282             sub remove_job {
283             !!shift->sqlite->db->query(
284 86     86 1 49518 q{delete from minion_jobs
285             where id = ? and state in ('inactive', 'failed', 'finished')}, shift
286             )->rows;
287             }
288              
289             sub repair {
290 72     72 1 56030 my $self = shift;
291              
292             # Workers without heartbeat
293 72         218 my $db = $self->sqlite->db;
294 72         217766 my $minion = $self->minion;
295 72         603 $db->query(
296             q{delete from minion_workers
297             where notified < datetime('now', '-' || ? || ' seconds')}, $minion->missing_after
298             );
299              
300             # Old jobs with no unresolved dependencies and expired jobs
301 72         12947 $db->query(
302             q{delete from minion_jobs
303             where (finished <= datetime('now', '-' || ? || ' seconds')
304             and state = 'finished' and id not in (select distinct parent_id.value
305             from minion_jobs as child, json_each(child.parents) as parent_id
306             where child.state <> 'finished'))
307             or (expires <= datetime('now') and state = 'inactive')}, $minion->remove_after);
308              
309             # Jobs with missing worker (can be retried)
310 72         18320 my $fail = $db->query(
311             q{select id, retries from minion_jobs as j
312             where state = 'active' and queue != 'minion_foreground'
313             and not exists (select 1 from minion_workers where id = j.worker)}
314             )->hashes;
315 72     22   16279 $fail->each(sub { $self->fail_job(@$_{qw(id retries)}, 'Worker went away') });
  22         760  
316              
317             # Jobs in queue without workers or not enough workers (cannot be retried and requires admin attention)
318 72         6134 $db->query(
319             q{update minion_jobs set state = 'failed', result = json_quote('Job appears stuck in queue')
320             where state = 'inactive' and delayed < datetime('now', '-' || ? || ' seconds')},
321             $minion->stuck_after);
322             }
323              
324             sub reset {
325 26   50 26 1 18567 my ($self, $options) = (shift, shift // {});
326 26         153 my $db = $self->sqlite->db;
327 26 100       1745 if ($options->{all}) {
    50          
328 18         187 my $tx = $db->begin;
329 18         1239 $db->query('delete from minion_jobs');
330 18         3408 $db->query('delete from minion_locks');
331 18         2777 $db->query('delete from minion_workers');
332 18         3138 $db->query(q{delete from sqlite_sequence
333             where name in ('minion_jobs','minion_locks','minion_workers')});
334 18         3483 $tx->commit;
335             } elsif ($options->{locks}) {
336 8         40 $db->query('delete from minion_locks');
337             }
338             }
339              
340             sub retry_job {
341 132   100 132 1 63027 my ($self, $id, $retries, $options) = (shift, shift, shift, shift || {});
342              
343             my $parents = defined $options->{parents}
344 132 100       592 ? {json => $options->{parents}} : undef;
345             return !!$self->sqlite->db->query(
346             q{update minion_jobs
347             set attempts = coalesce(?, attempts),
348             delayed = datetime('now', ? || ' seconds'),
349             expires = case when ? is not null then datetime('now', ? || ' seconds') else expires end,
350             lax = coalesce(?, lax), parents = coalesce(?, parents), priority = coalesce(?, priority),
351             queue = coalesce(?, queue), retried = datetime('now'),
352             retries = retries + 1, state = 'inactive'
353             where id = ? and retries = ?},
354             $options->{attempts}, $options->{delay} // 0, @$options{qw(expire expire)},
355             exists $options->{lax} ? $options->{lax} ? 1 : 0 : undef,
356 132 100 100     383 $parents, @$options{qw(priority queue)}, $id, $retries
    100          
357             )->rows;
358             }
359              
360             sub stats {
361 117     117 1 71276 my $self = shift;
362              
363 117         294 my $stats = $self->sqlite->db->query(
364             q{select count(case when state = 'inactive' and (expires is null or expires > datetime('now'))
365             then 1 end) as inactive_jobs,
366             count(case state when 'active' then 1 end) as active_jobs,
367             count(case state when 'failed' then 1 end) as failed_jobs,
368             count(case state when 'finished' then 1 end) as finished_jobs,
369             count(case when state = 'inactive' and delayed > datetime('now')
370             then 1 end) as delayed_jobs,
371             (select count(*) from minion_locks where expires > datetime('now'))
372             as active_locks,
373             count(distinct case when state = 'active' then worker end)
374             as active_workers,
375             ifnull((select seq from sqlite_sequence where name = 'minion_jobs'), 0)
376             as enqueued_jobs,
377             (select count(*) from minion_workers) as inactive_workers, null as uptime
378             from minion_jobs}
379             )->hash;
380 117         32081 $stats->{inactive_workers} -= $stats->{active_workers};
381              
382 117         6384 return $stats;
383             }
384              
385             sub unlock {
386             !!shift->sqlite->db->query(
387 152     152 1 86328 q{delete from minion_locks where id = (
388             select id from minion_locks
389             where expires > datetime('now') and name = ?
390             order by expires limit 1)}, shift
391             )->rows;
392             }
393              
394             sub unregister_worker {
395 238     238 1 2168913 shift->sqlite->db->query('delete from minion_workers where id = ?', shift);
396             }
397              
398             sub _try {
399 617     617   13518580 my ($self, $id, $options) = @_;
400              
401 617         1941 my $db = $self->sqlite->db;
402 617   100     45115 my $queues = $options->{queues} || ['default'];
403 617         1074 my $tasks = [keys %{$self->minion->tasks}];
  617         2845  
404 617 50 33     11327 return undef unless @$queues and @$tasks;
405 617         2650 my $queues_in = join ',', ('?')x@$queues;
406 617         1880 my $tasks_in = join ',', ('?')x@$tasks;
407            
408 617         2707 my $tx = $db->begin;
409             my $res = $db->query(
410             qq{select id from minion_jobs as j
411             where delayed <= datetime('now') and id = coalesce(?, id)
412             and (json_array_length(parents) = 0 or not exists (
413             select 1 from minion_jobs as parent, json_each(j.parents) as parent_id
414             where parent.id = parent_id.value and case parent.state
415             when 'active' then 1
416             when 'failed' then not j.lax
417             when 'inactive' then (parent.expires is null or parent.expires > datetime('now'))
418             end
419             )) and priority >= coalesce(?, priority) and queue in ($queues_in) and state = 'inactive'
420             and task in ($tasks_in) and (expires is null or expires > datetime('now'))
421             order by priority desc, id
422 617         45114 limit 1}, $options->{id}, $options->{min_priority}, @$queues, @$tasks
423             );
424 617   100     140728 my $job_id = ($res->arrays->first // [])->[0] // return undef;
      100        
425 413         17110 $db->query(
426             q{update minion_jobs
427             set started = datetime('now'), state = 'active', worker = ?
428             where id = ?}, $id, $job_id
429             );
430 413         61906 $tx->commit;
431            
432 413   50     33569 my $info = $db->query(
433             'select id, args, retries, task from minion_jobs where id = ?', $job_id
434             )->expand(json => 'args')->hash // return undef;
435            
436 413         109673 return $info;
437             }
438              
439             sub _update {
440 406     406   2029 my ($self, $fail, $id, $retries, $result) = @_;
441              
442 406         2518 my $db = $self->sqlite->db;
443 406 100       70084 return undef unless $db->query(
    100          
444             q{update minion_jobs
445             set finished = datetime('now'), result = ?, state = ?
446             where id = ? and retries = ? and state = 'active'},
447             {json => $result}, $fail ? 'failed' : 'finished', $id, $retries
448             )->rows > 0;
449            
450 336         123346 my $row = $db->query('select attempts from minion_jobs where id = ?', $id)->array;
451 336 100       60387 return $fail ? $self->auto_retry_job($id, $retries, $row->[0]) : 1;
452             }
453              
454             1;
455              
456             =encoding utf8
457              
458             =head1 NAME
459              
460             Minion::Backend::SQLite - SQLite backend for Minion job queue
461              
462             =head1 SYNOPSIS
463              
464             use Minion::Backend::SQLite;
465             my $backend = Minion::Backend::SQLite->new('sqlite:test.db');
466              
467             # Minion
468             use Minion;
469             my $minion = Minion->new(SQLite => 'sqlite:test.db');
470              
471             # Mojolicious (via Mojolicious::Plugin::Minion)
472             $self->plugin(Minion => { SQLite => 'sqlite:test.db' });
473              
474             # Mojolicious::Lite (via Mojolicious::Plugin::Minion)
475             plugin Minion => { SQLite => 'sqlite:test.db' };
476              
477             # Share the database connection cache
478             helper sqlite => sub { state $sqlite = Mojo::SQLite->new('sqlite:test.db') };
479             plugin Minion => { SQLite => app->sqlite };
480              
481             =head1 DESCRIPTION
482              
483             L<Minion::Backend::SQLite> is a backend for L<Minion> based on L<Mojo::SQLite>.
484             All necessary tables will be created automatically with a set of migrations
485             named C<minion>. If no connection string or C<:temp:> is provided, the database
486             will be created in a temporary directory.
487              
488             =head1 ATTRIBUTES
489              
490             L<Minion::Backend::SQLite> inherits all attributes from L<Minion::Backend> and
491             implements the following new ones.
492              
493             =head2 dequeue_interval
494              
495             my $seconds = $backend->dequeue_interval;
496             $backend = $backend->dequeue_interval($seconds);
497              
498             Interval in seconds between L</"dequeue"> attempts. Defaults to C<0.5>.
499              
500             =head2 sqlite
501              
502             my $sqlite = $backend->sqlite;
503             $backend = $backend->sqlite(Mojo::SQLite->new);
504              
505             L<Mojo::SQLite> object used to store all data.
506              
507             =head1 METHODS
508              
509             L<Minion::Backend::SQLite> inherits all methods from L<Minion::Backend> and
510             implements the following new ones.
511              
512             =head2 new
513              
514             my $backend = Minion::Backend::SQLite->new;
515             my $backend = Minion::Backend::SQLite->new(':temp:');
516             my $backend = Minion::Backend::SQLite->new('sqlite:test.db');
517             my $backend = Minion::Backend::SQLite->new->tap(sub { $_->sqlite->from_filename('C:\\foo\\bar.db') });
518             my $backend = Minion::Backend::SQLite->new(Mojo::SQLite->new);
519              
520             Construct a new L<Minion::Backend::SQLite> object.
521              
522             =head2 broadcast
523              
524             my $bool = $backend->broadcast('some_command');
525             my $bool = $backend->broadcast('some_command', [@args]);
526             my $bool = $backend->broadcast('some_command', [@args], [$id1, $id2, $id3]);
527              
528             Broadcast remote control command to one or more workers.
529              
530             =head2 dequeue
531              
532             my $job_info = $backend->dequeue($worker_id, 0.5);
533             my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']});
534              
535             Wait a given amount of time in seconds for a job, dequeue it and transition
536             from C<inactive> to C<active> state, or return C<undef> if queues were empty.
537             Jobs will be checked for in intervals defined by L</"dequeue_interval"> until
538             the timeout is reached.
539              
540             These options are currently available:
541              
542             =over 2
543              
544             =item id
545              
546             id => '10023'
547              
548             Dequeue a specific job.
549              
550             =item min_priority
551              
552             min_priority => 3
553              
554             Do not dequeue jobs with a lower priority.
555              
556             =item queues
557              
558             queues => ['important']
559              
560             One or more queues to dequeue jobs from, defaults to C<default>.
561              
562             =back
563              
564             These fields are currently available:
565              
566             =over 2
567              
568             =item args
569              
570             args => ['foo', 'bar']
571              
572             Job arguments.
573              
574             =item id
575              
576             id => '10023'
577              
578             Job ID.
579              
580             =item retries
581              
582             retries => 3
583              
584             Number of times job has been retried.
585              
586             =item task
587              
588             task => 'foo'
589              
590             Task name.
591              
592             =back
593              
594             =head2 enqueue
595              
596             my $job_id = $backend->enqueue('foo');
597             my $job_id = $backend->enqueue(foo => [@args]);
598             my $job_id = $backend->enqueue(foo => [@args] => {priority => 1});
599              
600             Enqueue a new job with C<inactive> state.
601              
602             These options are currently available:
603              
604             =over 2
605              
606             =item attempts
607              
608             attempts => 25
609              
610             Number of times performing this job will be attempted, with a delay based on
611             L<Minion/"backoff"> after the first attempt, defaults to C<1>.
612              
613             =item delay
614              
615             delay => 10
616              
617             Delay job for this many seconds (from now).
618              
619             =item expire
620              
621             expire => 300
622              
623             Job is valid for this many seconds (from now) before it expires. Note that this
624             option is B<EXPERIMENTAL> and might change without warning!
625              
626             =item lax
627              
628             lax => 1
629              
630             Existing jobs this job depends on may also have transitioned to the C<failed>
631             state to allow for it to be processed, defaults to C<false>. Note that this
632             option is B<EXPERIMENTAL> and might change without warning!
633              
634             =item notes
635              
636             notes => {foo => 'bar', baz => [1, 2, 3]}
637              
638             Hash reference with arbitrary metadata for this job.
639              
640             =item parents
641              
642             parents => [$id1, $id2, $id3]
643              
644             One or more existing jobs this job depends on, and that need to have
645             transitioned to the state C<finished> before it can be processed.
646              
647             =item priority
648              
649             priority => 5
650              
651             Job priority, defaults to C<0>. Jobs with a higher priority get performed first.
652              
653             =item queue
654              
655             queue => 'important'
656              
657             Queue to put job in, defaults to C<default>.
658              
659             =back
660              
661             =head2 fail_job
662              
663             my $bool = $backend->fail_job($job_id, $retries);
664             my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!');
665             my $bool = $backend->fail_job(
666             $job_id, $retries, {msg => 'Something went wrong!'});
667              
668             Transition from C<active> to C<failed> state with or without a result, and if
669             there are attempts remaining, transition back to C<inactive> with an
670             exponentially increasing delay based on L<Minion/"backoff">.
671              
672             =head2 finish_job
673              
674             my $bool = $backend->finish_job($job_id, $retries);
675             my $bool = $backend->finish_job($job_id, $retries, 'All went well!');
676             my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'});
677              
678             Transition from C<active> to C<finished> state with or without a result.
679              
680             =head2 history
681              
682             my $history = $backend->history;
683              
684             Get history information for job queue.
685              
686             These fields are currently available:
687              
688             =over 2
689              
690             =item daily
691              
692             daily => [{epoch => 12345, finished_jobs => 95, failed_jobs => 2}, ...]
693              
694             Hourly counts for processed jobs from the past day.
695              
696             =back
697              
698             =head2 list_jobs
699              
700             my $results = $backend->list_jobs($offset, $limit);
701             my $results = $backend->list_jobs($offset, $limit, {states => ['inactive']});
702              
703             Returns the information about jobs in batches.
704              
705             # Get the total number of results (without limit)
706             my $num = $backend->list_jobs(0, 100, {queues => ['important']})->{total};
707              
708             # Check job state
709             my $results = $backend->list_jobs(0, 1, {ids => [$job_id]});
710             my $state = $results->{jobs}[0]{state};
711              
712             # Get job result
713             my $results = $backend->list_jobs(0, 1, {ids => [$job_id]});
714             my $result = $results->{jobs}[0]{result};
715              
716             These options are currently available:
717              
718             =over 2
719              
720             =item before
721              
722             before => 23
723              
724             List only jobs before this id.
725              
726             =item ids
727              
728             ids => ['23', '24']
729              
730             List only jobs with these ids.
731              
732             =item queues
733              
734             queues => ['important', 'unimportant']
735              
736             List only jobs in these queues.
737              
738             =item states
739              
740             states => ['inactive', 'active']
741              
742             List only jobs in these states.
743              
744             =item tasks
745              
746             tasks => ['foo', 'bar']
747              
748             List only jobs for these tasks.
749              
750             =back
751              
752             These fields are currently available:
753              
754             =over 2
755              
756             =item args
757              
758             args => ['foo', 'bar']
759              
760             Job arguments.
761              
762             =item attempts
763              
764             attempts => 25
765              
766             Number of times performing this job will be attempted.
767              
768             =item children
769              
770             children => ['10026', '10027', '10028']
771              
772             Jobs depending on this job.
773              
774             =item created
775              
776             created => 784111777
777              
778             Epoch time job was created.
779              
780             =item delayed
781              
782             delayed => 784111777
783              
784             Epoch time job was delayed to.
785              
786             =item expires
787              
788             expires => 784111777
789              
790             Epoch time until which the job is valid; after that it expires.
791              
792             =item finished
793              
794             finished => 784111777
795              
796             Epoch time job was finished.
797              
798             =item id
799              
800             id => 10025
801              
802             Job id.
803              
804             =item lax
805              
806             lax => 0
807              
808             Existing jobs this job depends on may also have failed to allow for it to be
809             processed.
810              
811             =item notes
812              
813             notes => {foo => 'bar', baz => [1, 2, 3]}
814              
815             Hash reference with arbitrary metadata for this job.
816              
817             =item parents
818              
819             parents => ['10023', '10024', '10025']
820              
821             Jobs this job depends on.
822              
823             =item priority
824              
825             priority => 3
826              
827             Job priority.
828              
829             =item queue
830              
831             queue => 'important'
832              
833             Queue name.
834              
835             =item result
836              
837             result => 'All went well!'
838              
839             Job result.
840              
841             =item retried
842              
843             retried => 784111777
844              
845             Epoch time job has been retried.
846              
847             =item retries
848              
849             retries => 3
850              
851             Number of times job has been retried.
852              
853             =item started
854              
855             started => 784111777
856              
857             Epoch time job was started.
858              
859             =item state
860              
861             state => 'inactive'
862              
863             Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>.
864              
865             =item task
866              
867             task => 'foo'
868              
869             Task name.
870              
871             =item time
872              
873             time => 78411177
874              
875             Current time.
876              
877             =item worker
878              
879             worker => '154'
880              
881             Id of worker that is processing the job.
882              
883             =back
884              
885             =head2 list_locks
886              
887             my $results = $backend->list_locks($offset, $limit);
888             my $results = $backend->list_locks($offset, $limit, {names => ['foo']});
889              
890             Returns information about locks in batches.
891              
892             # Get the total number of results (without limit)
893             my $num = $backend->list_locks(0, 100, {names => ['bar']})->{total};
894              
895             # Check expiration time
896             my $results = $backend->list_locks(0, 1, {names => ['foo']});
897             my $expires = $results->{locks}[0]{expires};
898              
899             These options are currently available:
900              
901             =over 2
902              
903             =item names
904              
905             names => ['foo', 'bar']
906              
907             List only locks with these names.
908              
909             =back
910              
911             These fields are currently available:
912              
913             =over 2
914              
915             =item expires
916              
917             expires => 784111777
918              
919             Epoch time this lock will expire.
920              
921             =item name
922              
923             name => 'foo'
924              
925             Lock name.
926              
927             =back
928              
929             =head2 list_workers
930              
931             my $results = $backend->list_workers($offset, $limit);
932             my $results = $backend->list_workers($offset, $limit, {ids => [23]});
933              
934             Returns information about workers in batches.
935              
936             # Get the total number of results (without limit)
937             my $num = $backend->list_workers(0, 100)->{total};
938              
939             # Check worker host
940             my $results = $backend->list_workers(0, 1, {ids => [$worker_id]});
941             my $host = $results->{workers}[0]{host};
942              
943             These options are currently available:
944              
945             =over 2
946              
947             =item before
948              
949             before => 23
950              
951             List only workers before this id.
952              
953             =item ids
954              
955             ids => ['23', '24']
956              
957             List only workers with these ids.
958              
959             =back
960              
961             These fields are currently available:
962              
963             =over 2
964              
965             =item id
966              
967             id => 22
968              
969             Worker id.
970              
971             =item host
972              
973             host => 'localhost'
974              
975             Worker host.
976              
977             =item jobs
978              
979             jobs => ['10023', '10024', '10025', '10029']
980              
981             Ids of jobs the worker is currently processing.
982              
983             =item notified
984              
985             notified => 784111777
986              
987             Epoch time worker sent the last heartbeat.
988              
989             =item pid
990              
991             pid => 12345
992              
993             Process id of worker.
994              
995             =item started
996              
997             started => 784111777
998              
999             Epoch time worker was started.
1000              
1001             =item status
1002              
1003             status => {queues => ['default', 'important']}
1004              
1005             Hash reference with whatever status information the worker would like to share.
1006              
1007             =back
1008              
1009             =head2 lock
1010              
1011             my $bool = $backend->lock('foo', 3600);
1012             my $bool = $backend->lock('foo', 3600, {limit => 20});
1013              
1014             Try to acquire a named lock that will expire automatically after the given
1015             amount of time in seconds. An expiration time of C<0> can be used to check if a
1016             named lock already exists without creating one.
1017              
1018             These options are currently available:
1019              
1020             =over 2
1021              
1022             =item limit
1023              
1024             limit => 20
1025              
1026             Number of shared locks with the same name that can be active at the same time,
1027             defaults to C<1>.
1028              
1029             =back
1030              
1031             =head2 note
1032              
1033             my $bool = $backend->note($job_id, {mojo => 'rocks', minion => 'too'});
1034              
1035             Change one or more metadata fields for a job. Setting a value to C<undef> will
1036             remove the field. It is currently an error to attempt to set a metadata field
1037             with a name containing the characters C<.>, C<[>, or C<]>.
1038              
1039             =head2 receive
1040              
1041             my $commands = $backend->receive($worker_id);
1042              
1043             Receive remote control commands for worker.
1044              
1045             =head2 register_worker
1046              
1047             my $worker_id = $backend->register_worker;
1048             my $worker_id = $backend->register_worker($worker_id);
1049             my $worker_id = $backend->register_worker(
1050             $worker_id, {status => {queues => ['default', 'important']}});
1051              
1052             Register worker or send heartbeat to show that this worker is still alive.
1053              
1054             These options are currently available:
1055              
1056             =over 2
1057              
1058             =item status
1059              
1060             status => {queues => ['default', 'important']}
1061              
1062             Hash reference with whatever status information the worker would like to share.
1063              
1064             =back
1065              
1066             =head2 remove_job
1067              
1068             my $bool = $backend->remove_job($job_id);
1069              
1070             Remove C<failed>, C<finished> or C<inactive> job from queue.
1071              
1072             =head2 repair
1073              
1074             $backend->repair;
1075              
1076             Repair worker registry and job queue if necessary.
1077              
1078             =head2 reset
1079              
1080             $backend->reset({all => 1});
1081              
1082             Reset job queue.
1083              
1084             These options are currently available:
1085              
1086             =over 2
1087              
1088             =item all
1089              
1090             all => 1
1091              
1092             Reset everything.
1093              
1094             =item locks
1095              
1096             locks => 1
1097              
1098             Reset only locks.
1099              
1100             =back
1101              
1102             =head2 retry_job
1103              
1104             my $bool = $backend->retry_job($job_id, $retries);
1105             my $bool = $backend->retry_job($job_id, $retries, {delay => 10});
1106              
1107             Transition job back to C<inactive> state, already C<inactive> jobs may also be
1108             retried to change options.
1109              
1110             These options are currently available:
1111              
1112             =over 2
1113              
1114             =item attempts
1115              
1116             attempts => 25
1117              
1118             Number of times performing this job will be attempted.
1119              
1120             =item delay
1121              
1122             delay => 10
1123              
1124             Delay job for this many seconds (from now).
1125              
1126             =item expire
1127              
1128             expire => 300
1129              
1130             Job is valid for this many seconds (from now) before it expires. Note that this
1131             option is B<EXPERIMENTAL> and might change without warning!
1132              
1133             =item lax
1134              
1135             lax => 1
1136              
1137             Existing jobs this job depends on may also have transitioned to the C<failed>
1138             state to allow for it to be processed, defaults to C<false>. Note that this
1139             option is B<EXPERIMENTAL> and might change without warning!
1140              
1141             =item parents
1142              
1143             parents => [$id1, $id2, $id3]
1144              
1145             Jobs this job depends on.
1146              
1147             =item priority
1148              
1149             priority => 5
1150              
1151             Job priority.
1152              
1153             =item queue
1154              
1155             queue => 'important'
1156              
1157             Queue to put job in.
1158              
1159             =back
1160              
1161             =head2 stats
1162              
1163             my $stats = $backend->stats;
1164              
1165             Get statistics for the job queue.
1166              
1167             These fields are currently available:
1168              
1169             =over 2
1170              
1171             =item active_jobs
1172              
1173             active_jobs => 100
1174              
1175             Number of jobs in C<active> state.
1176              
1177             =item active_locks
1178              
1179             active_locks => 100
1180              
1181             Number of active named locks.
1182              
1183             =item active_workers
1184              
1185             active_workers => 100
1186              
1187             Number of workers that are currently processing a job.
1188              
1189             =item delayed_jobs
1190              
1191             delayed_jobs => 100
1192              
1193             Number of jobs in C<inactive> state that are scheduled to run at specific time
1194             in the future.
1195              
1196             =item enqueued_jobs
1197              
1198             enqueued_jobs => 100000
1199              
1200             Rough estimate of how many jobs have ever been enqueued.
1201              
1202             =item failed_jobs
1203              
1204             failed_jobs => 100
1205              
1206             Number of jobs in C<failed> state.
1207              
1208             =item finished_jobs
1209              
1210             finished_jobs => 100
1211              
1212             Number of jobs in C<finished> state.
1213              
1214             =item inactive_jobs
1215              
1216             inactive_jobs => 100
1217              
1218             Number of jobs in C<inactive> state.
1219              
1220             =item inactive_workers
1221              
1222             inactive_workers => 100
1223              
1224             Number of workers that are currently not processing a job.
1225              
1226             =item uptime
1227              
1228             uptime => undef
1229              
1230             Uptime in seconds. Always undefined for SQLite.
1231              
1232             =back
1233              
1234             =head2 unlock
1235              
1236             my $bool = $backend->unlock('foo');
1237              
1238             Release a named lock.
1239              
1240             =head2 unregister_worker
1241              
1242             $backend->unregister_worker($worker_id);
1243              
1244             Unregister worker.
1245              
1246             =head1 BUGS
1247              
1248             Report any issues on the public bugtracker.
1249              
1250             =head1 AUTHOR
1251              
1252             Dan Book
1253              
1254             =head1 COPYRIGHT AND LICENSE
1255              
1256             This software is Copyright (c) 2015 by Dan Book.
1257              
1258             This is free software, licensed under:
1259              
1260             The Artistic License 2.0 (GPL Compatible)
1261              
1262             =head1 SEE ALSO
1263              
1264             L<Minion>, L<Mojo::SQLite>
1265              
1266             =cut
1267              
1268             __DATA__