File Coverage

blib/lib/Gearman/Worker.pm
Criterion Covered Total %
statement 126 248 50.8
branch 31 124 25.0
condition 4 19 21.0
subroutine 17 22 77.2
pod 7 7 100.0
total 185 420 44.0


line stmt bran cond sub pod time code
1             package Gearman::Worker;
2 11     11   580035 use version;
  11         12522  
  11         52  
3             $Gearman::Worker::VERSION = qv("2.001_001");
4              
5 11     11   665 use strict;
  11         13  
  11         155  
6 11     11   34 use warnings;
  11         11  
  11         230  
7              
8 11     11   32 use base "Gearman::Objects";
  11         13  
  11         3462  
9              
10             =head1 NAME
11              
12             Gearman::Worker - Worker for gearman distributed job system
13              
14             =head1 SYNOPSIS
15              
16             use Gearman::Worker;
17             my $worker = Gearman::Worker->new;
18             $worker->job_servers('127.0.0.1');
19             $worker->register_function($funcname => $subref);
20             $worker->work while 1;
21              
22             =head1 DESCRIPTION
23              
24             I<Gearman::Worker> is a worker class for the Gearman distributed job system,
25             providing a framework for receiving and serving jobs from a Gearman server.
26              
27             Callers instantiate a I<Gearman::Worker> object, register a list of functions
28             and capabilities that they can handle, then enter an event loop, waiting
29             for the server to send jobs.
30              
31             The worker can send a return value back to the server, which then gets
32             sent back to the client that requested the job; or it can simply execute
33             silently.
34              
35             =head1 USAGE
36              
37             =head2 Gearman::Worker->new(%options)
38              
39             Creates a new I<Gearman::Worker> object, and returns the object.
40              
41             If I<%options> is provided, initializes the new worker object with the
42             settings in I<%options>, which can contain:
43              
44             =over 4
45              
46             =item * job_servers
47              
48             Calls I<job_servers> (see below) to initialize the list of job
49             servers. It will be ignored if this worker is running as a child
50             process of a gearman server.
51              
52             =item * prefix
53              
54             Calls I<prefix> (see below) to set the prefix / namespace.
55              
56             =back
57              
58             =head2 $client-E<gt>prefix($prefix)
59              
60             Sets the namespace / prefix for the function names. This is useful
61             for sharing job servers between different applications or different
62             instances of the same application (different development sandboxes for
63             example).
64              
65             The namespace is currently implemented as a simple tab separated
66             concatenation of the prefix and the function name.
67              
68             =head1 EXAMPLES
69              
70             =head2 Summation
71              
72             This is an example worker that receives a request to sum up a list of
73             integers.
74              
75             use Gearman::Worker;
76             use Storable qw( thaw );
77             use List::Util qw( sum );
78             my $worker = Gearman::Worker->new;
79             $worker->job_servers('127.0.0.1');
80             $worker->register_function(sum => sub { sum @{ thaw($_[0]->arg) } });
81             $worker->work while 1;
82              
83             See the I<Gearman::Client> documentation for a sample client sending the
84             I<sum> job.
85              
86             =head1 METHODS
87              
88             =cut
89              
90             #TODO: retries?
91             #
92 11     11   3282 use Gearman::Util;
  11         18  
  11         256  
93 11     11   2842 use Gearman::Job;
  11         420  
  11         201  
94 11     11   131 use Carp ();
  11         11  
  11         233  
95              
96             use fields (
97 11         52 'sock_cache', # host:port -> IO::Socket::IP
98             'last_connect_fail', # host:port -> unixtime
99             'down_since', # host:port -> unixtime
100             'connecting', # host:port -> unixtime connect started at
101             'can', # ability -> subref (ability is func with optional prefix)
102             'timeouts', # ability -> timeouts
103             'client_id', # random identifier string, no whitespace
104             'parent_pipe', # bool/obj: if we're a child process of a gearman server,
105             # this is socket to our parent process. also means parent
106             # sock can never disconnect or timeout, etc..
107 11     11   31 );
  11         11  
108              
109             BEGIN {
110 11 50 33 11   1356 my $storable = eval { require Storable; 1 }
  11         540  
  11         2237  
111             if !defined &THROW_EXCEPTIONS || THROW_EXCEPTIONS();
112              
113 11   50     31 $storable ||= 0;
114              
115 11 50       31 if (defined &THROW_EXCEPTIONS) {
116 0         0 die "Exceptions support requires Storable: $@";
117             }
118             else {
119 11         368 eval "sub THROW_EXCEPTIONS () { $storable }";
120 11 50       17220 die "Couldn't define THROW_EXCEPTIONS: $@\n" if $@;
121             }
122             } ## end BEGIN
123              
124             sub new {
125 7     7 1 11606 my ($class, %opts) = @_;
126 7         9 my $self = $class;
127 7 50       30 $self = fields::new($class) unless ref $self;
128              
129 7 100       3007 if ($ENV{GEARMAN_WORKER_USE_STDIO}) {
130 1 50       16 open my $sock, '+<&', \*STDIN
131             or die "Unable to dup STDIN to socket for worker to use.";
132 1         3 $self->{job_servers} = [$sock];
133 1         1 $self->{parent_pipe} = $sock;
134              
135 1 50       3 die "Unable to initialize connection to gearmand"
136             unless $self->_on_connect($sock);
137 0 0       0 if ($opts{job_servers}) {
138 0         0 warn join ' ', __PACKAGE__,
139             'ignores job_servers if $ENV{GEARMAN_WORKER_USE_STDIO} is set';
140              
141 0         0 delete($opts{job_servers});
142             }
143             } ## end if ($ENV{GEARMAN_WORKER_USE_STDIO...})
144              
145 6         22 $self->SUPER::new(%opts);
146              
147 6         11 $self->{sock_cache} = {};
148 6         5 $self->{last_connect_fail} = {};
149 6         6 $self->{down_since} = {};
150 6         8 $self->{can} = {};
151 6         7 $self->{timeouts} = {};
152 6         8 $self->{client_id} = join("", map { chr(int(rand(26)) + 97) } (1 .. 30));
  180         186  
153              
154 6         22 return $self;
155             } ## end sub new
156              
157             #
158             # _get_js_sock($ipport, %opts)
159             #
160             sub _get_js_sock {
161 3     3   1737 my ($self, $ipport, %opts) = @_;
162 3 100       11 $ipport || return;
163              
164 2         4 my $on_connect = delete $opts{on_connect};
165              
166             # Someday should warn when called with extra opts.
167              
168 2 50       19 warn "getting job server socket: $ipport" if $self->debug;
169              
170             # special case, if we're a child process of a gearman::server
171             # parent process, talking over a unix pipe...
172 2 100       9 return $self->{parent_pipe} if $self->{parent_pipe};
173              
174 1 50       5 if (my $sock = $self->{sock_cache}{$ipport}) {
175 0 0       0 return $sock if getpeername($sock);
176 0         0 delete $self->{sock_cache}{$ipport};
177             }
178              
179 1         1 my $now = time;
180 1         3 my $down_since = $self->{down_since}{$ipport};
181 1 50       3 if ($down_since) {
182 0 0       0 warn "job server down since $down_since" if $self->debug;
183              
184 0         0 my $down_for = $now - $down_since;
185 0 0       0 my $retry_period = $down_for > 60 ? 30 : (int($down_for / 2) + 1);
186 0 0       0 if ($self->{last_connect_fail}{$ipport} > $now - $retry_period) {
187 0         0 return undef;
188             }
189             } ## end if ($down_since)
190              
191 1 50       2 warn "connecting to '$ipport'" if $self->debug;
192              
193 1         8 my $sock = $self->socket($ipport, 1);
194 1 50       3 unless ($sock) {
195 1 50       3 $self->debug && warn "$@";
196              
197 1   33     6 $self->{down_since}{$ipport} ||= $now;
198 1         2 $self->{last_connect_fail}{$ipport} = $now;
199              
200 1         4 return;
201             } ## end unless ($sock)
202              
203 0         0 delete $self->{last_connect_fail}{$ipport};
204 0         0 delete $self->{down_since}{$ipport};
205              
206 0         0 $sock->autoflush(1);
207 0         0 $self->sock_nodelay($sock);
208              
209 0         0 $self->{sock_cache}{$ipport} = $sock;
210              
211 0 0 0     0 unless ($self->_on_connect($sock) && $on_connect && $on_connect->($sock)) {
      0        
212 0         0 delete $self->{sock_cache}{$ipport};
213 0         0 return;
214             }
215              
216 0         0 return $sock;
217             } ## end sub _get_js_sock
218              
219             #
220             # _on_connect($sock)
221             #
222             # Housekeeping things to do on connection to a server. Method call
223             # with one argument being the 'socket' we're going to take care of.
224             # returns true on success, false on failure.
225             #
226             sub _on_connect {
227 2     2   227 my ($self, $sock) = @_;
228              
229             my $cid_req
230 2         8 = Gearman::Util::pack_req_command("set_client_id", $self->{client_id});
231 2 50       6 return undef unless Gearman::Util::send_req($sock, \$cid_req);
232              
233             # get this socket's state caught-up
234 0         0 foreach my $ability (keys %{ $self->{can} }) {
  0         0  
235 0         0 my $timeout = $self->{timeouts}->{$ability};
236 0 0       0 unless ($self->_set_ability($sock, $ability, $timeout)) {
237 0         0 return undef;
238             }
239             } ## end foreach my $ability (keys %...)
240              
241 0         0 return 1;
242             } ## end sub _on_connect
243              
244             #
245             # _set_ability($sock, $ability, $timeout)
246             #
247             sub _set_ability {
248 3     3   4 my ($self, $sock, $ability, $timeout) = @_;
249 3         3 my $req;
250 3 100       6 if (defined $timeout) {
251 1         3 $req = Gearman::Util::pack_req_command("can_do_timeout",
252             "$ability\0$timeout");
253             }
254             else {
255 2         4 $req = Gearman::Util::pack_req_command("can_do", $ability);
256             }
257 3         8 return Gearman::Util::send_req($sock, \$req);
258             } ## end sub _set_ability
259              
260             =head2 reset_abilities
261              
262             tell all the jobservers that this worker can't do anything
263              
264             =cut
265              
266             sub reset_abilities {
267 1     1 1 249 my $self = shift;
268 1         3 my $req = Gearman::Util::pack_req_command("reset_abilities");
269 1         1 foreach my $js (@{ $self->{job_servers} }) {
  1         4  
270 0 0       0 my $jss = $self->_get_js_sock($js)
271             or next;
272              
273 0 0       0 unless (Gearman::Util::send_req($jss, \$req)) {
274 0         0 $self->uncache_sock("js", "err_write_reset_abilities");
275             }
276             } ## end foreach my $js (@{ $self->{...}})
277              
278 1         2 $self->{can} = {};
279 1         3 $self->{timeouts} = {};
280             } ## end sub reset_abilities
281              
282             =head2 uncache_sock($ipport, $reason)
283              
284             close TCP connection
285              
286             =cut
287              
288             sub uncache_sock {
289 0     0 1 0 my ($self, $ipport, $reason) = @_;
290              
291             # we can't reconnect as a child process, so all we can do is die and hope our
292             # parent process respawns us...
293             die "Error/timeout talking to gearman parent process: [$reason]"
294 0 0       0 if $self->{parent_pipe};
295              
296             # normal case, we just close this TCP connection and we'll reconnect later.
297 0         0 delete $self->{sock_cache}{$ipport};
298             } ## end sub uncache_sock
299              
300             =head2 work(%opts)
301              
302             Endless loop that takes a job and waits for the next one.
303             You can pass "stop_if", "on_start", "on_complete" and "on_fail" callbacks in I<%opts>.
304              
305             =cut
306              
307             sub work {
308 1     1 1 299 my ($self, %opts) = @_;
309 1   50 0   4 my $stop_if = delete $opts{'stop_if'} || sub {0};
  0         0  
310 1         19 my $complete_cb = delete $opts{on_complete};
311 1         2 my $fail_cb = delete $opts{on_fail};
312 1         2 my $start_cb = delete $opts{on_start};
313 1 50       2 die "Unknown opts" if %opts;
314              
315 1         3 my $grab_req = Gearman::Util::pack_req_command("grab_job");
316 1         3 my $presleep_req = Gearman::Util::pack_req_command("pre_sleep");
317              
318 1         1 my $last_job_time;
319              
320             my $on_connect = sub {
321 0     0   0 return Gearman::Util::send_req($_[0], \$presleep_req);
322 1         3 };
323              
324             # "Active" job servers are servers that have woken us up and should be
325             # queried to see if they have jobs for us to handle. On our first pass
326             # in the loop we contact all servers.
327 1         1 my %active_js = map { $_ => 1 } @{ $self->{job_servers} };
  0         0  
  1         2  
328              
329             # ( js => last_update_time, ... )
330 1         2 my %last_update_time;
331              
332 1         1 while (1) {
333              
334             # "Jobby" job servers are the set of server which we will contact
335             # on this pass through the loop, because we need to clear and use
336             # the "Active" set to plan for our next pass through the loop.
337 1         1 my @jobby_js = keys %active_js;
338              
339 1         1 %active_js = ();
340              
341 1         2 my $js_count = @jobby_js;
342 1         2 my $js_offset = int(rand($js_count));
343 1         1 my $is_idle = 0;
344              
345 1         3 for (my $i = 0; $i < $js_count; $i++) {
346 0         0 my $js_index = ($i + $js_offset) % $js_count;
347 0         0 my $js = $jobby_js[$js_index];
348 0 0       0 my $jss = $self->_get_js_sock($js, on_connect => $on_connect)
349             or next;
350              
351             # TODO: add an optional sleep in here for the test suite
352             # to test gearmand server going away here. (SIGPIPE on
353             # send_req, etc) this testing has been done manually, at
354             # least.
355              
356 0 0       0 unless (Gearman::Util::send_req($jss, \$grab_req)) {
357 0 0 0     0 if ($!{EPIPE} && $self->{parent_pipe}) {
358              
359             # our parent process died, so let's just quit
360             # gracefully.
361 0         0 exit(0);
362             } ## end if ($!{EPIPE} && $self...)
363 0         0 $self->uncache_sock($js, "grab_job_timeout");
364 0         0 delete $last_update_time{$js};
365 0         0 next;
366             } ## end unless (Gearman::Util::send_req...)
367              
368             # if we're a child process talking over a unix pipe, give more
369             # time, since we know there are no network issues, and also
370             # because on failure, we can't "reconnect". all we can do is
371             # die and hope our parent process respawns us.
372 0 0       0 my $timeout = $self->{parent_pipe} ? 5 : 0.50;
373 0 0       0 unless (Gearman::Util::wait_for_readability($jss->fileno, $timeout))
374             {
375 0         0 $self->uncache_sock($js, "grab_job_timeout");
376 0         0 delete $last_update_time{$js};
377 0         0 next;
378             } ## end unless (Gearman::Util::wait_for_readability...)
379              
380 0         0 my $res;
381             do {
382 0         0 my $err;
383 0         0 $res = Gearman::Util::read_res_packet($jss, \$err);
384 0 0       0 unless ($res) {
385 0         0 $self->uncache_sock($js, "read_res_error");
386 0         0 delete $last_update_time{$js};
387 0         0 next;
388             }
389 0         0 } while ($res->{type} eq "noop");
390              
391 0 0       0 if ($res->{type} eq "no_job") {
392 0 0       0 unless (Gearman::Util::send_req($jss, \$presleep_req)) {
393 0         0 delete $last_update_time{$js};
394 0         0 $self->uncache_sock($js, "write_presleep_error");
395             }
396 0         0 $last_update_time{$js} = time;
397 0         0 next;
398             } ## end if ($res->{type} eq "no_job")
399              
400 0 0       0 unless ($res->{type} eq "job_assign") {
401 0         0 my $msg = "Uh, wasn't expecting a $res->{type} packet.";
402 0 0       0 if ($res->{type} eq "error") {
403 0         0 $msg .= " [${$res->{blobref}}]\n";
  0         0  
404 0         0 $msg =~ s/\0/ -- /g;
405             }
406 0         0 die $msg;
407             } ## end unless ($res->{type} eq "job_assign")
408              
409 0 0       0 ${ $res->{'blobref'} } =~ s/^(.+?)\0(.+?)\0//
  0         0  
410             or die "Uh, regexp on job_assign failed";
411 0         0 my ($handle, $ability) = ($1, $2);
412             my $job
413 0         0 = Gearman::Job->new($ability, $res->{'blobref'}, $handle, $jss);
414              
415 0         0 my $jobhandle = "$js//" . $job->handle;
416 0 0       0 $start_cb->($jobhandle) if $start_cb;
417              
418 0         0 my $handler = $self->{can}{$ability};
419 0         0 my $ret = eval { $handler->($job); };
  0         0  
420 0         0 my $err = $@;
421 0 0       0 warn "Job '$ability' died: $err" if $err;
422              
423 0         0 $last_update_time{$js} = $last_job_time = time();
424              
425 0 0       0 if (THROW_EXCEPTIONS && $err) {
426 0         0 my $exception_req
427             = Gearman::Util::pack_req_command("work_exception",
428             join("\0", $handle, Storable::nfreeze(\$err)));
429 0 0       0 unless (Gearman::Util::send_req($jss, \$exception_req)) {
430 0         0 $self->uncache_sock($js, "write_res_error");
431 0         0 next;
432             }
433             } ## end if (THROW_EXCEPTIONS &&...)
434              
435 0         0 my $work_req;
436 0 0       0 if (defined $ret) {
437 0 0       0 my $rv = ref $ret ? $$ret : $ret;
438 0         0 $work_req = Gearman::Util::pack_req_command("work_complete",
439             "$handle\0$rv");
440 0 0       0 $complete_cb->($jobhandle, $ret) if $complete_cb;
441             } ## end if (defined $ret)
442             else {
443 0         0 $work_req
444             = Gearman::Util::pack_req_command("work_fail", $handle);
445 0 0       0 $fail_cb->($jobhandle, $err) if $fail_cb;
446             }
447              
448 0 0       0 unless (Gearman::Util::send_req($jss, \$work_req)) {
449 0         0 $self->uncache_sock($js, "write_res_error");
450 0         0 next;
451             }
452              
453 0         0 $active_js{$js} = 1;
454             } ## end for (my $i = 0; $i < $js_count...)
455              
456 1         1 my @jss;
457              
458 1         1 foreach my $js (@{ $self->{job_servers} }) {
  1         2  
459 0 0       0 my $jss = $self->_get_js_sock($js, on_connect => $on_connect)
460             or next;
461 0         0 push @jss, [$js, $jss];
462             }
463              
464 1         2 $is_idle = 1;
465 1         1 my $wake_vec = '';
466              
467 1         1 foreach my $j (@jss) {
468 0         0 my ($js, $jss) = @$j;
469 0         0 my $fd = $jss->fileno;
470 0         0 vec($wake_vec, $fd, 1) = 1;
471             }
472              
473 1 50       3 my $timeout = keys %active_js ? 0 : (10 + rand(2));
474              
475             # chill for some arbitrary time until we're woken up again
476 1         10131905 my $nready = select(my $wout = $wake_vec, undef, undef, $timeout);
477              
478 1 50       16 if ($nready) {
479 0         0 foreach my $j (@jss) {
480 0         0 my ($js, $jss) = @$j;
481 0         0 my $fd = $jss->fileno;
482 0 0       0 $active_js{$js} = 1
483             if vec($wout, $fd, 1);
484             } ## end foreach my $j (@jss)
485             } ## end if ($nready)
486              
487 1 50       7 $is_idle = 0 if keys %active_js;
488              
489 1 50       27 return if $stop_if->($is_idle, $last_job_time);
490              
491 0         0 my $update_since = time - (15 + rand 60);
492              
493 0         0 while (my ($js, $last_update) = each %last_update_time) {
494 0 0       0 $active_js{$js} = 1 if $last_update < $update_since;
495             }
496             } ## end while (1)
497              
498             } ## end sub work
499              
500             =head2 $worker->register_function($funcname, $subref)
501              
502             =head2 $worker->register_function($funcname, $timeout, $subref)
503              
504             Registers the function I<$funcname> as being provided by the worker
505             I<$worker>, and advertises these capabilities to all of the job servers
506             defined in this worker.
507              
508             I<$subref> must be a subroutine reference that will be invoked when the
509             worker receives a request for this function. It will be passed a
510             I<Gearman::Job> object representing the job that has been received by the
511             worker.
512              
513             I<$timeout> is an optional parameter specifying how long the jobserver will
514             wait for your subroutine to give an answer. Exceeding this time will result
515             in the jobserver reassigning the task and ignoring your result. This prevents
516             a gimpy worker from ruining the 'user experience' in many situations.
517              
518             The subroutine reference can return a return value, which will be sent back
519             to the job server.
520             =cut
521              
522             sub register_function {
523 2     2 1 358 my $self = shift;
524 2         2 my $func = shift;
525 2 100       6 my $timeout = shift unless (ref $_[0] eq 'CODE');
526 2         2 my $subref = shift;
527              
528 2         6 my $prefix = $self->prefix;
529 2 50       4 my $ability = defined($prefix) ? "$prefix\t$func" : "$func";
530              
531 2         2 my $req;
532 2 100       2 if (defined $timeout) {
533 1         4 $req = Gearman::Util::pack_req_command("can_do_timeout",
534             "$ability\0$timeout");
535 1         2 $self->{timeouts}{$ability} = $timeout;
536             }
537             else {
538 1         3 $req = Gearman::Util::pack_req_command("can_do", $ability);
539             }
540              
541 2         4 $self->_register_all($req);
542 2         7 $self->{can}{$ability} = $subref;
543             } ## end sub register_function
544              
545             =head2 unregister_function($funcname)
546              
547             =cut
548              
549             sub unregister_function {
550 0     0 1 0 my ($self, $func) = @_;
551 0         0 my $prefix = $self->prefix;
552 0 0       0 my $ability = defined($prefix) ? "$prefix\t$func" : "$func";
553              
554 0         0 my $req = Gearman::Util::pack_req_command("cant_do", $ability);
555              
556 0         0 $self->_register_all($req);
557 0         0 delete $self->{can}{$ability};
558             } ## end sub unregister_function
559              
560             #
561             # _register_all($req)
562             #
563             sub _register_all {
564 2     2   2 my ($self, $req) = @_;
565              
566 2         2 foreach my $js (@{ $self->{job_servers} }) {
  2         5  
567 0 0         my $jss = $self->_get_js_sock($js)
568             or next;
569              
570 0 0         unless (Gearman::Util::send_req($jss, \$req)) {
571 0           $self->uncache_sock($js, "write_register_func_error");
572             }
573             } ## end foreach my $js (@{ $self->{...}})
574             } ## end sub _register_all
575              
576             =head2 job_servers(@servers)
577              
578             override L<Gearman::Objects> method to skip job server initialization
579             if defined C<$ENV{GEARMAN_WORKER_USE_STDIO}>
580              
581             Calling this method will do nothing in a worker that is running as a child
582             process of a gearman server.
583              
584             =cut
585              
586             sub job_servers {
587 0     0 1   my $self = shift;
588 0 0         return if ($ENV{GEARMAN_WORKER_USE_STDIO});
589              
590 0           return $self->SUPER::job_servers(@_);
591             } ## end sub job_servers
592              
593             1;
594             __END__