File Coverage

blib/lib/Resque/Worker.pm
Criterion Covered Total %
statement 33 256 12.8
branch 0 76 0.0
condition 0 24 0.0
subroutine 11 63 17.4
pod 35 35 100.0
total 79 454 17.4


line stmt bran cond sub pod time code
1             package Resque::Worker;
2             # ABSTRACT: Does the hard work of babysitting Resque::Job's
3             $Resque::Worker::VERSION = '0.42';
4 9     9   72 use Moose;
  9         18  
  9         86  
5             with 'Resque::Encoder';
6              
7 9     9   67158 use FindBin; # so it will work after playing around $0
  9         10575  
  9         415  
8 9     9   4359 use Resque::Stat;
  9         3245  
  9         375  
9 9     9   78 use POSIX ":sys_wait_h";
  9         17  
  9         78  
10 9     9   21154 use Sys::Hostname;
  9         10172  
  9         517  
11 9     9   73 use Scalar::Util qw(blessed weaken);
  9         17  
  9         468  
12 9     9   5621 use List::MoreUtils qw(uniq any);
  9         120657  
  9         57  
13 9     9   15923 use Time::HiRes qw(sleep);
  9         13023  
  9         44  
14 9     9   10152 use DateTime;
  9         4200059  
  9         472  
15 9     9   93 use Try::Tiny;
  9         29  
  9         881  
16              
17             use overload
18 9         79 '""' => \&_string,
19             '==' => \&_is_equal,
20 9     9   64 'eq' => \&_is_equal;
  9         17  
21              
22             has 'resque' => (
23             is => 'ro',
24             required => 1,
25             handles => [qw/ redis key /]
26             );
27              
28             has queues => (
29             is => 'rw',
30             isa => 'ArrayRef',
31             lazy => 1,
32             default => sub {[]}
33             );
34              
35             has stat => (
36             is => 'ro',
37             lazy => 1,
38             default => sub { Resque::Stat->new( resque => $_[0]->resque ) }
39             );
40              
41             has id => (
42             is => 'rw',
43             lazy => 1,
44             clearer => '_clear_id',
45             default => sub { $_[0]->_stringify }
46             );
47 0     0     sub _string { $_[0]->id } # can't point overload to a mo[o|u]se attribute :-(
48              
49             has verbose => ( is => 'rw', default => sub {0} );
50              
51             has cant_fork => ( is => 'rw', default => sub {0} );
52              
53             has cant_poll => ( is => 'rw', default => sub {0} );
54              
55             has child => ( is => 'rw' );
56              
57             has shutdown => ( is => 'rw', default => sub{0} );
58              
59             has paused => ( is => 'rw', default => sub{0} );
60              
61             has interval => ( is => 'rw', lazy => 1, default => sub{5} );
62              
63             has timeout => ( is => 'rw', default => sub{30} );
64              
65             has autoconfig => ( is => 'rw', predicate => 'has_autoconfig' );
66              
67 0     0 1   sub pause { $_[0]->paused(1) }
68              
69 0     0 1   sub unpause { $_[0]->paused(0) }
70              
71             sub shutdown_please {
72 0     0 1   print "Shutting down...\n";
73 0           $_[0]->shutdown(1);
74             }
75              
76 0 0   0 1   sub shutdown_now { $_[0]->shutdown_please && $_[0]->kill_child }
77              
78             sub work {
79 0     0 1   my $self = shift;
80 0           my $waiting; # Keep track for logging purposes only!
81              
82 0           $self->startup;
83 0           while ( ! $self->shutdown ) {
84 0 0         $self->autoconfig->($self) if $self->has_autoconfig;
85 0 0 0       if ( !$self->paused && ( my $job = $self->reserve ) ) {
    0 0        
86 0           $waiting=0;
87 0           $self->log("Got job $job");
88 0           $self->work_tick($job);
89             }
90             elsif( !$self->cant_poll && $self->interval ) {
91 0 0         unless ( $waiting ) {
92 0 0         my $status = $self->paused ? "Paused" : 'Waiting for ' . join( ', ', @{$self->queues} );
  0            
93 0           $self->procline( $status );
94 0           $self->log( $status );
95 0           $waiting=1;
96             }
97 0           sleep( $self->interval );
98             }
99             }
100 0           $self->unregister_worker;
101             }
102              
103             sub work_tick {
104 0     0 1   my ($self, $job) = @_;
105              
106 0           $self->working_on($job);
107 0           my $timestamp = DateTime->now->strftime("%Y/%m/%d %H:%M:%S %Z");
108              
109 0 0 0       if ( !$self->cant_fork && ( my $pid = fork ) ) {
110 0           $self->procline( "Forked $pid at $timestamp" );
111 0           $self->child($pid);
112 0           $self->log( "Waiting for $pid" );
113             #while ( ! waitpid( $pid, WNOHANG ) ) { } # non-blocking has sense?
114 0           waitpid( $pid, 0 );
115 0           $self->log( "Forked job($pid) exited with status $?" );
116              
117 0 0         if ($?) {
118 0           $job->fail("Exited with status $?");
119 0           $self->failed(1);
120             }
121             }
122             else {
123 0           undef $SIG{TERM};
124 0           undef $SIG{INT};
125              
126             # Allow graceful shutdown in "cant fork mode"
127 0 0         undef $SIG{QUIT} unless $self->cant_fork;
128              
129 0           $self->procline( sprintf( "Processing %s since %s", $job->queue, $timestamp ) );
130 0           $self->perform($job);
131 0 0         exit(0) unless $self->cant_fork;
132             }
133              
134 0           $self->done_working;
135 0           $self->child(0);
136             }
137              
138              
139             sub perform {
140 0     0 1   my ( $self, $job ) = @_;
141 0           my $ret;
142             try {
143 0     0     $ret = $job->perform;
144 0           $self->log( sprintf( "done: %s", $job->stringify ) );
145             }
146             catch {
147 0     0     $self->log( sprintf( "%s failed: %s", $job->stringify, $_ ) );
148 0           $job->fail($_);
149 0           $self->failed(1);
150 0           };
151 0           $ret;
152             }
153              
154             sub kill_child {
155 0     0 1   my $self = shift;
156 0 0         return unless $self->child;
157              
158 0 0         if ( kill 0, $self->child ) {
159 0           $self->log( "Killing my child: " . $self->child );
160 0           kill 9, $self->child;
161             }
162             else {
163 0           $self->log( "Child " . $self->child . " not found, shutting down." );
164 0           $self->shutdown_please;
165             }
166             }
167              
168             sub add_queue {
169 0     0 1   my $self = shift;
170 0 0         return unless @_;
171 0           $self->queues( [ uniq( @{$self->queues}, @_ ) ] );
  0            
172             }
173              
174             sub del_queue {
175 0     0 1   my ( $self, $queue ) = @_;
176 0 0         return unless $queue;
177              
178             return
179 0           @{$self->queues}
180             -
181 0 0         @{$self->queues( [ grep {$_} map { $_ eq $queue ? undef : $_ } @{$self->queues} ] )};
  0            
  0            
  0            
  0            
182             }
183              
184             sub reserve {
185 0     0 1   my $self = shift;
186              
187 0 0         if ( $self->cant_poll ) {
188 0           return $self->resque->blpop($self->queues, $self->timeout);
189             }
190             else {
191 0           for my $queue ( @{$self->queues} ) {
  0            
192 0 0         if ( my $job = $self->resque->pop($queue) ) {
193 0           return $job;
194             }
195             }
196             }
197             }
198              
199             sub working_on {
200 0     0 1   my ( $self, $job ) = @_;
201 0           $self->redis->set(
202             $self->key( worker => $self->id ),
203             $self->encoder->encode({
204             queue => $job->queue,
205             run_at => DateTime->now->strftime("%Y/%m/%d %H:%M:%S %Z"),
206             payload => $job->payload
207             })
208             );
209 0           $job->worker($self);
210             }
211              
212             sub done_working {
213 0     0 1   my $self = shift;
214 0           $self->processed(1);
215 0           $self->redis->del( $self->key( worker => $self->id ) );
216             }
217              
218             sub started {
219 0     0 1   my $self = shift;
220 0           _parsedate( $self->_started );
221             }
222              
223             sub _started {
224 0     0     my $self = shift;
225 0           $self->redis->get( $self->key( worker => $self->id => 'started' ) );
226             }
227              
228             sub _parsedate {
229 0     0     my $str = pop;
230 0           my ( $year, $month, $day, $hour, $minute, $secs, $tz ) = $str =~ m|^(\d+)[-/](\d+)[-/](\d+) (\d+):(\d+):(\d+) (.+)$|;
231 0           DateTime->new( day => $day, month => $month, year => $year, hour => $hour, minute => $minute, second => $secs, time_zone => $tz );
232             }
233              
234             sub set_started {
235 0     0 1   my $self = shift;
236 0   0       my $date = shift || DateTime->now->strftime('%Y-%m-%d %H:%M:%S %Z');
237 0           $self->redis->set( $self->key( worker => $self->id => 'started' ), $date );
238             }
239              
240             sub processing {
241 0     0 1   my $self = shift;
242 0 0         eval { $self->encoder->decode( $self->redis->get( $self->key( worker => $self->id ) ) ) } || {};
  0            
243             }
244              
245             sub processing_map {
246 0     0 1   my $self = shift;
247 0 0         return {} unless @_;
248              
249 0 0         my @ids = map { ref($_) ? $_->id : $_ } @_;
  0            
250 0 0 0       my @proc = map { ($_ && $self->encoder->decode($_)) || {} }
251 0           $self->redis->mget( map { $self->key( worker => $_ ) } @ids );
  0            
252              
253 0           my $count = 0;
254 0           return { map { $_ => $proc[$count++] } @ids };
  0            
255             }
256              
257             sub processing_started {
258 0     0 1   my $self = shift;
259 0   0       my $proc = shift || $self->processing;
260 0   0       my $run_at = $proc->{run_at} || return;
261 0           _parsedate($run_at);
262             }
263              
264             sub state {
265 0     0 1   my $self = shift;
266 0 0         $self->redis->exists( $self->key( worker => $self->id ) ) ? 'working' : 'idle';
267             }
268              
269             sub is_working {
270 0     0 1   my $self = shift;
271 0           $self->state eq 'working';
272             }
273              
274             sub is_idle {
275 0     0 1   my $self = shift;
276 0           $self->state eq 'idle';
277             }
278              
279             sub _stringify {
280 0     0     my $self = shift;
281 0           join ':', hostname, $$, join( ',', @{$self->queues} );
  0            
282             }
283              
284             # Is this worker the same as another worker?
285             sub _is_equal {
286 0     0     my ($self, $other) = @_;
287 0           $self->id eq $other->id;
288             }
289              
290             sub procline {
291 0     0 1   my $self = shift;
292 0 0         if ( my $str = shift ) {
293 0   0       $0 = sprintf( "resque-%s: %s", $Resque::VERSION || 'devel', $str );
294             }
295 0           $0;
296             }
297              
298             sub startup {
299 0     0 1   my $self = shift;
300 0           $0 = 'resque: Starting';
301              
302 0           $self->register_signal_handlers;
303 0           $self->prune_dead_workers;
304             #run_hook: before_first_fork
305 0           $self->register_worker;
306             }
307              
308             sub register_signal_handlers {
309 0     0 1   my $self = shift;
310 0           weaken $self;
311 0     0     $SIG{TERM} = sub { $self->shutdown_now };
  0            
312 0     0     $SIG{INT} = sub { $self->shutdown_now };
  0            
313 0     0     $SIG{QUIT} = sub { $self->shutdown_please };
  0            
314 0     0     $SIG{USR1} = sub { $self->kill_child };
  0            
315 0     0     $SIG{USR2} = sub { $self->pause };
  0            
316 0     0     $SIG{CONT} = sub { $self->unpause };
  0            
317             }
318              
319             sub prune_dead_workers {
320 0     0 1   my $self = shift;
321 0           my @all_workers = $self->all;
322 0 0         my @known_workers = $self->worker_pids if @all_workers;
323 0           for my $worker (@all_workers) {
324 0           my ($host, $pid, $queues) = split( ':', $worker->id );
325 0 0         next unless $host eq hostname;
326 0 0   0     next if any { $_ eq $pid } @known_workers;
  0            
327 0           $self->log( "Pruning dead worker: $worker" );
328 0           $worker->unregister_worker;
329             }
330             }
331              
332             sub register_worker {
333 0     0 1   my $self = shift;
334 0           $self->_register_id;
335 0           $self->set_started;
336             }
337              
338             sub _register_id {
339 0     0     my $self = shift;
340 0           $self->redis->sadd( $self->key( 'workers'), $self->id );
341             }
342              
343             sub refresh_id {
344 0     0 1   my $self = shift;
345 0           my $id = $self->_stringify;
346 0 0         return if $id eq $self->id; # unchanged?
347 0   0       my $start = $self->_started || die "Can't refresh_id before start";
348 0           my $proc = $self->stat->get("processed:$self");
349 0           my $fail = $self->stat->get("failed:$self");
350 0           $self->redis->multi;
351 0           $self->_unregister_id;
352 0           $self->id($id);
353 0           $self->_register_id;
354 0           $self->set_started($start);
355 0           $self->stat->set("processed:$self", $proc);
356 0           $self->stat->set("failed:$self", $fail);
357 0           $self->redis->exec;
358             }
359              
360             sub unregister_worker {
361 0     0 1   my $self = shift;
362              
363             # If we're still processing a job, make sure it gets logged as a
364             # failure.
365             {
366 0           my $hr = $self->processing;
  0            
367 0 0         if ( %$hr ) {
368             # Ensure the proper worker is attached to this job, even if
369             # it's not the precise instance that died.
370             my $job = $self->resque->new_job({
371             worker => $self,
372             queue => $hr->{queue},
373             payload => $hr->{payload}
374 0           });
375 0           $job->fail( 'Dirty exit' );
376             }
377             }
378              
379 0           $self->_unregister_id;
380              
381             }
382              
383             # clear worker registry keys
384             sub _unregister_id {
385 0     0     my $self = shift;
386 0           $self->redis->srem( $self->key('workers'), $self->id );
387 0           $self->redis->del( $self->key( worker => $self->id ) );
388 0           $self->redis->del( $self->key( worker => $self->id => 'started' ) );
389 0           $self->stat->clear("processed:$self");
390 0           $self->stat->clear("failed:$self");
391             }
392              
393             sub worker_pids {
394 0     0 1   my $self = shift;
395 0           my @pids;
396              
397 0 0         if($^O=~m/^(cygwin|MSWin32)$/i) {
398             # $0 assignment does not work under Win32, so we'll return a list of perl PIDs instead
399 0 0         @pids = map { s/^PID:\s*// && $_ }
400 0           grep { /^PID/ }
  0            
401             split( /[\r\n]/ , `tasklist /FI "IMAGENAME eq perl.exe" /FO list` );
402             } else {
403 0 0         my $ps_command = $^O eq 'solaris'
404             ? 'ps -A -o pid,args'
405             : 'ps -A -o pid,command';
406              
407 0           for ( split "\n", `$ps_command | grep resque | grep -v resque-web | grep -v grep` ) {
408 0 0         if ( m/^\s*(\d+)\s(.+)$/ ) {
409 0           push @pids, $1;
410             }
411             }
412             }
413 0 0         return wantarray ? @pids : \@pids;
414             }
415              
416             #TODO: add logger() attr to containg a logger object and if set, use that instead of print!
417             sub log {
418 0     0 1   my $self = shift;
419 0 0         return unless $self->verbose;
420 0           print STDERR shift, "\n";
421             }
422              
423             sub processed {
424 0     0 1   my $self = shift;
425 0 0         if (shift) {
426 0           $self->stat->incr('processed');
427 0           $self->stat->incr("processed:$self");
428             }
429 0           $self->stat->get("processed:$self");
430             }
431              
432             sub failed {
433 0     0 1   my $self = shift;
434 0 0         if (shift) {
435 0           $self->stat->incr('failed');
436 0           $self->stat->incr("failed:$self");
437             }
438 0           $self->stat->get("failed:$self");
439             }
440              
441             sub find {
442 0     0 1   my ( $self, $worker_id ) = @_;
443 0 0         if ( $self->exists( $worker_id ) ) {
444 0           return $self->_from_id( $worker_id );
445             }
446             }
447              
448             sub _from_id {
449 0     0     my ( $self, $worker_id ) = @_;
450 0           my @queues = split ',', (split( ':', $worker_id))[-1];
451 0           __PACKAGE__->new(
452             resque => $self->resque,
453             queues => \@queues,
454             id => $worker_id
455             );
456             }
457              
458             sub all {
459 0     0 1   my $self = shift;
460 0           my @w = map { $self->_from_id($_) } $self->redis->smembers( $self->key('workers') );
  0            
461 0 0         return wantarray ? @w : \@w;
462             }
463              
464             sub exists {
465 0     0 1   my ($self, $worker_id) = @_;
466 0           $self->redis->sismember( $self->key( 'workers' ), $worker_id );
467             }
468              
469             __PACKAGE__->meta->make_immutable();
470              
471             __END__
472              
473             =pod
474              
475             =encoding UTF-8
476              
477             =head1 NAME
478              
479             Resque::Worker - Does the hard work of babysitting Resque::Job's
480              
481             =head1 VERSION
482              
483             version 0.42
484              
485             =head1 ATTRIBUTES
486              
487             =head2 resque
488              
489             The L<Resque> object running this worker.
490              
491             =head2 queues
492              
493             Queues this worker should fetch jobs from.
494              
495             =head2 stat
496              
497             See L<Resque::Stat>.
498              
499             =head2 id
500              
501             Unique identifier for the running worker.
502             Used to set process status all around.
503              
504             The worker stringify to this attribute.
505              
506             =head2 verbose
507              
508             Set to a true value to make this worker report what's doing while
509             on work().
510              
511             =head2 cant_fork
512              
513             Set it to a true value to stop this worker from fork jobs.
514              
515             By default, the worker will fork the job out and control the
516             children process. This make the worker more resilient to
517             memory leaks.
518              
519             =head2 cant_poll
520              
521             Set it to a true value to stop this worker from polling for jobs and
522             use experimental blocking pop instead.
523              
524             See timeout().
525              
526             =head2 child
527              
528             PID of current running child.
529              
530             =head2 shutdown
531              
532             When true, this worker will shutdown after finishing current job.
533              
534             =head2 paused
535              
536             When true, this worker won't proccess more jobs till false.
537              
538             =head2 interval
539              
540             Float representing the polling frequency. The default is 5 seconds, but for a semi-active app you may want to use a smaller value.
541              
542             =head2 timeout
543              
544             Integer representing the blocking timeout. The default is not to block but to poll queues (see inverval),
545             so this attribute will be completely ignored unless dont_poll().
546             The default is 30 seconds. Setting it to 0 will make reserve() to block until some job is assigned to this
547             workers and will prevent autoconfig() to be called until it happen.
548              
549             =head2 autoconfig
550              
551             An optional callback to be called periodically while work()'ing. It's main purpose is to
552             allow running auto-config code as this function will receive this worker as it's only argument
553             and will be called before reserving the first job.
554              
555             When this callback is provided, it will be called on every wheel iteration, so it's recommended
556             to keep track of time to prevent running slow re-configuration code every time.
557              
558             =head1 METHODS
559              
560             =head2 pause
561              
562             Stop processing jobs after the current one has completed (if we're
563             currently running one).
564              
565             $worker->pause();
566              
567             =head2 unpause
568              
569             Start processing jobs again after a pause
570              
571             $worker->unpause();
572              
573             =head2 shutdown_please
574              
575             Schedule this worker for shutdown. Will finish processing the
576             current job.
577              
578             $worker->shutdown_please();
579              
580             =head2 shutdown_now
581              
582             Kill the child and shutdown immediately.
583              
584             $worker->shutdown_now();
585              
586             =head2 work
587              
588             Calling this method will make this worker start pulling & running jobs
589             from queues().
590              
591             This is the main wheel and will run while shutdown() is false.
592              
593             $worker->work();
594              
595             =head2 work_tick
596              
597             Perform() one job and wait till it finish.
598              
599             $worker->work_tick();
600              
601             =head2 perform
602              
603             Call perform() on the given Resque::Job capturing and reporting
604             any exception.
605              
606             $worker->perform( $job );
607              
608             =head2 kill_child
609              
610             Kills the forked child immediately, without remorse. The job it
611             is processing will not be completed.
612              
613             $worker->kill_child();
614              
615             =head2 add_queue
616              
617             Add a queue this worker should listen to.
618              
619             $worker->add_queue( "queuename" );
620              
621             =head2 del_queue
622              
623             Stop listening to the given queue.
624              
625             $worker->del_queue( "queuename" );
626              
627             =head2 reserve
628              
629             Pull the next job to be precessed.
630              
631             my $job = $worker->reserve();
632              
633             =head2 working_on
634              
635             Set worker and working status on the given L<Resque::Job>.
636              
637             $job->working_on( $resque_job );
638              
639             =head2 done_working
640              
641             Inform the backend this worker has done its current job
642              
643             $job->done_working();
644              
645             =head2 started
646              
647             What time did this worker start?
648             Returns an instance of DateTime.
649              
650             my $datetime = $worker->started();
651              
652             =head2 set_started
653              
654             Tell Redis we've started
655              
656             $worker->set_started();
657              
658             =head2 processing
659              
660             Returns a hash explaining the Job we're currently processing, if any.
661              
662             $worker->processing();
663              
664             =head2 processing_map
665              
666             Returns a hashref of processing info for a given worker or worker ID list
667              
668             $worker->processing( $worker1, $worker2, $worker3->id );
669              
670             =head2 processing_started
671              
672             What time did this worker started to work on current job?
673             Returns an instance of DateTime or undef when it's not working.
674              
675             my $datetime = $worker->processing_started();
676              
677             =head2 state
678              
679             Returns a string representing the current worker state,
680             which can be either working or idle
681              
682             my $state = $worker->state();
683              
684             =head2 is_working
685              
686             Boolean - true if working, false if not
687              
688             my $working = $worker->is_working();
689              
690             =head2 is_idle
691              
692             Boolean - true if idle, false if not
693              
694             my $idle = $worker->is_idle();
695              
696             =head2 procline
697              
698             Given a string, sets the procline ($0) and logs.
699             Procline is always in the format of:
700             resque-VERSION: STRING
701              
702             $worker->procline( "string" );
703              
704             =head2 startup
705              
706             Helper method called by work() to:
707              
708             1. register_signal_handlers()
709             2. prune_dead_workers();
710             3. register_worker();
711              
712             $worker->startup();
713              
714             =head2 register_signal_handlers
715              
716             Registers the various signal handlers a worker responds to.
717              
718             TERM: Shutdown immediately, stop processing jobs.
719             INT: Shutdown immediately, stop processing jobs.
720             QUIT: Shutdown after the current job has finished processing.
721             USR1: Kill the forked child immediately, continue processing jobs.
722             USR2: Don't process any new jobs
723             CONT: Start processing jobs again after a USR2
724              
725             $worker->register_signal_handlers();
726              
727             =head2 prune_dead_workers
728              
729             Looks for any workers which should be running on this server
730             and, if they're not, removes them from Redis.
731              
732             This is a form of garbage collection. If a server is killed by a
733             hard shutdown, power failure, or something else beyond our
734             control, the Resque workers will not die gracefully and therefore
735             will leave stale state information in Redis.
736              
737             By checking the current Redis state against the actual
738             environment, we can determine if Redis is old and clean it up a bit.
739              
740             $worker->prune_dead_worker();
741              
742             =head2 register_worker
743              
744             Registers ourself as a worker. Useful when entering the worker
745             lifecycle on startup.
746              
747             $worker->register_worker();
748              
749             =head2 refresh_id
750              
751             Do the dirty work after changing the queues on an already
752             register_worker().
753              
754             This will update backend on a single transactionto reflex
755             current queues by changing this worker ID which need to
756             unregister_worker() and register_worker() again while
757             keeping stat() and started().
758              
759             Only useful when you dinamically update queues and want to
760             watch it on the web interface.
761              
762             =head2 unregister_worker
763              
764             Unregisters ourself as a worker. Useful when shutting down.
765              
766             $worker->unregister_worker();
767              
768             =head2 worker_pids
769              
770             Returns an Array of string pids of all the other workers on this
771             machine. Useful when pruning dead workers on startup.
772              
773             my @pids = $worker->worker_pids();
774              
775             =head2 log
776              
777             If verbose() is true, this will print to STDERR.
778              
779             $worker->log( 'message here' );
780              
781             =head2 processed
782              
783             Retrieve from L<Resque::Stat> many jobs has done this worker.
784             Pass a true argument to increment by one before retrieval.
785              
786             my $jobs_run = $worker->processed( $boolean );
787              
788             =head2 failed
789              
790             How many failed jobs has this worker seen.
791             Pass a true argument to increment by one before retrieval.
792              
793             my $jobs_run = $worker->failed( $boolean );
794              
795             =head2 find
796              
797             Returns a single worker object. Accepts a string id.
798              
799             my $worker_object = $worker->find( $worker_id );
800              
801             =head2 all
802              
803             Returns a list of all worker registered on the backend, or an
804             arrayref in scalar context;
805              
806             my @workers = $worker->all();
807              
808             =head2 exists
809              
810             Returns true if the given worker id exists on redis() backend.
811              
812             my $exists = $worker->exists( $worker_id );
813              
814             =head1 AUTHOR
815              
816             Diego Kuperman <diego@freekeylabs.com>
817              
818             =head1 COPYRIGHT AND LICENSE
819              
820             This software is copyright (c) 2021 by Diego Kuperman.
821              
822             This is free software; you can redistribute it and/or modify it under
823             the same terms as the Perl 5 programming language system itself.
824              
825             =cut