File Coverage

blib/lib/Gearman/Worker.pm
Criterion Covered Total %
statement 98 271 36.1
branch 22 122 18.0
condition 2 11 18.1
subroutine 20 32 62.5
pod 11 11 100.0
total 153 447 34.2


line stmt bran cond sub pod time code
1             package Gearman::Worker;
2 11     11   661833 use version;
  11         12970  
  11         55  
3             $Gearman::Worker::VERSION = version->declare("2.003_001");
4              
5 11     11   731 use strict;
  11         14  
  11         162  
6 11     11   35 use warnings;
  11         12  
  11         231  
7              
8 11     11   33 use base "Gearman::Objects";
  11         13  
  11         3712  
9              
10             =head1 NAME
11              
12             Gearman::Worker - Worker for gearman distributed job system
13              
14             =head1 SYNOPSIS
15              
16             use Gearman::Worker;
17             my $worker = Gearman::Worker->new;
18             $worker->job_servers(
19             '127.0.0.1',
20             {
21             ca_certs => ...,
22             cert_file => ...,
23             host => '10.0.0.1',
24             key_file => ...,
25             port => 4733,
26             socket_cb => sub {...},
27             use_ssl => 1,
28             }
29             );
30             $worker->register_function($funcname => $subref);
31             $worker->work while 1;
32              
33             =head1 DESCRIPTION
34              
35             I<Gearman::Worker> is a worker class for the Gearman distributed job system,
36             providing a framework for receiving and serving jobs from a Gearman server.
37              
38             Callers instantiate a I<Gearman::Worker> object, register a list of functions
39             and capabilities that they can handle, then enter an event loop, waiting
40             for the server to send jobs.
41              
42             The worker can send a return value back to the server, which then gets
43             sent back to the client that requested the job; or it can simply execute
44             silently.
45              
46             =head1 USAGE
47              
48             =head2 Gearman::Worker->new(%options)
49              
50             Creates a new I<Gearman::Worker> object, and returns the object.
51              
52             If I<%options> is provided, initializes the new worker object with the
53             settings in I<%options>, which can contain:
54              
55             =over 4
56              
57             =item * job_servers
58              
59             Calls I<job_servers> (see below) to initialize the list of job
60             servers. It will be ignored if this worker is running as a child
61             process of a gearman server.
62              
63             =item * prefix
64              
65             Calls I<prefix> (see below) to set the prefix / namespace.
66              
67             =back
68              
69             =head2 $client-E<gt>prefix($prefix)
70              
71             Sets the namespace / prefix for the function names. This is useful
72             for sharing job servers between different applications or different
73             instances of the same application (different development sandboxes for
74             example).
75              
76             The namespace is currently implemented as a simple tab separated
77             concatenation of the prefix and the function name.
78              
79             =head1 EXAMPLES
80              
81             =head2 Summation
82              
83             This is an example worker that receives a request to sum up a list of
84             integers.
85              
86             use Gearman::Worker;
87             use Storable qw( thaw );
88             use List::Util qw( sum );
89             my $worker = Gearman::Worker->new;
90             $worker->job_servers('127.0.0.1');
91             $worker->register_function(sum => sub { sum @{ thaw($_[0]->arg) } });
92             $worker->work while 1;
93              
94             See the I<Gearman::Client> documentation for a sample client sending the
95             I<sum> job.
96              
97             =head1 METHODS
98              
99             =cut
100              
101             #TODO: retries?
102             #
103 11     11   62 use Carp ();
  11         13  
  11         140  
104 11     11   4312 use Gearman::Util ();
  11         22  
  11         262  
105 11     11   3904 use Gearman::Job;
  11         16  
  11         247  
106 11     11   582 use Storable ();
  11         2192  
  11         261  
107              
108             use fields (
109 11         39 'last_connect_fail', # host:port -> unixtime
110             'down_since', # host:port -> unixtime
111             'connecting', # host:port -> unixtime connect started at
112             'can', # ability -> subref (ability is func with optional prefix)
113             'timeouts', # ability -> timeouts
114             'client_id', # random identifier string, no whitespace
115             'parent_pipe', # bool/obj: if we're a child process of a gearman server,
116             # this is socket to our parent process. also means parent
117             # sock can never disconnect or timeout, etc..
118 11     11   39 );
  11         12  
119              
120             sub new {
121 6     6 1 11185 my ($class, %opts) = @_;
122 6         8 my $self = $class;
123 6 50       24 $self = fields::new($class) unless ref $self;
124              
125 6 100       2843 if ($ENV{GEARMAN_WORKER_USE_STDIO}) {
126 1 50       19 open my $sock, '+<&', \*STDIN
127             or die "Unable to dup STDIN to socket for worker to use.";
128 1         2 $self->{job_servers} = [$sock];
129 1         1 $self->{parent_pipe} = $sock;
130              
131 1 50       3 die "Unable to initialize connection to gearmand"
132             unless $self->_on_connect($sock);
133 0 0       0 if ($opts{job_servers}) {
134 0         0 warn join ' ', __PACKAGE__,
135             'ignores job_servers if $ENV{GEARMAN_WORKER_USE_STDIO} is set';
136              
137 0         0 delete($opts{job_servers});
138             } ## end if ($opts{job_servers})
139             } ## end if ($ENV{GEARMAN_WORKER_USE_STDIO...})
140              
141 5         19 $self->SUPER::new(%opts);
142              
143 5         7 $self->{last_connect_fail} = {};
144 5         6 $self->{down_since} = {};
145 5         5 $self->{can} = {};
146 5         5 $self->{timeouts} = {};
147 5         5 $self->{client_id} = join("", map { chr(int(rand(26)) + 97) } (1 .. 30));
  150         160  
148              
149 5         16 return $self;
150             } ## end sub new
151              
152             =head2 reset_abilities
153              
154             tell all the jobservers that this worker can't do anything
155              
156             =cut
157              
158             sub reset_abilities {
159 1     1 1 253 my $self = shift;
160 1         3 my $req = _rc("reset_abilities");
161 1         2 foreach my $js (@{ $self->{job_servers} }) {
  1         4  
162 0 0       0 my $jss = $self->_get_js_sock($js)
163             or next;
164              
165 0 0       0 unless (_send($jss, $req)) {
166 0         0 $self->_uncache_sock($js, "err_write_reset_abilities");
167             }
168             } ## end foreach my $js (@{ $self->{...}})
169              
170 1         2 $self->{can} = {};
171 1         4 $self->{timeouts} = {};
172             } ## end sub reset_abilities
173              
174             =head2 _uncache_sock($js, $reason)
175              
176             close TCP connection
177              
178             =cut
179              
180             sub _uncache_sock {
181 0     0   0 my ($self, $js, $reason) = @_;
182              
183             # we can't reconnect as a child process, so all we can do is die and hope our
184             # parent process respawns us...
185             die "Error/timeout talking to gearman parent process: [$reason]"
186 0 0       0 if $self->{parent_pipe};
187              
188             # normal case, we just close this TCP connection and we'll reconnect later.
189             # delete cached sock
190 0         0 $self->_sock_cache($js, undef, 1);
191             } ## end sub _uncache_sock
192              
193             =head2 work(%opts)
194              
195             Endless loop that takes a job and waits for the next one.
196             You can pass "stop_if", "on_start", "on_complete" and "on_fail" callbacks in I<%opts>.
197              
198             =cut
199              
200             my %job_done;
201              
202             sub work {
203 0     0 1 0 my ($self, %opts) = @_;
204 0   0 0   0 my $stop_if = delete($opts{stop_if}) || sub {0};
  0         0  
205 0         0 my $complete_cb = delete $opts{on_complete};
206 0         0 my $fail_cb = delete $opts{on_fail};
207 0         0 my $start_cb = delete $opts{on_start};
208 0 0       0 die "Unknown opts" if %opts;
209              
210 0         0 my $grab_req = _rc("grab_job");
211 0         0 my $presleep_req = _rc("pre_sleep");
212              
213 0         0 my $last_job_time;
214              
215             my $on_connect = sub {
216 0     0   0 return _send($_[0], $presleep_req);
217 0         0 };
218              
219 0         0 my %js_map = map { $self->_js_str($_) => $_ } $self->job_servers;
  0         0  
220              
221             # "Active" job servers are servers that have woken us up and should be
222             # queried to see if they have jobs for us to handle. On our first pass
223             # in the loop we contact all servers.
224 0         0 my %active_js = map { $_ => 1 } keys(%js_map);
  0         0  
225              
226             # ( js => last_update_time, ... )
227 0         0 my %last_update_time;
228              
229 0         0 while (1) {
230              
231             # "Jobby" job servers are the set of server which we will contact
232             # on this pass through the loop, because we need to clear and use
233             # the "Active" set to plan for our next pass through the loop.
234 0         0 my @jobby_js = keys %active_js;
235              
236 0         0 %active_js = ();
237              
238 0         0 my $js_count = @jobby_js;
239 0         0 my $js_offset = int(rand($js_count));
240 0         0 my $is_idle = 0;
241              
242 0         0 for (my $i = 0; $i < $js_count; $i++) {
243 0         0 my $js_index = ($i + $js_offset) % $js_count;
244 0         0 my $js_str = $jobby_js[$js_index];
245 0         0 my $js = $js_map{$js_str};
246 0 0       0 my $jss = $self->_get_js_sock($js, on_connect => $on_connect)
247             or next;
248              
249             # TODO: add an optional sleep in here for the test suite
250             # to test gearmand server going away here. (SIGPIPE on
251             # send_req, etc) this testing has been done manually, at
252             # least.
253              
254 0 0       0 unless (_send($jss, $grab_req)) {
255 0 0 0     0 if ($!{EPIPE} && $self->{parent_pipe}) {
256              
257             # our parent process died, so let's just quit
258             # gracefully.
259 0         0 exit(0);
260             } ## end if ($!{EPIPE} && $self...)
261              
262 0         0 $self->_uncache_sock($js, "grab_job_timeout");
263 0         0 delete $last_update_time{$js_str};
264 0         0 next;
265             } ## end unless (_send($jss, $grab_req...))
266              
267             # if we're a child process talking over a unix pipe, give more
268             # time, since we know there are no network issues, and also
269             # because on failure, we can't "reconnect". all we can do is
270             # die and hope our parent process respawns us.
271 0 0       0 my $timeout = $self->{parent_pipe} ? 5 : 0.50;
272 0 0       0 unless (Gearman::Util::wait_for_readability($jss->fileno, $timeout))
273             {
274 0         0 $self->_uncache_sock($js, "grab_job_timeout");
275 0         0 delete $last_update_time{$js_str};
276 0         0 next;
277             } ## end unless (Gearman::Util::wait_for_readability...)
278              
279 0         0 my $res;
280             do {
281 0         0 my $err;
282 0         0 $res = Gearman::Util::read_res_packet($jss, \$err);
283 0 0       0 unless ($res) {
284 0         0 $self->_uncache_sock($js, "read_res_error");
285 0         0 delete $last_update_time{$js_str};
286 0         0 next;
287             }
288 0         0 } while ($res->{type} eq "noop");
289              
290 0 0       0 if ($res->{type} eq "no_job") {
291 0 0       0 unless (_send($jss, $presleep_req)) {
292 0         0 delete $last_update_time{$js_str};
293 0         0 $self->_uncache_sock($js, "write_presleep_error");
294             }
295 0         0 $last_update_time{$js_str} = time;
296 0         0 next;
297             } ## end if ($res->{type} eq "no_job")
298              
299 0 0       0 unless ($res->{type} eq "job_assign") {
300 0         0 my $msg = "Uh, wasn't expecting a $res->{type} packet.";
301              
302 0 0       0 if ($res->{type} eq "error") {
303 0         0 $msg .= " [${$res->{blobref}}]\n";
  0         0  
304 0         0 $msg =~ s/\0/ -- /g;
305             }
306 0         0 die $msg;
307             } ## end unless ($res->{type} eq "job_assign")
308              
309 0 0       0 ${ $res->{blobref} } =~ s/^(.+?)\0(.+?)\0//
  0         0  
310             or die "Uh, regexp on job_assign failed";
311 0         0 my ($handle, $ability) = ($1, $2);
312             my $job = Gearman::Job->new(
313             func => $ability,
314             argref => $res->{blobref},
315 0         0 handle => $handle,
316             jss => $jss,
317             js => $js
318             );
319              
320 0         0 my $jobhandle = join("//", $js_str, $job->handle);
321 0 0       0 $start_cb->($jobhandle) if $start_cb;
322              
323 0         0 my $handler = $self->{can}{$ability};
324 0         0 my $ret = eval { $handler->($job); };
  0         0  
325 0         0 my $err = $@;
326 0 0       0 warn "Job '$ability' died: $err" if $err;
327              
328 0         0 $last_update_time{$js_str} = $last_job_time = time();
329 0 0       0 if ($err) {
330              
331             #TODO should be work_exception replaced by work_fail?
332             # see 75b65e1
333 0         0 my $exception_req
334             = _rc("work_exception",
335             _join0($handle, Storable::nfreeze(\$err)));
336 0 0       0 unless (_send($jss, $exception_req)) {
337 0         0 $self->_uncache_sock($js, "write_res_error");
338 0         0 next;
339             }
340             } ## end if ($err)
341              
342 0 0       0 if (!defined $job_done{ $job->handle }) {
343 0 0       0 if (defined $ret) {
344 0         0 $self->send_work_complete($job, $ret);
345             }
346             else {
347 0         0 $self->send_work_fail($job);
348             }
349             } ## end if (!defined $job_done...)
350              
351 0         0 my $done = delete $job_done{ $job->handle };
352 0 0       0 if ($done->{command} eq "work_complete") {
353 0 0       0 $complete_cb->($jobhandle, $ret) if $complete_cb;
354             }
355             else {
356 0 0       0 $fail_cb->($jobhandle, $err) if $fail_cb;
357             }
358              
359 0 0       0 unless ($done->{result}) {
360 0         0 $self->_uncache_sock($js, "write_res_error");
361 0         0 next;
362             }
363              
364 0         0 $active_js{$js_str} = 1;
365             } ## end for (my $i = 0; $i < $js_count...)
366              
367 0         0 my @jss;
368              
369 0         0 foreach my $js_str (keys(%js_map)) {
370             my $jss
371 0 0       0 = $self->_get_js_sock($js_map{$js_str},
372             on_connect => $on_connect)
373             or next;
374 0         0 push @jss, [$js_str, $jss];
375             } ## end foreach my $js_str (keys(%js_map...))
376              
377 0         0 $is_idle = 1;
378 0         0 my $wake_vec = '';
379              
380 0         0 foreach my $j (@jss) {
381 0         0 (undef, my $_jss) = @{$j};
  0         0  
382 0         0 my $fd = $_jss->fileno;
383 0         0 vec($wake_vec, $fd, 1) = 1;
384             }
385              
386 0 0       0 my $timeout = keys(%active_js) ? 0 : (10 + rand(2));
387              
388             # chill for some arbitrary time until we're woken up again
389 0         0 my $nready = select(my $wout = $wake_vec, undef, undef, $timeout);
390              
391 0 0       0 if ($nready) {
392 0         0 foreach my $j (@jss) {
393 0         0 my ($js_str, $jss) = @{$j};
  0         0  
394 0         0 my $fd = $jss->fileno;
395 0 0       0 $active_js{$js_str} = 1
396             if vec($wout, $fd, 1);
397             } ## end foreach my $j (@jss)
398             } ## end if ($nready)
399              
400 0 0       0 $is_idle = 0 if keys %active_js;
401              
402 0 0       0 return if $stop_if->($is_idle, $last_job_time);
403              
404 0         0 my $update_since = time - (15 + rand 60);
405              
406 0         0 while (my ($js_str, $last_update) = each %last_update_time) {
407 0 0       0 $active_js{$js_str} = 1 if $last_update < $update_since;
408             }
409             } ## end while (1)
410              
411             } ## end sub work
412              
413             =head2 $worker->register_function($funcname, $subref)
414              
415             =head2 $worker->register_function($funcname, $timeout, $subref)
416              
417             Registers the function C<$funcname> as being provided by the worker
418             C<$worker>, and advertises these capabilities to all of the job servers
419             defined in this worker.
420              
421             C<$subref> must be a subroutine reference that will be invoked when the
422             worker receives a request for this function. It will be passed a
423             L<Gearman::Job> object representing the job that has been received by the
424             worker.
425              
426             C<$timeout> is an optional parameter specifying how long the jobserver will
427             wait for your subroutine to give an answer. Exceeding this time will result
428             in the jobserver reassigning the task and ignoring your result. This prevents
429             a gimpy worker from ruining the 'user experience' in many situations.
430              
431             B<Returns> C<< _register_all(can_do request) >>
432              
433             =cut
434              
435             sub register_function {
436 2     2 1 382 my $self = shift;
437 2         3 my $func = shift;
438 2 100       7 my $timeout = shift unless (ref $_[0] eq 'CODE');
439 2         3 my $subref = shift;
440              
441 2         9 my $ability = $self->func($func);
442              
443 2         1 my $req;
444 2 100       5 if (defined $timeout) {
445 1         3 $req = _rc("can_do_timeout", _join0($ability, $timeout));
446 1         2 $self->{timeouts}{$ability} = $timeout;
447             }
448             else {
449 1         2 $req = _rc("can_do", $ability);
450             }
451              
452 2         4 $self->{can}{$ability} = $subref;
453              
454 2         6 return $self->_register_all($req);
455             } ## end sub register_function
456              
457             =head2 unregister_function($funcname)
458              
459             send cant_do C<$funcname> request to L</job_servers>
460              
461             B<Returns> C<< _register_all(cant_do) >>
462              
463             =cut
464              
465             sub unregister_function {
466 0     0 1 0 my ($self, $func) = @_;
467 0         0 my $ability = $self->func($func);
468 0         0 delete $self->{can}{$ability};
469              
470 0         0 my $req = _rc("cant_do", $ability);
471 0         0 return $self->_register_all($req);
472             } ## end sub unregister_function
473              
474             =head2 job_servers(@servers)
475              
476             Override L<Gearman::Objects> method to skip job server initialization if
477             working with L<Gearman::Server>.
478              
479             Calling this method will do nothing in a worker that is running as a child
480             process of a gearman server.
481              
482             =cut
483              
484             sub job_servers {
485 2     2 1 2 my $self = shift;
486 2 50       7 return if ($ENV{GEARMAN_WORKER_USE_STDIO});
487              
488 2         9 return $self->SUPER::job_servers(@_);
489             } ## end sub job_servers
490              
491             =head2 send_work_complete($job, $v)
492              
493             notify the server (and listening clients) that job completed successfully
494              
495             =cut
496              
497             sub send_work_complete {
498 0     0 1 0 return shift->_finish_job_request("work_complete", @_);
499             }
500              
501             =head2 send_work_data($job, $data)
502              
503             Use this method to update the client with data from a running job.
504              
505             =cut
506              
507             sub send_work_data {
508 0     0 1 0 my ($self, $job, $data) = @_;
509             return $self->_job_request("work_data", $job,
510 0 0       0 ref($data) ? ${$data} : $data);
  0         0  
511             }
512              
513             =head2 send_work_warning($job, $message)
514              
515             Use this method to send a warning C<$message> to the server (and any listening clients) with regard to the running C<$job>.
516              
517             =cut
518              
519             sub send_work_warning {
520 0     0 1 0 my ($self, $job, $msg) = @_;
521 0         0 return $self->_job_request("work_warning", $job, $msg);
522             }
523              
524             # =head2 send_work_exception($job, $exception)
525              
526             # Use this method to notify the server (and any listening clients) that the C<$job> failed with the given C<$exception>.
527              
528             # =cut
529              
530             # sub send_work_exception {
531             # my ($self, $job, $exc) = @_;
532             # return $self->_job_request("work_exception", $job, $exc);
533             # }
534              
535             =head2 send_work_fail($job, [$message])
536              
537             Use this method to notify the server (and any listening clients) that the job failed.
538              
539             =cut
540              
541             sub send_work_fail {
542 0     0 1 0 my ($self) = shift;
543 0         0 return $self->_finish_job_request("work_fail", @_);
544             }
545              
546             =head2 send_work_status($job, $numerator, $denominator)
547              
548             Use this method to periodically send a status update to the server for a long-running job, reporting the
549             percentage complete.
550              
551             =cut
552              
553             sub send_work_status {
554 0     0 1 0 my ($self, $job, $numerator, $denominator) = @_;
555 0         0 return $self->_job_request("work_status", $job, $numerator, $denominator);
556             }
557              
558             # _finish_job_request($cmd, $job, [$v])
559             #
560             # send some data or message to the client for finished job
561             # $cmd = work_complete || work_fail
562             #
563             sub _finish_job_request {
564 0     0   0 my ($self, $cmd, $job, $v) = @_;
565 0 0       0 my $res = $self->_job_request($cmd, $job, ref($v) ? ${$v} : $v);
  0         0  
566              
567             # set job done flag because work method check it
568 0         0 $job_done{ $job->handle } = { command => $cmd, result => $res };
569              
570 0         0 return $res;
571             } ## end sub _finish_job_request
572              
573             # _job_request($cmd, $job, [$v])
574             #
575             # send some data to the client for the running job
576             #
577             sub _job_request {
578 0     0   0 my ($self, $cmd, $job, $v) = @_;
579 0 0       0 my $req = _rc($cmd, $v ? _join0($job->handle, $v) : $job->handle);
580              
581 0         0 return _send($job->{jss}, $req);
582             } ## end sub _job_request
583              
584             #
585             # _register_all($req)
586             #
587             sub _register_all {
588 2     2   1 my ($self, $req) = @_;
589              
590 2         3 my $count = 0;
591 2         4 my @job_servers = $self->job_servers();
592 2         4 foreach my $js (@job_servers) {
593 0 0       0 my $jss = $self->_get_js_sock($js)
594             or next;
595              
596 0 0       0 unless (_send($jss, $req)) {
597 0         0 $self->_uncache_sock($js, "write_register_func_error");
598 0         0 next;
599             }
600 0         0 $count++;
601             } ## end foreach my $js (@job_servers)
602              
603 2   33     9 return $count && $count == scalar(@job_servers);
604             } ## end sub _register_all
605              
606             #
607             # _get_js_sock($js, %opts)
608             #
609             sub _get_js_sock {
610 3     3   1527 my ($self, $js, %opts) = @_;
611 3 100       13 $js || return;
612              
613 2         11 my $js_str = $self->_js_str($js);
614 2         3 my $on_connect = delete $opts{on_connect};
615              
616             # Someday should warn when called with extra opts.
617              
618 2 50       6 warn "getting job server socket: $js_str" if $self->debug;
619              
620             # special case, if we're a child process of a gearman::server
621             # parent process, talking over a unix pipe...
622 2 100       9 return $self->{parent_pipe} if $self->{parent_pipe};
623              
624 1 50       7 if (my $sock = $self->_sock_cache($js)) {
625 0 0       0 return $sock if getpeername($sock);
626              
627             # delete cached sock
628 0         0 $self->_sock_cache($js, undef, 1);
629             } ## end if (my $sock = $self->...)
630              
631 1         2 my $now = time;
632 1         2 my $down_since = $self->{down_since}{$js_str};
633 1 50       3 if ($down_since) {
634 0 0       0 warn "job server down since $down_since" if $self->debug;
635              
636 0         0 my $down_for = $now - $down_since;
637 0 0       0 my $retry_period = $down_for > 60 ? 30 : (int($down_for / 2) + 1);
638 0 0       0 if ($self->{last_connect_fail}{$js_str} > $now - $retry_period) {
639 0         0 return;
640             }
641             } ## end if ($down_since)
642              
643 1 50       3 warn "connecting to '$js_str'" if $self->debug;
644              
645 1         5 my $sock = $self->socket($js, 1);
646 1 50       3 unless ($sock) {
647 1   33     8 $self->{down_since}{$js_str} ||= $now;
648 1         1 $self->{last_connect_fail}{$js_str} = $now;
649              
650 1         4 return;
651             } ## end unless ($sock)
652              
653 0         0 delete $self->{last_connect_fail}{$js_str};
654 0         0 delete $self->{down_since}{$js_str};
655              
656 0         0 $sock->autoflush(1);
657 0         0 $self->sock_nodelay($sock);
658              
659 0         0 $self->_sock_cache($js, $sock);
660              
661 0 0       0 my $ok = $on_connect ? $on_connect->($sock) : $self->_on_connect($sock);
662 0 0       0 unless ($ok) {
663              
664             # delete
665 0         0 $self->_sock_cache($js, undef, 1);
666 0         0 return;
667             } ## end unless ($ok)
668              
669 0         0 return $sock;
670             } ## end sub _get_js_sock
671              
672             #
673             # _on_connect($sock)
674             #
675             # Housekeeping things to do on connection to a server. Method call
676             # with one argument being the 'socket' we're going to take care of.
677             # returns true on success, false on failure.
678             #
679             sub _on_connect {
680 2     2   320 my ($self, $sock) = @_;
681              
682 2         6 my $cid_req = _rc("set_client_id", $self->{client_id});
683 2 50       6 return unless _send($sock, $cid_req);
684              
685             # get this socket's state caught-up
686 0         0 foreach my $ability (keys %{ $self->{can} }) {
  0         0  
687 0         0 my $timeout = $self->{timeouts}->{$ability};
688 0 0       0 unless ($self->_set_ability($sock, $ability, $timeout)) {
689 0         0 return;
690             }
691             } ## end foreach my $ability (keys %...)
692              
693 0         0 return 1;
694             } ## end sub _on_connect
695              
696             #
697             # _set_ability($sock, $ability, [$timeout])
698             #
699             sub _set_ability {
700 3     3   4 my ($self, $sock, $ability, $timeout) = @_;
701 3         4 my $req;
702 3 100       7 if (defined $timeout) {
703 1         2 $req = _rc("can_do_timeout", _join0($ability, $timeout));
704             }
705             else {
706 2         4 $req = _rc("can_do", $ability);
707             }
708 3         6 return _send($sock, $req);
709             } ## end sub _set_ability
710              
711             #
712             # _send($jss, $req)
713             #
714             # send C<$req> to C<$jss>
715             #
716             sub _send {
717 5     5   5 my ($jss, $req) = @_;
718 5         12 return Gearman::Util::send_req($jss, \$req);
719             }
720              
721             #
722             # _rc($cmd, [@val])
723             #
724             sub _rc {
725 8     8   21 return Gearman::Util::pack_req_command(@_);
726             }
727              
728             #
729             # _join0(@v)
730             #
731             sub _join0 {
732 2     2   8 return join("\0", @_);
733             }
734              
735             1;
736             __END__