File Coverage

blib/lib/Kevin/Command/kevin/worker.pm
Criterion Covered Total %
statement 9 78 11.5
branch 0 38 0.0
condition 1 46 2.1
subroutine 3 11 27.2
pod 1 1 100.0
total 14 174 8.0


line stmt bran cond sub pod time code
1             package Kevin::Command::kevin::worker;
2             $Kevin::Command::kevin::worker::VERSION = '0.4.0';
3             # ABSTRACT: Alternative Minion worker command
4 1     1   524 use Mojo::Base 'Mojolicious::Command';
  1         3  
  1         9  
5              
6 1     1   144012 use Mojo::Util qw(getopt steady_time);
  1         2  
  1         123  
7              
8             has description => 'Start alternative Minion worker';
9             has usage => sub { shift->extract_usage };
10              
11 1   50 1   8 use constant TRACE => $ENV{KEVIN_WORKER_TRACE} || 0;
  1         2  
  1         1320  
12              
13             sub run {
14 0     0 1   my ($self, @args) = @_;
15              
16 0           my $app = $self->app;
17 0           my $worker = $self->{worker} = $app->minion->worker;
18 0           my $status = $worker->status;
19 0   0       $status->{performed} //= 0;
20              
21             getopt \@args,
22             'C|command-interval=i' => \($status->{command_interval} //= 10),
23             'f|fast-start' => \my $fast,
24             'I|heartbeat-interval=i' => \($status->{heartbeat_interval} //= 300),
25             'j|jobs=i' => \($status->{jobs} //= 4),
26             'q|queue=s' => ($status->{queues} //= []),
27 0   0       'R|repair-interval=i' => \($status->{repair_interval} //= 21600);
      0        
      0        
      0        
      0        
28 0 0         @{$status->{queues}} = ('default') unless @{$status->{queues}};
  0            
  0            
29              
30 0           my $now = steady_time;
31 0 0         $self->{next_heartbeat} = $now if $status->{heartbeat_interval};
32 0 0         $self->{next_command} = $now if $status->{command_interval};
33 0 0         if ($status->{repair_interval}) {
34              
35             # Randomize to avoid congestion
36 0           $status->{repair_interval} -= int rand $status->{repair_interval} / 2;
37              
38 0           $self->{next_repair} = $now;
39 0 0         $self->{next_repair} += $status->{repair_interval} if $fast;
40             }
41              
42 0           $self->{pid} = $$;
43 0     0     local $SIG{CHLD} = sub { };
44 0     0     local $SIG{INT} = local $SIG{TERM} = sub { $self->_term(1) };
  0            
45 0     0     local $SIG{QUIT} = sub { $self->_term };
  0            
46              
47             # Remote control commands need to validate arguments carefully
48             $worker->add_command(
49 0 0 0 0     jobs => sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ });
  0            
50             $worker->add_command(
51 0 0 0 0     stop => sub { $self->{jobs}{$_[1]}->stop if $self->{jobs}{$_[1] // ''} });
  0            
52              
53             # Log fatal errors
54 0           my $log = $app->log;
55 0           $log->info("Worker $$ started");
56 0 0         eval { $self->_work until $self->{finished}; 1 }
  0            
  0            
57             or $log->fatal("Worker error: $@");
58 0           $worker->unregister;
59 0           $log->info("Worker $$ stopped");
60             }
61              
62             sub _term {
63 0     0     my ($self, $graceful) = @_;
64 0 0         return unless $self->{pid} == $$;
65 0           $self->{stopping}++;
66 0 0         $self->{graceful} = $graceful or kill 'KILL', keys %{$self->{jobs}};
  0            
67             }
68              
69             sub _work {
70 0     0     my $self = shift;
71              
72 0           my $app = $self->app;
73 0           my $log = $app->log;
74 0           my $worker = $self->{worker};
75 0           my $status = $worker->status;
76              
77 0 0 0       if ($self->{stopping} && !$self->{quit}++) {
78             $log->info("Stopping worker $$ "
79 0 0         . ($self->{graceful} ? 'gracefully' : 'immediately'));
80              
81             # Skip hearbeats, remote command and repairs
82 0           delete @{$status}{qw(heartbeat_interval command_interval )}
83 0 0         unless $self->{graceful};
84 0           delete $status->{repair_interval};
85             }
86              
87             # Send heartbeats in regular intervals
88 0 0 0       if ($status->{heartbeat_interval} && $self->{next_heartbeat} < steady_time) {
89 0           $log->debug('Sending heartbeat') if TRACE;
90 0           $worker->register;
91 0           $self->{next_heartbeat} = steady_time + $status->{heartbeat_interval};
92             }
93              
94             # Process worker remote control commands in regular intervals
95 0 0 0       if ($status->{command_interval} && $self->{next_command} < steady_time) {
96 0           $log->debug('Checking remote control') if TRACE;
97 0           $worker->process_commands;
98 0           $self->{next_command} = steady_time + $status->{command_interval};
99             }
100              
101             # Repair in regular intervals
102 0 0 0       if ($status->{repair_interval} && $self->{next_repair} < steady_time) {
103 0           $log->debug('Checking worker registry and job queue');
104 0           $app->minion->repair;
105 0           $self->{next_repair} = steady_time + $status->{repair_interval};
106             }
107              
108             # Check if jobs are finished
109 0   0       my $jobs = $self->{jobs} ||= {};
110             $jobs->{$_}->is_finished and ++$status->{performed} and delete $jobs->{$_}
111 0   0       for keys %$jobs;
      0        
112              
113             # Return if worker is finished
114 0 0 0       ++$self->{finished} and return if $self->{stopping} && !keys %{$self->{jobs}};
  0   0        
115              
116             # Wait if job limit has been reached or worker is stopping
117 0 0 0       if (($status->{jobs} <= keys %$jobs) || $self->{stopping}) { sleep 1 }
  0 0          
118              
119             # Try to get more jobs
120             elsif (my $job = $worker->dequeue(5 => {queues => $status->{queues}})) {
121 0           $jobs->{my $id = $job->id} = $job->start;
122 0           my ($pid, $task) = ($job->pid, $job->task);
123 0           $log->debug(qq{Process $pid is performing job "$id" with task "$task"});
124             }
125             }
126              
127             1;
128              
129             #pod =encoding utf8
130             #pod
131             #pod =head1 SYNOPSIS
132             #pod
133             #pod Usage: APPLICATION kevin worker [OPTIONS]
134             #pod
135             #pod ./myapp.pl kevin worker
136             #pod ./myapp.pl kevin worker -f
137             #pod ./myapp.pl kevin worker -m production -I 15 -C 5 -R 3600 -j 10
138             #pod ./myapp.pl kevin worker -q important -q default
139             #pod
140             #pod Options:
141             #pod -C, --command-interval Worker remote control command interval,
142             #pod defaults to 10
143             #pod -f, --fast-start Start processing jobs as fast as
144             #pod possible and skip repairing on startup
145             #pod -h, --help Show this summary of available options
146             #pod --home Path to home directory of your
147             #pod application, defaults to the value of
148             #pod MOJO_HOME or auto-detection
149             #pod -I, --heartbeat-interval Heartbeat interval, defaults to 300
150             #pod -j, --jobs Maximum number of jobs to perform
151             #pod parallel in forked worker processes,
152             #pod defaults to 4
153             #pod -m, --mode Operating mode for your application,
154             #pod defaults to the value of
155             #pod MOJO_MODE/PLACK_ENV or "development"
156             #pod -q, --queue One or more queues to get jobs from,
157             #pod defaults to "default"
158             #pod -R, --repair-interval Repair interval, up to half of this
159             #pod value can be subtracted randomly to
160             #pod make sure not all workers repair at the
161             #pod same time, defaults to 21600 (6 hours)
162             #pod
163             #pod =head1 DESCRIPTION
164             #pod
165             #pod L starts a L worker. You can have as
166             #pod many workers as you like.
167             #pod
168             #pod This is a clone of L. The differences are:
169             #pod
170             #pod =over 4
171             #pod
172             #pod =item *
173             #pod
174             #pod During immediate stops, the worker stops sending heartbeats,
175             #pod processing remote commands and doing repairs.
176             #pod
177             #pod =item *
178             #pod
179             #pod During graceful stops, the worker stops doing repairs.
180             #pod
181             #pod =item *
182             #pod
183             #pod During a stop, when all jobs have finished, the worker
184             #pod will quit promptly (without sleeping).
185             #pod
186             #pod =item *
187             #pod
188             #pod Allow to disable repairs with C<-R 0>.
189             #pod
190             #pod =back
191             #pod
192             #pod =head1 SIGNALS
193             #pod
194             #pod The L process can be controlled at runtime
195             #pod with the following signals.
196             #pod
197             #pod =head2 INT, TERM
198             #pod
199             #pod Stop gracefully after finishing the current jobs.
200             #pod
201             #pod =head2 QUIT
202             #pod
203             #pod Stop immediately without finishing the current jobs.
204             #pod
205             #pod =head1 REMOTE CONTROL COMMANDS
206             #pod
207             #pod The L process can be controlled at runtime
208             #pod through L, from anywhere in the network, by
209             #pod broadcasting the following remote control commands.
210             #pod
211             #pod =head2 jobs
212             #pod
213             #pod $ ./myapp.pl minion job -b jobs -a '[10]'
214             #pod $ ./myapp.pl minion job -b jobs -a '[10]' 23
215             #pod
216             #pod Instruct one or more workers to change the number of jobs to perform
217             #pod concurrently. Setting this value to C<0> will effectively pause the worker. That
218             #pod means all current jobs will be finished, but no new ones accepted, until the
219             #pod number is increased again.
220             #pod
221             #pod =head2 stop
222             #pod
223             #pod $ ./myapp.pl minion job -b stop -a '[10025]'
224             #pod $ ./myapp.pl minion job -b stop -a '[10025]' 23
225             #pod
226             #pod Instruct one or more workers to stop a job that is currently being performed
227             #pod immediately. This command will be ignored by workers that do not have a job
228             #pod matching the id. That means it is safe to broadcast this command to all workers.
229             #pod
230             #pod =head1 ATTRIBUTES
231             #pod
232             #pod L inherits all attributes from
233             #pod L and implements the following new ones.
234             #pod
235             #pod =head2 description
236             #pod
237             #pod my $description = $worker->description;
238             #pod $worker = $worker->description('Foo');
239             #pod
240             #pod Short description of this command, used for the command list.
241             #pod
242             #pod =head2 usage
243             #pod
244             #pod my $usage = $worker->usage;
245             #pod $worker = $worker->usage('Foo');
246             #pod
247             #pod Usage information for this command, used for the help screen.
248             #pod
249             #pod =head1 METHODS
250             #pod
251             #pod L inherits all methods from
252             #pod L and implements the following new ones.
253             #pod
254             #pod =head2 run
255             #pod
256             #pod $worker->run(@ARGV);
257             #pod
258             #pod Run this command.
259             #pod
260             #pod =head1 DEBUGGING
261             #pod
262             #pod You can set the C environment variable to have some
263             #pod extra diagnostics information printed to C<< $app->log >>.
264             #pod
265             #pod KEVIN_WORKER_TRACE=1
266             #pod
267             #pod =head1 SEE ALSO
268             #pod
269             #pod L, L, L.
270             #pod
271             #pod =cut
272              
273             __END__