File Coverage

blib/lib/Minion/Worker.pm
Criterion Covered Total %
statement 9 93 9.6
branch 0 30 0.0
condition 0 48 0.0
subroutine 3 19 15.7
pod 8 8 100.0
total 20 198 10.1


line stmt bran cond sub pod time code
1             package Minion::Worker;
2 2     2   12 use Mojo::Base 'Mojo::EventEmitter';
  2         3  
  2         16  
3              
4 2     2   279 use Carp qw(croak);
  2         3  
  2         76  
5 2     2   10 use Mojo::Util qw(steady_time);
  2         3  
  2         2885  
6              
7             has [qw(commands status)] => sub { {} };
8             has [qw(id minion)];
9              
10 0 0   0 1   sub add_command { $_[0]->commands->{$_[1]} = $_[2] and return $_[0] }
11              
12             sub dequeue {
13 0     0 1   my ($self, $wait, $options) = @_;
14              
15             # Worker not registered
16 0 0         return undef unless my $id = $self->id;
17              
18 0           my $minion = $self->minion;
19 0 0         return undef unless my $job = $minion->backend->dequeue($id, $wait, $options);
20             $job = $minion->class_for_task($job->{task})
21 0           ->new(args => $job->{args}, id => $job->{id}, minion => $minion, retries => $job->{retries}, task => $job->{task});
22 0           $self->emit(dequeue => $job);
23 0           return $job;
24             }
25              
26 0     0 1   sub info { $_[0]->minion->backend->list_workers(0, 1, {ids => [$_[0]->id]})->{workers}[0] }
27              
28             sub new {
29 0     0 1   my $self = shift->SUPER::new(@_);
30 0     0     $self->on(busy => sub { sleep 1 });
  0            
31 0           return $self;
32             }
33              
34             sub process_commands {
35 0     0 1   my $self = shift;
36              
37 0           for my $command (@{$self->minion->backend->receive($self->id)}) {
  0            
38 0 0         next unless my $cb = $self->commands->{shift @$command};
39 0           $self->$cb(@$command);
40             }
41              
42 0           return $self;
43             }
44              
45             sub register {
46 0     0 1   my $self = shift;
47 0           my $status = {status => $self->status};
48 0           return $self->id($self->minion->backend->register_worker($self->id, $status));
49             }
50              
51             sub run {
52 0     0 1   my $self = shift;
53              
54 0           my $status = $self->status;
55 0   0       $status->{command_interval} //= 10;
56 0   0       $status->{dequeue_timeout} //= 5;
57 0   0       $status->{heartbeat_interval} //= 300;
58 0   0       $status->{jobs} //= 4;
59 0   0       $status->{queues} ||= ['default'];
60 0   0       $status->{performed} //= 0;
61 0   0       $status->{repair_interval} //= 21600;
62 0           $status->{repair_interval} -= int rand $status->{repair_interval} / 2;
63 0   0       $status->{spare} //= 1;
64 0   0       $status->{spare_min_priority} //= 1;
65              
66             # Reset event loop
67 0           Mojo::IOLoop->reset;
68 0     0     local $SIG{CHLD} = sub { };
69 0     0     local $SIG{INT} = local $SIG{TERM} = sub { $self->{finished}++ };
  0            
70             local $SIG{QUIT} = sub {
71 0 0   0     ++$self->{finished} and kill 'KILL', map { $_->pid } @{$self->{jobs}};
  0            
  0            
72 0           };
73              
74             # Remote control commands need to validate arguments carefully
75 0           my $commands = $self->commands;
76 0 0 0 0     local $commands->{jobs} = sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ };
  0            
77 0           local $commands->{kill} = \&_kill;
78 0     0     local $commands->{stop} = sub { $self->_kill('KILL', $_[1]) };
  0            
79              
80 0   0       eval { $self->_work until $self->{finished} && !@{$self->{jobs}} };
  0            
  0            
81 0           my $err = $@;
82 0           $self->unregister;
83 0 0         croak $err if $err;
84             }
85              
86             sub unregister {
87 0     0 1   my $self = shift;
88 0           $self->minion->backend->unregister_worker(delete $self->{id});
89 0           return $self;
90             }
91              
92             sub _kill {
93 0   0 0     my ($self, $signal, $id) = (shift, shift // '', shift // '');
      0        
94 0 0         return unless grep { $signal eq $_ } qw(INT TERM KILL USR1 USR2);
  0            
95 0           $_->kill($signal) for grep { $_->id eq $id } @{$self->{jobs}};
  0            
  0            
96             }
97              
98             sub _work {
99 0     0     my $self = shift;
100              
101             # Send heartbeats in regular intervals
102 0           my $status = $self->status;
103 0   0       $self->{last_heartbeat} ||= -$status->{heartbeat_interval};
104             $self->register and $self->{last_heartbeat} = steady_time
105 0 0 0       if ($self->{last_heartbeat} + $status->{heartbeat_interval}) < steady_time;
106              
107             # Process worker remote control commands in regular intervals
108 0   0       $self->{last_command} ||= 0;
109             $self->process_commands and $self->{last_command} = steady_time
110 0 0 0       if ($self->{last_command} + $status->{command_interval}) < steady_time;
111              
112             # Repair in regular intervals (randomize to avoid congestion)
113 0   0       $self->{last_repair} ||= 0;
114 0 0         if (($self->{last_repair} + $status->{repair_interval}) < steady_time) {
115 0           $self->minion->repair;
116 0           $self->{last_repair} = steady_time;
117             }
118              
119             # Check if jobs are finished
120 0   0       my $jobs = $self->{jobs} ||= [];
121 0 0 0       @$jobs = map { $_->is_finished && ++$status->{performed} ? () : $_ } @$jobs;
  0            
122              
123             # Job limit has been reached or worker is stopping
124 0           my @extra;
125 0 0 0       if ($self->{finished} || ($status->{jobs} + $status->{spare}) <= @$jobs) { return $self->emit('busy') }
  0 0          
126 0           elsif ($status->{jobs} <= @$jobs) { @extra = (min_priority => $status->{spare_min_priority}) }
127              
128             # Try to get more jobs
129 0           my ($max, $queues) = @{$status}{qw(dequeue_timeout queues)};
  0            
130 0           my $job = $self->emit('wait')->dequeue($max => {queues => $queues, @extra});
131 0 0         push @$jobs, $job->start if $job;
132             }
133              
134             1;
135              
136             =encoding utf8
137              
138             =head1 NAME
139              
140             Minion::Worker - Minion worker
141              
142             =head1 SYNOPSIS
143              
144             use Minion::Worker;
145              
146             my $worker = Minion::Worker->new(minion => $minion);
147              
148             =head1 DESCRIPTION
149              
150             L performs jobs for L.
151              
152             =head1 WORKER SIGNALS
153              
154             The L process can be controlled at runtime with the following signals.
155              
156             =head2 INT, TERM
157              
158             Stop gracefully after finishing the current jobs.
159              
160             =head2 QUIT
161              
162             Stop immediately without finishing the current jobs.
163              
164             =head1 JOB SIGNALS
165              
166             The job processes spawned by the L process can be controlled at runtime with the following signals.
167              
168             =head2 INT, TERM
169              
170             This signal starts out with the operating system default and allows for jobs to install a custom signal handler to stop
171             gracefully.
172              
173             =head2 USR1, USR2
174              
175             These signals start out being ignored and allow for jobs to install custom signal handlers.
176              
177             =head1 EVENTS
178              
179             L inherits all events from L and can emit the following new ones.
180              
181             =head2 busy
182              
183             $worker->on(busy => sub ($worker) {
184             ...
185             });
186              
187             Emitted in the worker process when it is performing the maximum number of jobs in parallel.
188              
189             $worker->on(busy => sub ($worker) {
190             my $max = $worker->status->{jobs};
191             say "Performing $max jobs.";
192             });
193              
194             =head2 dequeue
195              
196             $worker->on(dequeue => sub ($worker, $job) {
197             ...
198             });
199              
200             Emitted in the worker process after a job has been dequeued.
201              
202             $worker->on(dequeue => sub ($worker, $job) {
203             my $id = $job->id;
204             say "Job $id has been dequeued.";
205             });
206              
207             =head2 wait
208              
209             $worker->on(wait => sub ($worker) {
210             ...
211             });
212              
213             Emitted in the worker process before it tries to dequeue a job.
214              
215             $worker->on(wait => sub ($worker) {
216             my $max = $worker->status->{dequeue_timeout};
217             say "Waiting up to $max seconds for a new job.";
218             });
219              
220             =head1 ATTRIBUTES
221              
222             L implements the following attributes.
223              
224             =head2 commands
225              
226             my $commands = $worker->commands;
227             $worker = $worker->commands({jobs => sub {...}});
228              
229             Registered worker remote control commands.
230              
231             =head2 id
232              
233             my $id = $worker->id;
234             $worker = $worker->id($id);
235              
236             Worker id.
237              
238             =head2 minion
239              
240             my $minion = $worker->minion;
241             $worker = $worker->minion(Minion->new);
242              
243             L object this worker belongs to.
244              
245             =head2 status
246              
247             my $status = $worker->status;
248             $worker = $worker->status({queues => ['default', 'important']);
249              
250             Status information to configure workers started with L and to share every time L is called.
251              
252             =head1 METHODS
253              
254             L inherits all methods from L and implements the following new ones.
255              
256             =head2 add_command
257              
258             $worker = $worker->add_command(jobs => sub {...});
259              
260             Register a worker remote control command.
261              
262             $worker->add_command(foo => sub ($worker, @args) {
263             ...
264             });
265              
266             =head2 dequeue
267              
268             my $job = $worker->dequeue(0.5);
269             my $job = $worker->dequeue(0.5 => {queues => ['important']});
270              
271             Wait a given amount of time in seconds for a job, dequeue L object and transition from C to
272             C state, or return C if queues were empty.
273              
274             These options are currently available:
275              
276             =over 2
277              
278             =item id
279              
280             id => '10023'
281              
282             Dequeue a specific job.
283              
284             =item min_priority
285              
286             min_priority => 3
287              
288             Do not dequeue jobs with a lower priority.
289              
290             =item queues
291              
292             queues => ['important']
293              
294             One or more queues to dequeue jobs from, defaults to C.
295              
296             =back
297              
298             =head2 info
299              
300             my $info = $worker->info;
301              
302             Get worker information.
303              
304             # Check worker host
305             my $host = $worker->info->{host};
306              
307             These fields are currently available:
308              
309             =over 2
310              
311             =item host
312              
313             host => 'localhost'
314              
315             Worker host.
316              
317             =item jobs
318              
319             jobs => ['10023', '10024', '10025', '10029']
320              
321             Ids of jobs the worker is currently processing.
322              
323             =item notified
324              
325             notified => 784111777
326              
327             Epoch time worker sent the last heartbeat.
328              
329             =item pid
330              
331             pid => 12345
332              
333             Process id of worker.
334              
335             =item started
336              
337             started => 784111777
338              
339             Epoch time worker was started.
340              
341             =item status
342              
343             status => {queues => ['default', 'important']}
344              
345             Hash reference with whatever status information the worker would like to share.
346              
347             =back
348              
349             =head2 new
350              
351             my $worker = Minion::Worker->new;
352             my $worker = Minion::Worker->new(status => {foo => 'bar'});
353             my $worker = Minion::Worker->new({status => {foo => 'bar'}});
354              
355             Construct a new L object and subscribe to L event with default handler that sleeps for one
356             second.
357              
358             =head2 process_commands
359              
360             $worker = $worker->process_commands;
361              
362             Process worker remote control commands.
363              
364             =head2 register
365              
366             $worker = $worker->register;
367              
368             Register worker or send heartbeat to show that this worker is still alive.
369              
370             =head2 run
371              
372             $worker->run;
373              
374             Run worker and wait for L.
375              
376             # Start a worker for a special named queue
377             my $worker = $minion->worker;
378             $worker->status->{queues} = ['important'];
379             $worker->run;
380              
381             These L options are currently available:
382              
383             =over 2
384              
385             =item command_interval
386              
387             command_interval => 20
388              
389             Worker remote control command interval, defaults to C<10>.
390              
391             =item dequeue_timeout
392              
393             dequeue_timeout => 5
394              
395             Maximum amount time in seconds to wait for a job, defaults to C<5>.
396              
397             =item heartbeat_interval
398              
399             heartbeat_interval => 60
400              
401             Heartbeat interval, defaults to C<300>.
402              
403             =item jobs
404              
405             jobs => 12
406              
407             Maximum number of jobs to perform parallel in forked worker processes (not including spare processes), defaults to C<4>.
408              
409             =item queues
410              
411             queues => ['test']
412              
413             One or more queues to get jobs from, defaults to C.
414              
415             =item repair_interval
416              
417             repair_interval => 3600
418              
419             Repair interval, up to half of this value can be subtracted randomly to make sure not all workers repair at the same
420             time, defaults to C<21600> (6 hours).
421              
422             =item spare
423              
424             spare => 2
425              
426             Number of spare worker processes to reserve for high priority jobs, defaults to C<1>.
427              
428             =item spare_min_priority
429              
430             spare_min_priority => 7
431              
432             Minimum priority of jobs to use spare worker processes for, defaults to C<1>.
433              
434             =back
435              
436             These remote control L are currently available:
437              
438             =over 2
439              
440             =item jobs
441              
442             $minion->broadcast('jobs', [10]);
443             $minion->broadcast('jobs', [10], [$worker_id]);
444              
445             Instruct one or more workers to change the number of jobs to perform concurrently. Setting this value to C<0> will
446             effectively pause the worker. That means all current jobs will be finished, but no new ones accepted, until the number
447             is increased again.
448              
449             =item kill
450              
451             $minion->broadcast('kill', ['INT', 10025]);
452             $minion->broadcast('kill', ['INT', 10025], [$worker_id]);
453              
454             Instruct one or more workers to send a signal to a job that is currently being performed. This command will be ignored
455             by workers that do not have a job matching the id. That means it is safe to broadcast this command to all workers.
456              
457             =item stop
458              
459             $minion->broadcast('stop', [10025]);
460             $minion->broadcast('stop', [10025], [$worker_id]);
461              
462             Instruct one or more workers to stop a job that is currently being performed immediately. This command will be ignored
463             by workers that do not have a job matching the id. That means it is safe to broadcast this command to all workers.
464              
465             =back
466              
467             =head2 unregister
468              
469             $worker = $worker->unregister;
470              
471             Unregister worker.
472              
473             =head1 SEE ALSO
474              
475             L, L, L, L, L.
476              
477             =cut