File Coverage

blib/lib/Gearman/Worker.pm
Criterion Covered Total %
statement 126 249 50.6
branch 31 122 25.4
condition 4 19 21.0
subroutine 18 22 81.8
pod 7 7 100.0
total 186 419 44.3


line stmt bran cond sub pod time code
1             package Gearman::Worker;
2 11     11   829679 use version;
  11         16474  
  11         71  
3             $Gearman::Worker::VERSION = version->declare("2.002.001"); #TRIAL
4              
5 11     11   926 use strict;
  11         18  
  11         229  
6 11     11   43 use warnings;
  11         16  
  11         299  
7              
8 11     11   42 use base "Gearman::Objects";
  11         18  
  11         5182  
9              
10             =head1 NAME
11              
12             Gearman::Worker - Worker for gearman distributed job system
13              
14             =head1 SYNOPSIS
15              
16             use Gearman::Worker;
17             my $worker = Gearman::Worker->new;
18             $worker->job_servers(
19             '127.0.0.1',
20             {
21             ca_certs => ...,
22             cert_file => ...,
23             host => '10.0.0.1',
24             key_file => ...,
25             port => 4733,
26             socket_cb => sub {...},
27             use_ssl => 1,
28             }
29             );
30             $worker->register_function($funcname => $subref);
31             $worker->work while 1;
32              
33             =head1 DESCRIPTION
34              
35             I is a worker class for the Gearman distributed job system,
36             providing a framework for receiving and serving jobs from a Gearman server.
37              
38             Callers instantiate a I object, register a list of functions
39             and capabilities that they can handle, then enter an event loop, waiting
40             for the server to send jobs.
41              
42             The worker can send a return value back to the server, which then gets
43             sent back to the client that requested the job; or it can simply execute
44             silently.
45              
46             =head1 USAGE
47              
48             =head2 Gearman::Worker->new(%options)
49              
50             Creates a new I object, and returns the object.
51              
52             If I<%options> is provided, initializes the new worker object with the
53             settings in I<%options>, which can contain:
54              
55             =over 4
56              
57             =item * job_servers
58              
59             Calls I (see below) to initialize the list of job
60             servers. It will be ignored if this worker is running as a child
61             process of a gearman server.
62              
63             =item * prefix
64              
65             Calls I (see below) to set the prefix / namespace.
66              
67             =back
68              
69             =head2 $client-Eprefix($prefix)
70              
71             Sets the namespace / prefix for the function names. This is useful
72             for sharing job servers between different applications or different
73             instances of the same application (different development sandboxes for
74             example).
75              
76             The namespace is currently implemented as a simple tab separated
77             concatenation of the prefix and the function name.
78              
79             =head1 EXAMPLES
80              
81             =head2 Summation
82              
83             This is an example worker that receives a request to sum up a list of
84             integers.
85              
86             use Gearman::Worker;
87             use Storable qw( thaw );
88             use List::Util qw( sum );
89             my $worker = Gearman::Worker->new;
90             $worker->job_servers('127.0.0.1');
91             $worker->register_function(sum => sub { sum @{ thaw($_[0]->arg) } });
92             $worker->work while 1;
93              
94             See the I documentation for a sample client sending the
95             I job.
96              
97             =head1 METHODS
98              
99             =cut
100              
101             #TODO: retries?
102             #
103 11     11   4584 use Gearman::Util;
  11         34  
  11         407  
104 11     11   4515 use Gearman::Job;
  11         38  
  11         296  
105 11     11   54 use Carp ();
  11         16  
  11         301  
106              
107             use fields (
108 11         68 'last_connect_fail', # host:port -> unixtime
109             'down_since', # host:port -> unixtime
110             'connecting', # host:port -> unixtime connect started at
111             'can', # ability -> subref (ability is func with optional prefix)
112             'timeouts', # ability -> timeouts
113             'client_id', # random identifier string, no whitespace
114             'parent_pipe', # bool/obj: if we're a child process of a gearman server,
115             # this is socket to our parent process. also means parent
116             # sock can never disconnect or timeout, etc..
117 11     11   50 );
  11         13  
118              
119             BEGIN {
120 11 50 33 11   1841 my $storable = eval { require Storable; 1 }
  11         799  
  11         2549  
121             if !defined &THROW_EXCEPTIONS || THROW_EXCEPTIONS();
122              
123 11   50     38 $storable ||= 0;
124              
125 11 50       47 if (defined &THROW_EXCEPTIONS) {
126 0         0 die "Exceptions support requires Storable: $@";
127             }
128             else {
129 11         588 eval "sub THROW_EXCEPTIONS () { $storable }";
130 11 50       26320 die "Couldn't define THROW_EXCEPTIONS: $@\n" if $@;
131             }
132             } ## end BEGIN
133              
134             sub new {
135 7     7 1 19572 my ($class, %opts) = @_;
136 7         9 my $self = $class;
137 7 50       38 $self = fields::new($class) unless ref $self;
138              
139 7 100       3903 if ($ENV{GEARMAN_WORKER_USE_STDIO}) {
140 1 50       23 open my $sock, '+<&', \*STDIN
141             or die "Unable to dup STDIN to socket for worker to use.";
142 1         4 $self->{job_servers} = [$sock];
143 1         3 $self->{parent_pipe} = $sock;
144              
145 1 50       4 die "Unable to initialize connection to gearmand"
146             unless $self->_on_connect($sock);
147 0 0       0 if ($opts{job_servers}) {
148 0         0 warn join ' ', __PACKAGE__,
149             'ignores job_servers if $ENV{GEARMAN_WORKER_USE_STDIO} is set';
150              
151 0         0 delete($opts{job_servers});
152             } ## end if ($opts{job_servers})
153             } ## end if ($ENV{GEARMAN_WORKER_USE_STDIO...})
154              
155 6         33 $self->SUPER::new(%opts);
156              
157 6         8 $self->{last_connect_fail} = {};
158 6         9 $self->{down_since} = {};
159 6         9 $self->{can} = {};
160 6         7 $self->{timeouts} = {};
161 6         12 $self->{client_id} = join("", map { chr(int(rand(26)) + 97) } (1 .. 30));
  180         233  
162              
163 6         24 return $self;
164             } ## end sub new
165              
166             #
167             # _get_js_sock($js, %opts)
168             #
169             sub _get_js_sock {
170 3     3   2139 my ($self, $js, %opts) = @_;
171 3 100       17 $js || return;
172              
173 2         12 my $js_str = $self->_js_str($js);
174 2         4 my $on_connect = delete $opts{on_connect};
175              
176             # Someday should warn when called with extra opts.
177              
178 2 50       5 warn "getting job server socket: $js_str" if $self->debug;
179              
180             # special case, if we're a child process of a gearman::server
181             # parent process, talking over a unix pipe...
182 2 100       10 return $self->{parent_pipe} if $self->{parent_pipe};
183              
184 1 50       9 if (my $sock = $self->_sock_cache($js)) {
185 0 0       0 return $sock if getpeername($sock);
186              
187             # delete cached sock
188 0         0 $self->_sock_cache($js, undef, 1);
189             } ## end if (my $sock = $self->...)
190              
191 1         3 my $now = time;
192 1         3 my $down_since = $self->{down_since}{$js_str};
193 1 50       2 if ($down_since) {
194 0 0       0 warn "job server down since $down_since" if $self->debug;
195              
196 0         0 my $down_for = $now - $down_since;
197 0 0       0 my $retry_period = $down_for > 60 ? 30 : (int($down_for / 2) + 1);
198 0 0       0 if ($self->{last_connect_fail}{$js_str} > $now - $retry_period) {
199 0         0 return;
200             }
201             } ## end if ($down_since)
202              
203 1 50       3 warn "connecting to '$js_str'" if $self->debug;
204              
205 1         6 my $sock = $self->socket($js, 1);
206 1 50       4 unless ($sock) {
207 1   33     9 $self->{down_since}{$js_str} ||= $now;
208 1         2 $self->{last_connect_fail}{$js_str} = $now;
209              
210 1         5 return;
211             } ## end unless ($sock)
212              
213 0         0 delete $self->{last_connect_fail}{$js_str};
214 0         0 delete $self->{down_since}{$js_str};
215              
216 0         0 $sock->autoflush(1);
217 0         0 $self->sock_nodelay($sock);
218              
219 0         0 $self->_sock_cache($js, $sock);
220              
221 0 0 0     0 unless ($self->_on_connect($sock) && $on_connect && $on_connect->($sock)) {
      0        
222              
223             # delete
224 0         0 $self->_sock_cache($js, undef, 1);
225 0         0 return;
226             } ## end unless ($self->_on_connect...)
227              
228 0         0 return $sock;
229             } ## end sub _get_js_sock
230              
231             #
232             # _on_connect($sock)
233             #
234             # Housekeeping things to do on connection to a server. Method call
235             # with one argument being the 'socket' we're going to take care of.
236             # returns true on success, false on failure.
237             #
238             sub _on_connect {
239 2     2   427 my ($self, $sock) = @_;
240              
241             my $cid_req
242 2         11 = Gearman::Util::pack_req_command("set_client_id", $self->{client_id});
243 2 50       9 return undef unless Gearman::Util::send_req($sock, \$cid_req);
244              
245             # get this socket's state caught-up
246 0         0 foreach my $ability (keys %{ $self->{can} }) {
  0         0  
247 0         0 my $timeout = $self->{timeouts}->{$ability};
248 0 0       0 unless ($self->_set_ability($sock, $ability, $timeout)) {
249 0         0 return;
250             }
251             } ## end foreach my $ability (keys %...)
252              
253 0         0 return 1;
254             } ## end sub _on_connect
255              
256             #
257             # _set_ability($sock, $ability, $timeout)
258             #
259             sub _set_ability {
260 3     3   5 my ($self, $sock, $ability, $timeout) = @_;
261 3         4 my $req;
262 3 100       7 if (defined $timeout) {
263 1         6 $req = Gearman::Util::pack_req_command("can_do_timeout",
264             "$ability\0$timeout");
265             }
266             else {
267 2         7 $req = Gearman::Util::pack_req_command("can_do", $ability);
268             }
269 3         11 return Gearman::Util::send_req($sock, \$req);
270             } ## end sub _set_ability
271              
272             =head2 reset_abilities
273              
274             tell all the jobservers that this worker can't do anything
275              
276             =cut
277              
278             sub reset_abilities {
279 1     1 1 405 my $self = shift;
280 1         4 my $req = Gearman::Util::pack_req_command("reset_abilities");
281 1         3 foreach my $js (@{ $self->{job_servers} }) {
  1         4  
282 0 0       0 my $jss = $self->_get_js_sock($js)
283             or next;
284              
285 0 0       0 unless (Gearman::Util::send_req($jss, \$req)) {
286 0         0 $self->uncache_sock($js, "err_write_reset_abilities");
287             }
288             } ## end foreach my $js (@{ $self->{...}})
289              
290 1         3 $self->{can} = {};
291 1         5 $self->{timeouts} = {};
292             } ## end sub reset_abilities
293              
294             =head2 uncache_sock($ipport, $reason)
295              
296             close TCP connection
297              
298             =cut
299              
300             sub uncache_sock {
301 0     0 1 0 my ($self, $ipport, $reason) = @_;
302              
303             # we can't reconnect as a child process, so all we can do is die and hope our
304             # parent process respawns us...
305             die "Error/timeout talking to gearman parent process: [$reason]"
306 0 0       0 if $self->{parent_pipe};
307              
308             # normal case, we just close this TCP connection and we'll reconnect later.
309             # delete cached sock
310 0         0 $self->_sock_cache($ipport, undef, 1);
311             } ## end sub uncache_sock
312              
313             =head2 work(%opts)
314              
315             Endless loop takes a job and wait for the next one.
316             You can pass "stop_if", "on_start", "on_complete" and "on_fail" callbacks in I<%opts>.
317              
318             =cut
319              
320             sub work {
321 1     1 1 514 my ($self, %opts) = @_;
322 1   50 0   5 my $stop_if = delete $opts{'stop_if'} || sub {0};
  0         0  
323 1         3 my $complete_cb = delete $opts{on_complete};
324 1         2 my $fail_cb = delete $opts{on_fail};
325 1         2 my $start_cb = delete $opts{on_start};
326 1 50       4 die "Unknown opts" if %opts;
327              
328 1         5 my $grab_req = Gearman::Util::pack_req_command("grab_job");
329 1         4 my $presleep_req = Gearman::Util::pack_req_command("pre_sleep");
330              
331 1         2 my $last_job_time;
332              
333             my $on_connect = sub {
334 0     0   0 return Gearman::Util::send_req($_[0], \$presleep_req);
335 1         3 };
336              
337 1         8 my %js_map = map { $self->_js_str($_) => $_ } $self->job_servers;
  0         0  
338              
339             # "Active" job servers are servers that have woken us up and should be
340             # queried to see if they have jobs for us to handle. On our first pass
341             # in the loop we contact all servers.
342 1         4 my %active_js = map { $_ => 1 } keys(%js_map);
  0         0  
343              
344             # ( js => last_update_time, ... )
345 1         2 my %last_update_time;
346              
347 1         2 while (1) {
348              
349             # "Jobby" job servers are the set of server which we will contact
350             # on this pass through the loop, because we need to clear and use
351             # the "Active" set to plan for our next pass through the loop.
352 1         3 my @jobby_js = keys %active_js;
353              
354 1         2 %active_js = ();
355              
356 1         1 my $js_count = @jobby_js;
357 1         3 my $js_offset = int(rand($js_count));
358 1         2 my $is_idle = 0;
359              
360 1         5 for (my $i = 0; $i < $js_count; $i++) {
361 0         0 my $js_index = ($i + $js_offset) % $js_count;
362 0         0 my $js_str = $jobby_js[$js_index];
363 0         0 my $js = $js_map{$js_str};
364 0 0       0 my $jss = $self->_get_js_sock($js, on_connect => $on_connect)
365             or next;
366              
367             # TODO: add an optional sleep in here for the test suite
368             # to test gearmand server going away here. (SIGPIPE on
369             # send_req, etc) this testing has been done manually, at
370             # least.
371              
372 0 0       0 unless (Gearman::Util::send_req($jss, \$grab_req)) {
373 0 0 0     0 if ($!{EPIPE} && $self->{parent_pipe}) {
374              
375             # our parent process died, so let's just quit
376             # gracefully.
377 0         0 exit(0);
378             } ## end if ($!{EPIPE} && $self...)
379 0         0 $self->uncache_sock($js, "grab_job_timeout");
380 0         0 delete $last_update_time{$js_str};
381 0         0 next;
382             } ## end unless (Gearman::Util::send_req...)
383              
384             # if we're a child process talking over a unix pipe, give more
385             # time, since we know there are no network issues, and also
386             # because on failure, we can't "reconnect". all we can do is
387             # die and hope our parent process respawns us.
388 0 0       0 my $timeout = $self->{parent_pipe} ? 5 : 0.50;
389 0 0       0 unless (Gearman::Util::wait_for_readability($jss->fileno, $timeout))
390             {
391 0         0 $self->uncache_sock($js, "grab_job_timeout");
392 0         0 delete $last_update_time{$js_str};
393 0         0 next;
394             } ## end unless (Gearman::Util::wait_for_readability...)
395              
396 0         0 my $res;
397             do {
398 0         0 my $err;
399 0         0 $res = Gearman::Util::read_res_packet($jss, \$err);
400 0 0       0 unless ($res) {
401 0         0 $self->uncache_sock($js, "read_res_error");
402 0         0 delete $last_update_time{$js_str};
403 0         0 next;
404             }
405 0         0 } while ($res->{type} eq "noop");
406              
407 0 0       0 if ($res->{type} eq "no_job") {
408 0 0       0 unless (Gearman::Util::send_req($jss, \$presleep_req)) {
409 0         0 delete $last_update_time{$js_str};
410 0         0 $self->uncache_sock($js, "write_presleep_error");
411             }
412 0         0 $last_update_time{$js} = time;
413 0         0 next;
414             } ## end if ($res->{type} eq "no_job")
415              
416 0 0       0 unless ($res->{type} eq "job_assign") {
417 0         0 my $msg = "Uh, wasn't expecting a $res->{type} packet.";
418 0 0       0 if ($res->{type} eq "error") {
419 0         0 $msg .= " [${$res->{blobref}}]\n";
  0         0  
420 0         0 $msg =~ s/\0/ -- /g;
421             }
422 0         0 die $msg;
423             } ## end unless ($res->{type} eq "job_assign")
424              
425 0 0       0 ${ $res->{'blobref'} } =~ s/^(.+?)\0(.+?)\0//
  0         0  
426             or die "Uh, regexp on job_assign failed";
427 0         0 my ($handle, $ability) = ($1, $2);
428             my $job
429 0         0 = Gearman::Job->new($ability, $res->{'blobref'}, $handle, $jss);
430              
431 0         0 my $jobhandle = join("//", $js_str, $job->handle);
432 0 0       0 $start_cb->($jobhandle) if $start_cb;
433              
434 0         0 my $handler = $self->{can}{$ability};
435 0         0 my $ret = eval { $handler->($job); };
  0         0  
436 0         0 my $err = $@;
437 0 0       0 warn "Job '$ability' died: $err" if $err;
438              
439 0         0 $last_update_time{$js_str} = $last_job_time = time();
440              
441 0 0       0 if (THROW_EXCEPTIONS && $err) {
442 0         0 my $exception_req
443             = Gearman::Util::pack_req_command("work_exception",
444             join("\0", $handle, Storable::nfreeze(\$err)));
445 0 0       0 unless (Gearman::Util::send_req($jss, \$exception_req)) {
446 0         0 $self->uncache_sock($js, "write_res_error");
447 0         0 next;
448             }
449             } ## end if (THROW_EXCEPTIONS &&...)
450              
451 0         0 my $work_req;
452 0 0       0 if (defined $ret) {
453 0 0       0 my $rv = ref $ret ? $$ret : $ret;
454 0         0 $work_req = Gearman::Util::pack_req_command("work_complete",
455             "$handle\0$rv");
456 0 0       0 $complete_cb->($jobhandle, $ret) if $complete_cb;
457             } ## end if (defined $ret)
458             else {
459 0         0 $work_req
460             = Gearman::Util::pack_req_command("work_fail", $handle);
461 0 0       0 $fail_cb->($jobhandle, $err) if $fail_cb;
462             }
463              
464 0 0       0 unless (Gearman::Util::send_req($jss, \$work_req)) {
465 0         0 $self->uncache_sock($js, "write_res_error");
466 0         0 next;
467             }
468              
469 0         0 $active_js{$js_str} = 1;
470             } ## end for (my $i = 0; $i < $js_count...)
471              
472 1         2 my @jss;
473              
474 1         4 foreach my $js_str (keys(%js_map)) {
475             my $jss
476 0 0       0 = $self->_get_js_sock($js_map{$js_str},
477             on_connect => $on_connect)
478             or next;
479 0         0 push @jss, [$js_str, $jss];
480             } ## end foreach my $js_str (keys(%js_map...))
481              
482 1         1 $is_idle = 1;
483 1         2 my $wake_vec = '';
484              
485 1         3 foreach my $j (@jss) {
486 0         0 (undef, my $_jss) = @{$j};
  0         0  
487 0         0 my $fd = $_jss->fileno;
488 0         0 vec($wake_vec, $fd, 1) = 1;
489             }
490              
491 1 50       6 my $timeout = keys(%active_js) ? 0 : (10 + rand(2));
492              
493             # chill for some arbitrary time until we're woken up again
494 1         11324493 my $nready = select(my $wout = $wake_vec, undef, undef, $timeout);
495              
496 1 50       14 if ($nready) {
497 0         0 foreach my $j (@jss) {
498 0         0 my ($js_str, $jss) = @{$j};
  0         0  
499 0         0 my $fd = $jss->fileno;
500 0 0       0 $active_js{$js_str} = 1
501             if vec($wout, $fd, 1);
502             } ## end foreach my $j (@jss)
503             } ## end if ($nready)
504              
505 1 50       6 $is_idle = 0 if keys %active_js;
506              
507 1 50       21 return if $stop_if->($is_idle, $last_job_time);
508              
509 0         0 my $update_since = time - (15 + rand 60);
510              
511 0         0 while (my ($js_str, $last_update) = each %last_update_time) {
512 0 0       0 $active_js{$js_str} = 1 if $last_update < $update_since;
513             }
514             } ## end while (1)
515              
516             } ## end sub work
517              
518             =head2 $worker->register_function($funcname, $subref)
519              
520             =head2 $worker->register_function($funcname, $timeout, $subref)
521              
522             Registers the function I<$funcname> as being provided by the worker
523             I<$worker>, and advertises these capabilities to all of the job servers
524             defined in this worker.
525              
526             I<$subref> must be a subroutine reference that will be invoked when the
527             worker receives a request for this function. It will be passed a
528             I object representing the job that has been received by the
529             worker.
530              
531             I<$timeout> is an optional parameter specifying how long the jobserver will
532             wait for your subroutine to give an answer. Exceeding this time will result
533             in the jobserver reassigning the task and ignoring your result. This prevents
534             a gimpy worker from ruining the 'user experience' in many situations.
535              
536             The subroutine reference can return a return value, which will be sent back
537             to the job server.
538             =cut
539              
540             sub register_function {
541 2     2 1 601 my $self = shift;
542 2         4 my $func = shift;
543 2 100       9 my $timeout = shift unless (ref $_[0] eq 'CODE');
544 2         3 my $subref = shift;
545              
546 2         7 my $prefix = $self->prefix;
547 2 50       7 my $ability = defined($prefix) ? join("\t", $prefix, $func) : $func;
548              
549 2         3 my $req;
550 2 100       5 if (defined $timeout) {
551 1         5 $req = Gearman::Util::pack_req_command("can_do_timeout",
552             "$ability\0$timeout");
553 1         3 $self->{timeouts}{$ability} = $timeout;
554             }
555             else {
556 1         4 $req = Gearman::Util::pack_req_command("can_do", $ability);
557             }
558              
559 2         6 $self->_register_all($req);
560 2         10 $self->{can}{$ability} = $subref;
561             } ## end sub register_function
562              
563             =head2 unregister_function($funcname)
564              
565             =cut
566              
567             sub unregister_function {
568 0     0 1 0 my ($self, $func) = @_;
569 0         0 my $prefix = $self->prefix;
570 0 0       0 my $ability = defined($prefix) ? join("\t", $prefix, $func) : $func;
571              
572 0         0 my $req = Gearman::Util::pack_req_command("cant_do", $ability);
573              
574 0         0 $self->_register_all($req);
575 0         0 delete $self->{can}{$ability};
576             } ## end sub unregister_function
577              
578             #
579             # _register_all($req)
580             #
581             sub _register_all {
582 2     2   3 my ($self, $req) = @_;
583              
584 2         5 foreach my $js ($self->job_servers()) {
585 0 0       0 my $jss = $self->_get_js_sock($js)
586             or next;
587              
588 0 0       0 unless (Gearman::Util::send_req($jss, \$req)) {
589 0         0 $self->uncache_sock($js, "write_register_func_error");
590             }
591             } ## end foreach my $js ($self->job_servers...)
592             } ## end sub _register_all
593              
594             =head2 job_servers(@servers)
595              
596             override L method to skip job server initialization
597             if defined C<$ENV{GEARMAN_WORKER_USE_STDIO}>
598              
599             Calling this method will do nothing in a worker that is running as a child
600             process of a gearman server.
601              
602             =cut
603              
604             sub job_servers {
605 3     3 1 4 my $self = shift;
606 3 50       9 return if ($ENV{GEARMAN_WORKER_USE_STDIO});
607              
608 3         14 return $self->SUPER::job_servers(@_);
609             } ## end sub job_servers
610              
611             1;
612             __END__