File Coverage

blib/lib/Resque/Worker.pm
Criterion Covered Total %
statement 33 223 14.8
branch 0 68 0.0
condition 0 13 0.0
subroutine 11 57 19.3
pod 33 33 100.0
total 77 394 19.5


line stmt bran cond sub pod time code
1             package Resque::Worker;
2             # ABSTRACT: Does the hard work of babysitting Resque::Job's
3             $Resque::Worker::VERSION = '0.40';
4 9     9   69 use Moose;
  9         19  
  9         85  
5             with 'Resque::Encoder';
6              
7 9     9   68180 use FindBin; # so it will work after playing around $0
  9         10795  
  9         421  
8 9     9   4143 use Resque::Stat;
  9         3315  
  9         358  
9 9     9   75 use POSIX ":sys_wait_h";
  9         18  
  9         80  
10 9     9   21305 use Sys::Hostname;
  9         10421  
  9         534  
11 9     9   69 use Scalar::Util qw(blessed weaken);
  9         19  
  9         470  
12 9     9   5497 use List::MoreUtils qw(uniq any);
  9         119936  
  9         67  
13 9     9   16129 use Time::HiRes qw(sleep);
  9         13152  
  9         40  
14 9     9   9985 use DateTime;
  9         4283408  
  9         499  
15 9     9   95 use Try::Tiny;
  9         21  
  9         928  
16              
17             use overload
18 9         87 '""' => \&_string,
19             '==' => \&_is_equal,
20 9     9   63 'eq' => \&_is_equal;
  9         21  
21              
22             has 'resque' => (
23             is => 'ro',
24             required => 1,
25             handles => [qw/ redis key /]
26             );
27              
28             has queues => (
29             is => 'rw',
30             isa => 'ArrayRef',
31             lazy => 1,
32             default => sub {[]}
33             );
34              
35             has stat => (
36             is => 'ro',
37             lazy => 1,
38             default => sub { Resque::Stat->new( resque => $_[0]->resque ) }
39             );
40              
41             has id => ( is => 'rw', lazy => 1, default => sub { $_[0]->_stringify } );
42 0     0     sub _string { $_[0]->id } # can't point overload to a mo[o|u]se attribute :-(
43              
44             has verbose => ( is => 'rw', default => sub {0} );
45              
46             has cant_fork => ( is => 'rw', default => sub {0} );
47              
48             has cant_poll => ( is => 'rw', default => sub {0} );
49              
50             has child => ( is => 'rw' );
51              
52             has shutdown => ( is => 'rw', default => sub{0} );
53              
54             has paused => ( is => 'rw', default => sub{0} );
55              
56             has interval => ( is => 'rw', lazy => 1, default => sub{5} );
57              
58             has timeout => ( is => 'rw', default => sub{30} );
59              
60             has autoconfig => ( is => 'rw', predicate => 'has_autoconfig' );
61              
62 0     0 1   sub pause { $_[0]->paused(1) }
63              
64 0     0 1   sub unpause { $_[0]->paused(0) }
65              
66             sub shutdown_please {
67 0     0 1   print "Shutting down...\n";
68 0           $_[0]->shutdown(1);
69             }
70              
71 0 0   0 1   sub shutdown_now { $_[0]->shutdown_please && $_[0]->kill_child }
72              
73             sub work {
74 0     0 1   my $self = shift;
75 0           my $waiting; # Keep track for logging purposes only!
76              
77 0           $self->startup;
78 0           while ( ! $self->shutdown ) {
79 0 0         $self->autoconfig->($self) if $self->has_autoconfig;
80 0 0 0       if ( !$self->paused && ( my $job = $self->reserve ) ) {
    0 0        
81 0           $waiting=0;
82 0           $self->log("Got job $job");
83 0           $self->work_tick($job);
84             }
85             elsif( !$self->cant_poll && $self->interval ) {
86 0 0         unless ( $waiting ) {
87 0 0         my $status = $self->paused ? "Paused" : 'Waiting for ' . join( ', ', @{$self->queues} );
  0            
88 0           $self->procline( $status );
89 0           $self->log( $status );
90 0           $waiting=1;
91             }
92 0           sleep( $self->interval );
93             }
94             }
95 0           $self->unregister_worker;
96             }
97              
98             sub work_tick {
99 0     0 1   my ($self, $job) = @_;
100              
101 0           $self->working_on($job);
102 0           my $timestamp = DateTime->now->strftime("%Y/%m/%d %H:%M:%S %Z");
103              
104 0 0 0       if ( !$self->cant_fork && ( my $pid = fork ) ) {
105 0           $self->procline( "Forked $pid at $timestamp" );
106 0           $self->child($pid);
107 0           $self->log( "Waiting for $pid" );
108             #while ( ! waitpid( $pid, WNOHANG ) ) { } # non-blocking has sense?
109 0           waitpid( $pid, 0 );
110 0           $self->log( "Forked job($pid) exited with status $?" );
111              
112 0 0         if ($?) {
113 0           $job->fail("Exited with status $?");
114 0           $self->failed(1);
115             }
116             }
117             else {
118 0           undef $SIG{TERM};
119 0           undef $SIG{INT};
120              
121             # Allow graceful shutdown in "cant fork mode"
122 0 0         undef $SIG{QUIT} unless $self->cant_fork;
123              
124 0           $self->procline( sprintf( "Processing %s since %s", $job->queue, $timestamp ) );
125 0           $self->perform($job);
126 0 0         exit(0) unless $self->cant_fork;
127             }
128              
129 0           $self->done_working;
130 0           $self->child(0);
131             }
132              
133              
134             sub perform {
135 0     0 1   my ( $self, $job ) = @_;
136 0           my $ret;
137             try {
138 0     0     $ret = $job->perform;
139 0           $self->log( sprintf( "done: %s", $job->stringify ) );
140             }
141             catch {
142 0     0     $self->log( sprintf( "%s failed: %s", $job->stringify, $_ ) );
143 0           $job->fail($_);
144 0           $self->failed(1);
145 0           };
146 0           $ret;
147             }
148              
149             sub kill_child {
150 0     0 1   my $self = shift;
151 0 0         return unless $self->child;
152              
153 0 0         if ( kill 0, $self->child ) {
154 0           $self->log( "Killing my child: " . $self->child );
155 0           kill 9, $self->child;
156             }
157             else {
158 0           $self->log( "Child " . $self->child . " not found, shutting down." );
159 0           $self->shutdown_please;
160             }
161             }
162              
163             sub add_queue {
164 0     0 1   my $self = shift;
165 0 0         return unless @_;
166 0           $self->queues( [ uniq( @{$self->queues}, @_ ) ] );
  0            
167             }
168              
169             sub del_queue {
170 0     0 1   my ( $self, $queue ) = @_;
171 0 0         return unless $queue;
172              
173             return
174 0           @{$self->queues}
175             -
176 0 0         @{$self->queues( [ grep {$_} map { $_ eq $queue ? undef : $_ } @{$self->queues} ] )};
  0            
  0            
  0            
  0            
177             }
178              
179             sub reserve {
180 0     0 1   my $self = shift;
181              
182 0 0         if ( $self->cant_poll ) {
183 0           return $self->resque->blpop($self->queues, $self->timeout);
184             }
185             else {
186 0           for my $queue ( @{$self->queues} ) {
  0            
187 0 0         if ( my $job = $self->resque->pop($queue) ) {
188 0           return $job;
189             }
190             }
191             }
192             }
193              
194             sub working_on {
195 0     0 1   my ( $self, $job ) = @_;
196 0           $self->redis->set(
197             $self->key( worker => $self->id ),
198             $self->encoder->encode({
199             queue => $job->queue,
200             run_at => DateTime->now->strftime("%Y/%m/%d %H:%M:%S %Z"),
201             payload => $job->payload
202             })
203             );
204 0           $job->worker($self);
205             }
206              
207             sub done_working {
208 0     0 1   my $self = shift;
209 0           $self->processed(1);
210 0           $self->redis->del( $self->key( worker => $self->id ) );
211             }
212              
213             sub started {
214 0     0 1   my $self = shift;
215 0           _parsedate( $self->redis->get( $self->key( worker => $self->id => 'started' ) ) );
216             }
217              
218             sub _parsedate {
219 0     0     my $str = pop;
220 0           my ( $year, $month, $day, $hour, $minute, $secs, $tz ) = $str =~ m|^(\d+)[-/](\d+)[-/](\d+) (\d+):(\d+):(\d+) (.+)$|;
221 0           DateTime->new( day => $day, month => $month, year => $year, hour => $hour, minute => $minute, second => $secs, time_zone => $tz );
222             }
223              
224             sub set_started {
225 0     0 1   my $self = shift;
226 0           $self->redis->set( $self->key( worker => $self->id => 'started' ), DateTime->now->strftime('%Y-%m-%d %H:%M:%S %Z') );
227             }
228              
229             sub processing {
230 0     0 1   my $self = shift;
231 0 0         eval { $self->encoder->decode( $self->redis->get( $self->key( worker => $self->id ) ) ) } || {};
  0            
232             }
233              
234             sub processing_started {
235 0     0 1   my $self = shift;
236 0   0       my $run_at = $self->processing->{run_at} || return;
237 0           _parsedate($run_at);
238             }
239              
240             sub state {
241 0     0 1   my $self = shift;
242 0 0         $self->redis->exists( $self->key( worker => $self->id ) ) ? 'working' : 'idle';
243             }
244              
245             sub is_working {
246 0     0 1   my $self = shift;
247 0           $self->state eq 'working';
248             }
249              
250             sub is_idle {
251 0     0 1   my $self = shift;
252 0           $self->state eq 'idle';
253             }
254              
255             sub _stringify {
256 0     0     my $self = shift;
257 0           join ':', hostname, $$, join( ',', @{$self->queues} );
  0            
258             }
259              
260             # Is this worker the same as another worker?
261             sub _is_equal {
262 0     0     my ($self, $other) = @_;
263 0           $self->id eq $other->id;
264             }
265              
266             sub procline {
267 0     0 1   my $self = shift;
268 0 0         if ( my $str = shift ) {
269 0   0       $0 = sprintf( "resque-%s: %s", $Resque::VERSION || 'devel', $str );
270             }
271 0           $0;
272             }
273              
274             sub startup {
275 0     0 1   my $self = shift;
276 0           $0 = 'resque: Starting';
277              
278 0           $self->register_signal_handlers;
279 0           $self->prune_dead_workers;
280             #run_hook: before_first_fork
281 0           $self->register_worker;
282             }
283              
284             sub register_signal_handlers {
285 0     0 1   my $self = shift;
286 0           weaken $self;
287 0     0     $SIG{TERM} = sub { $self->shutdown_now };
  0            
288 0     0     $SIG{INT} = sub { $self->shutdown_now };
  0            
289 0     0     $SIG{QUIT} = sub { $self->shutdown_please };
  0            
290 0     0     $SIG{USR1} = sub { $self->kill_child };
  0            
291 0     0     $SIG{USR2} = sub { $self->pause };
  0            
292 0     0     $SIG{CONT} = sub { $self->unpause };
  0            
293             }
294              
295             sub prune_dead_workers {
296 0     0 1   my $self = shift;
297 0           my @all_workers = $self->all;
298 0 0         my @known_workers = $self->worker_pids if @all_workers;
299 0           for my $worker (@all_workers) {
300 0           my ($host, $pid, $queues) = split( ':', $worker->id );
301 0 0         next unless $host eq hostname;
302 0 0   0     next if any { $_ eq $pid } @known_workers;
  0            
303 0           $self->log( "Pruning dead worker: $worker" );
304 0           $worker->unregister_worker;
305             }
306             }
307              
308             sub register_worker {
309 0     0 1   my $self = shift;
310 0           $self->redis->sadd( $self->key( 'workers'), $self->id );
311 0           $self->set_started;
312             }
313              
314             sub unregister_worker {
315 0     0 1   my $self = shift;
316              
317             # If we're still processing a job, make sure it gets logged as a
318             # failure.
319             {
320 0           my $hr = $self->processing;
  0            
321 0 0         if ( %$hr ) {
322             # Ensure the proper worker is attached to this job, even if
323             # it's not the precise instance that died.
324             my $job = $self->resque->new_job({
325             worker => $self,
326             queue => $hr->{queue},
327             payload => $hr->{payload}
328 0           });
329 0           $job->fail( 'Dirty exit' );
330             }
331             }
332              
333 0           $self->redis->srem( $self->key('workers'), $self->id );
334 0           $self->redis->del( $self->key( worker => $self->id ) );
335 0           $self->redis->del( $self->key( worker => $self->id => 'started' ) );
336              
337 0           $self->stat->clear("processed:$self");
338 0           $self->stat->clear("failed:$self");
339             }
340              
341             sub worker_pids {
342 0     0 1   my $self = shift;
343 0           my @pids;
344              
345 0 0         if($^O=~m/^(cygwin|MSWin32)$/i) {
346             # $0 assignment does not work under Win32, so we'll return a list of perl PIDs instead
347 0 0         @pids = map { s/^PID:\s*// && $_ }
348 0           grep { /^PID/ }
  0            
349             split( /[\r\n]/ , `tasklist /FI "IMAGENAME eq perl.exe" /FO list` );
350             } else {
351 0 0         my $ps_command = $^O eq 'solaris'
352             ? 'ps -A -o pid,args'
353             : 'ps -A -o pid,command';
354              
355 0           for ( split "\n", `$ps_command | grep resque | grep -v resque-web | grep -v grep` ) {
356 0 0         if ( m/^\s*(\d+)\s(.+)$/ ) {
357 0           push @pids, $1;
358             }
359             }
360             }
361 0 0         return wantarray ? @pids : \@pids;
362             }
363              
364             #TODO: add logger() attr to containg a logger object and if set, use that instead of print!
365             sub log {
366 0     0 1   my $self = shift;
367 0 0         return unless $self->verbose;
368 0           print STDERR shift, "\n";
369             }
370              
371             sub processed {
372 0     0 1   my $self = shift;
373 0 0         if (shift) {
374 0           $self->stat->incr('processed');
375 0           $self->stat->incr("processed:$self");
376             }
377 0           $self->stat->get("processed:$self");
378             }
379              
380             sub failed {
381 0     0 1   my $self = shift;
382 0 0         if (shift) {
383 0           $self->stat->incr('failed');
384 0           $self->stat->incr("failed:$self");
385             }
386 0           $self->stat->get("failed:$self");
387             }
388              
389             sub find {
390 0     0 1   my ( $self, $worker_id ) = @_;
391 0 0         if ( $self->exists( $worker_id ) ) {
392 0           my @queues = split ',', (split( ':', $worker_id))[-1];
393 0           return __PACKAGE__->new(
394             resque => $self->resque,
395             queues => \@queues,
396             id => $worker_id
397             );
398             }
399             }
400              
401             sub all {
402 0     0 1   my $self = shift;
403 0           my @w = grep {$_} map { $self->find($_) } $self->redis->smembers( $self->key('workers') );
  0            
  0            
404 0 0         return wantarray ? @w : \@w;
405             }
406              
407             sub exists {
408 0     0 1   my ($self, $worker_id) = @_;
409 0           $self->redis->sismember( $self->key( 'workers' ), $worker_id );
410             }
411              
412             __PACKAGE__->meta->make_immutable();
413              
414             __END__
415              
416             =pod
417              
418             =encoding UTF-8
419              
420             =head1 NAME
421              
422             Resque::Worker - Does the hard work of babysitting Resque::Job's
423              
424             =head1 VERSION
425              
426             version 0.40
427              
428             =head1 ATTRIBUTES
429              
430             =head2 resque
431              
432             The L<Resque> object running this worker.
433              
434             =head2 queues
435              
436             Queues this worker should fetch jobs from.
437              
438             =head2 stat
439              
440             See L<Resque::Stat>.
441              
442             =head2 id
443              
444             Unique identifier for the running worker.
445             Used to set process status all around.
446              
447             The worker stringify to this attribute.
448              
449             =head2 verbose
450              
451             Set to a true value to make this worker report what's doing while
452             on work().
453              
454             =head2 cant_fork
455              
456             Set it to a true value to stop this worker from fork jobs.
457              
458             By default, the worker will fork the job out and control the
459             children process. This make the worker more resilient to
460             memory leaks.
461              
462             =head2 cant_poll
463              
464             Set it to a true value to stop this worker from polling for jobs and
465             use experimental blocking pop instead.
466              
467             See timeout().
468              
469             =head2 child
470              
471             PID of current running child.
472              
473             =head2 shutdown
474              
475             When true, this worker will shutdown after finishing current job.
476              
477             =head2 paused
478              
479             When true, this worker won't proccess more jobs till false.
480              
481             =head2 interval
482              
483             Float representing the polling frequency. The default is 5 seconds, but for a semi-active app you may want to use a smaller value.
484              
485             =head2 timeout
486              
487             Integer representing the blocking timeout. The default is not to block but to poll queues (see inverval),
488             so this attribute will be completely ignored unless dont_poll().
489             The default is 30 seconds. Setting it to 0 will make reserve() to block until some job is assigned to this
490             workers and will prevent autoconfig() to be called until it happen.
491              
492             =head2 autoconfig
493              
494             An optional callback to be called periodically while work()'ing. It's main purpose is to
495             allow running auto-config code as this function will receive this worker as it's only argument
496             and will be called before reserving the first job.
497              
498             When this callback is provided, it will be called on every wheel iteration, so it's recommended
499             to keep track of time to prevent running slow re-configuration code every time.
500              
501             =head1 METHODS
502              
503             =head2 pause
504              
505             Stop processing jobs after the current one has completed (if we're
506             currently running one).
507              
508             $worker->pause();
509              
510             =head2 unpause
511              
512             Start processing jobs again after a pause
513              
514             $worker->unpause();
515              
516             =head2 shutdown_please
517              
518             Schedule this worker for shutdown. Will finish processing the
519             current job.
520              
521             $worker->shutdown_please();
522              
523             =head2 shutdown_now
524              
525             Kill the child and shutdown immediately.
526              
527             $worker->shutdown_now();
528              
529             =head2 work
530              
531             Calling this method will make this worker start pulling & running jobs
532             from queues().
533              
534             This is the main wheel and will run while shutdown() is false.
535              
536             $worker->work();
537              
538             =head2 work_tick
539              
540             Perform() one job and wait till it finish.
541              
542             $worker->work_tick();
543              
544             =head2 perform
545              
546             Call perform() on the given Resque::Job capturing and reporting
547             any exception.
548              
549             $worker->perform( $job );
550              
551             =head2 kill_child
552              
553             Kills the forked child immediately, without remorse. The job it
554             is processing will not be completed.
555              
556             $worker->kill_child();
557              
558             =head2 add_queue
559              
560             Add a queue this worker should listen to.
561              
562             $worker->add_queue( "queuename" );
563              
564             =head2 del_queue
565              
566             Stop listening to the given queue.
567              
568             $worker->del_queue( "queuename" );
569              
570             =head2 reserve
571              
572             Pull the next job to be precessed.
573              
574             my $job = $worker->reserve();
575              
576             =head2 working_on
577              
578             Set worker and working status on the given L<Resque::Job>.
579              
580             $job->working_on( $resque_job );
581              
582             =head2 done_working
583              
584             Inform the backend this worker has done its current job
585              
586             $job->done_working();
587              
588             =head2 started
589              
590             What time did this worker start?
591             Returns an instance of DateTime.
592              
593             my $datetime = $worker->started();
594              
595             =head2 set_started
596              
597             Tell Redis we've started
598              
599             $worker->set_started();
600              
601             =head2 processing
602              
603             Returns a hash explaining the Job we're currently processing, if any.
604              
605             $worker->processing();
606              
607             =head2 processing_started
608              
609             What time did this worker started to work on current job?
610             Returns an instance of DateTime or undef when it's not working.
611              
612             my $datetime = $worker->processing_started();
613              
614             =head2 state
615              
616             Returns a string representing the current worker state,
617             which can be either working or idle
618              
619             my $state = $worker->state();
620              
621             =head2 is_working
622              
623             Boolean - true if working, false if not
624              
625             my $working = $worker->is_working();
626              
627             =head2 is_idle
628              
629             Boolean - true if idle, false if not
630              
631             my $idle = $worker->is_idle();
632              
633             =head2 procline
634              
635             Given a string, sets the procline ($0) and logs.
636             Procline is always in the format of:
637             resque-VERSION: STRING
638              
639             $worker->procline( "string" );
640              
641             =head2 startup
642              
643             Helper method called by work() to:
644              
645             1. register_signal_handlers()
646             2. prune_dead_workers();
647             3. register_worker();
648              
649             $worker->startup();
650              
651             =head2 register_signal_handlers
652              
653             Registers the various signal handlers a worker responds to.
654              
655             TERM: Shutdown immediately, stop processing jobs.
656             INT: Shutdown immediately, stop processing jobs.
657             QUIT: Shutdown after the current job has finished processing.
658             USR1: Kill the forked child immediately, continue processing jobs.
659             USR2: Don't process any new jobs
660             CONT: Start processing jobs again after a USR2
661              
662             $worker->register_signal_handlers();
663              
664             =head2 prune_dead_workers
665              
666             Looks for any workers which should be running on this server
667             and, if they're not, removes them from Redis.
668              
669             This is a form of garbage collection. If a server is killed by a
670             hard shutdown, power failure, or something else beyond our
671             control, the Resque workers will not die gracefully and therefore
672             will leave stale state information in Redis.
673              
674             By checking the current Redis state against the actual
675             environment, we can determine if Redis is old and clean it up a bit.
676              
677             $worker->prune_dead_worker();
678              
679             =head2 register_worker
680              
681             Registers ourself as a worker. Useful when entering the worker
682             lifecycle on startup.
683              
684             $worker->register_worker();
685              
686             =head2 unregister_worker
687              
688             Unregisters ourself as a worker. Useful when shutting down.
689              
690             $worker->unregister_worker();
691              
692             =head2 worker_pids
693              
694             Returns an Array of string pids of all the other workers on this
695             machine. Useful when pruning dead workers on startup.
696              
697             my @pids = $worker->worker_pids();
698              
699             =head2 log
700              
701             If verbose() is true, this will print to STDERR.
702              
703             $worker->log( 'message here' );
704              
705             =head2 processed
706              
707             Retrieve from L<Resque::Stat> many jobs has done this worker.
708             Pass a true argument to increment by one before retrieval.
709              
710             my $jobs_run = $worker->processed( $boolean );
711              
712             =head2 failed
713              
714             How many failed jobs has this worker seen.
715             Pass a true argument to increment by one before retrieval.
716              
717             my $jobs_run = $worker->processed( $boolean );
718              
719             =head2 find
720              
721             Returns a single worker object. Accepts a string id.
722              
723             my $worker_object = $worker->find( $worker_id );
724              
725             =head2 all
726              
727             Returns a list of all worker registered on the backend, or an
728             arrayref in scalar context;
729              
730             my @workers = $worker->all();
731              
732             =head2 exists
733              
734             Returns true if the given worker id exists on redis() backend.
735              
736             my $exists = $worker->exists( $worker_id );
737              
738             =head1 AUTHOR
739              
740             Diego Kuperman <diego@freekeylabs.com>
741              
742             =head1 COPYRIGHT AND LICENSE
743              
744             This software is copyright (c) 2021 by Diego Kuperman.
745              
746             This is free software; you can redistribute it and/or modify it under
747             the same terms as the Perl 5 programming language system itself.
748              
749             =cut