File Coverage

blib/lib/Gearman/Client.pm
Criterion Covered Total %
statement 73 201 36.3
branch 16 78 20.5
condition 0 24 0.0
subroutine 16 32 50.0
pod 10 10 100.0
total 115 345 33.3


line stmt bran cond sub pod time code
1             package Gearman::Client;
2 7     7   19463 use version;
  7         5277  
  7         29  
3             $Gearman::Client::VERSION = qv("2.001_001");
4              
5 7     7   461 use strict;
  7         8  
  7         105  
6 7     7   24 use warnings;
  7         6  
  7         265  
7              
8             =head1 NAME
9              
10             Gearman::Client - Client for gearman distributed job system
11              
12             =head1 SYNOPSIS
13              
14             use Gearman::Client;
15             my $client = Gearman::Client->new;
16             $client->job_servers('127.0.0.1', '10.0.0.1');
17              
18             # running a single task
19             my $result_ref = $client->do_task("add", "1+2");
20             print "1 + 2 = $$result_ref\n";
21              
22             # waiting on a set of tasks in parallel
23             my $taskset = $client->new_task_set;
24             $taskset->add_task( "add" => "1+2", {
25             on_complete => sub { ... }
26             });
27             $taskset->add_task( "divide" => "5/0", {
28             on_fail => sub { print "divide by zero error!\n"; },
29             });
30             $taskset->wait;
31              
32              
33             =head1 DESCRIPTION
34              
35             I is a client class for the Gearman distributed job
36             system, providing a framework for sending jobs to one or more Gearman
37             servers. These jobs are then distributed out to a farm of workers.
38              
39             Callers instantiate a I object and from it dispatch
40             single tasks, sets of tasks, or check on the status of tasks.
41              
42             =head1 USAGE
43              
44             =head2 Gearman::Client->new(%options)
45              
46             Creates a new I object, and returns the object.
47              
48             If I<%options> is provided, initializes the new client object with the
49             settings in I<%options>, which can contain:
50              
51             =over 4
52              
53             =item * job_servers
54              
55             Calls I (see below) to initialize the list of job
56             servers. Value in this case should be an arrayref.
57              
58             =item * prefix
59              
60             Calls I (see below) to set the prefix / namespace.
61              
62             =back
63              
64             =head2 $client->job_servers(@servers)
65              
66             Initializes the client I<$client> with the list of job servers in I<@servers>.
67             I<@servers> should contain a list of IP addresses, with optional port
68             numbers. For example:
69              
70             $client->job_servers('127.0.0.1', '192.168.1.100:4730');
71              
72             If the port number is not provided, C<4730> is used as the default.
73              
74             =head2 $client-Edo_task($task)
75              
76             =head2 $client-Edo_task($funcname, $arg, \%options)
77              
78             Dispatches a task and waits on the results. May either provide a
79             L object, or the 3 arguments that the Gearman::Task
80             constructor takes.
81              
82             Returns a scalar reference to the result, or undef on failure.
83              
84             If you provide on_complete and on_fail handlers, they're ignored, as
85             this function currently overrides them.
86              
87             =head2 $client-Edispatch_background($task)
88              
89             =head2 $client-Edispatch_background($funcname, $arg, \%options)
90              
91             Dispatches a task and doesn't wait for the result. Return value
92             is an opaque scalar that can be used to refer to the task with get_status.
93              
94             =head2 $taskset = $client-Enew_task_set
95              
96             Creates and returns a new L object.
97              
98             =head2 $taskset-Eadd_task($task)
99              
100             =head2 $taskset-Eadd_task($funcname, $arg, $uniq)
101              
102             =head2 $taskset-Eadd_task($funcname, $arg, \%options)
103              
104             Adds a task to a taskset. Three different calling conventions are
105             available.
106              
107             =head2 $taskset-Ewait
108              
109             Waits for a response from the job server for any of the tasks listed
110             in the taskset. Will call the I handlers for each of the tasks
111             that have been completed, updated, etc. Doesn't return until
112             everything has finished running or failing.
113              
114             =head2 $client-Eprefix($prefix)
115              
116             Sets the namespace / prefix for the function names.
117              
118             See L for more details.
119              
120              
121             =head1 EXAMPLES
122              
123             =head2 Summation
124              
125             This is an example client that sends off a request to sum up a list of
126             integers.
127              
128             use Gearman::Client;
129             use Storable qw( freeze );
130             my $client = Gearman::Client->new;
131             $client->job_servers('127.0.0.1');
132             my $tasks = $client->new_task_set;
133             my $handle = $tasks->add_task(sum => freeze([ 3, 5 ]), {
134             on_complete => sub { print ${ $_[0] }, "\n" }
135             });
136             $tasks->wait;
137              
138             See the L documentation for the worker for the I
139             function.
140              
141             =cut
142              
143 7     7   20 use base 'Gearman::Objects';
  7         6  
  7         2474  
144              
145             use fields (
146 7         23 'sock_cache', # hostport -> socket
147             'sock_info', # hostport -> hashref
148             'hooks', # hookname -> coderef
149             'exceptions',
150             'backoff_max',
151              
152             # maximum time a gearman command should take to get a result (not a job timeout)
153             'command_timeout',
154 7     7   36 );
  7         8  
155              
156 7     7   380 use Carp;
  7         8  
  7         330  
157 7     7   2372 use Gearman::Task;
  7         11  
  7         158  
158 7     7   1870 use Gearman::Taskset;
  7         13  
  7         178  
159 7     7   2210 use Gearman::JobStatus;
  7         12  
  7         139  
160 7     7   27 use Time::HiRes;
  7         8  
  7         120  
161              
162             sub new {
163 8     8 1 9008 my ($self, %opts) = @_;
164 8 50       34 unless (ref $self) {
165 8         25 $self = fields::new($self);
166             }
167              
168 8         7899 $self->SUPER::new(%opts);
169              
170 8         13 $self->{sock_cache} = {};
171 8         11 $self->{hooks} = {};
172 8         10 $self->{exceptions} = 0;
173 8         10 $self->{backoff_max} = 90;
174 8         7 $self->{command_timeout} = 30;
175              
176             $self->{exceptions} = delete $opts{exceptions}
177 8 50       17 if exists $opts{exceptions};
178              
179             $self->{backoff_max} = $opts{backoff_max}
180 8 50       17 if defined $opts{backoff_max};
181              
182             $self->{command_timeout} = $opts{command_timeout}
183 8 50       20 if defined $opts{command_timeout};
184              
185 8         17 return $self;
186             } ## end sub new
187              
188             =head1 METHODS
189              
190             =head2 new_task_set()
191              
192             B Gearman::Taskset
193              
194             =cut
195              
196             sub new_task_set {
197 1     1 1 1 my $self = shift;
198 1         6 my $taskset = Gearman::Taskset->new($self);
199 1         2 $self->run_hook('new_task_set', $self, $taskset);
200 1         3 return $taskset;
201             } ## end sub new_task_set
202              
203             #
204             # _job_server_status_command($command, $each_line_sub)
205             # $command e.g. "status\n".
206             # $each_line_sub A sub to be called on each line of response;
207             # takes $hostport and the $line as args.
208             #
209             sub _job_server_status_command {
210 0     0   0 my ($self, $command, $each_line_sub) = (shift, shift, shift);
211              
212 0         0 my $list = $self->canonicalize_job_servers(@_);
213 0 0       0 $list = $self->{job_servers} unless @$list;
214              
215 0         0 foreach my $hostport (@$list) {
216 0 0       0 next unless grep { $_ eq $hostport } @{ $self->{job_servers} };
  0         0  
  0         0  
217              
218 0 0       0 my $sock = $self->_get_js_sock($hostport)
219             or next;
220              
221 0         0 my $rv = $sock->write($command);
222              
223 0         0 my $err;
224 0         0 my @lines = Gearman::Util::read_text_status($sock, \$err);
225 0 0       0 if ($err) {
226              
227             #TODO warn
228 0         0 next;
229             }
230              
231 0         0 foreach my $l (@lines) {
232 0         0 $each_line_sub->($hostport, $l);
233             }
234              
235 0         0 $self->_put_js_sock($hostport, $sock);
236             } ## end foreach my $hostport (@$list)
237             } ## end sub _job_server_status_command
238              
239             =head2 get_job_server_status()
240              
241             B {job => {capable, queued, running}}
242              
243             =cut
244              
245             sub get_job_server_status {
246 0     0 1 0 my $self = shift;
247              
248 0         0 my $js_status = {};
249             $self->_job_server_status_command(
250             "status\n",
251             sub {
252 0     0   0 my ($hostport, $line) = @_;
253              
254 0 0       0 unless ($line =~ /^(\S+)\s+(\d+)\s+(\d+)\s+(\d+)$/) {
255 0         0 return;
256             }
257              
258 0         0 my ($job, $queued, $running, $capable) = ($1, $2, $3, $4);
259 0         0 $js_status->{$hostport}->{$job} = {
260             queued => $queued,
261             running => $running,
262             capable => $capable,
263             };
264             },
265             @_
266 0         0 );
267 0         0 return $js_status;
268             } ## end sub get_job_server_status
269              
270             =head2 get_job_server_jobs()
271              
272             supported only by L
273              
274             B {job => {address, listeners, key}}
275              
276             =cut
277              
278             sub get_job_server_jobs {
279 0     0 1 0 my $self = shift;
280 0         0 my $js_jobs = {};
281             $self->_job_server_status_command(
282             "jobs\n",
283             sub {
284 0     0   0 my ($hostport, $line) = @_;
285              
286             # Yes, the unique key is sometimes omitted.
287 0 0       0 return unless $line =~ /^(\S+)\s+(\S*)\s+(\S+)\s+(\d+)$/;
288              
289 0         0 my ($job, $key, $address, $listeners) = ($1, $2, $3, $4);
290 0         0 $js_jobs->{$hostport}->{$job} = {
291             key => $key,
292             address => $address,
293             listeners => $listeners,
294             };
295             },
296             @_
297 0         0 );
298 0         0 return $js_jobs;
299             } ## end sub get_job_server_jobs
300              
301             =head2 get_job_server_clients()
302              
303             supported only by L
304              
305             =cut
306              
307             sub get_job_server_clients {
308 0     0 1 0 my $self = shift;
309              
310 0         0 my $js_clients = {};
311 0         0 my $client;
312             $self->_job_server_status_command(
313             "clients\n",
314             sub {
315 0     0   0 my ($hostport, $line) = @_;
316              
317 0 0 0     0 if ($line =~ /^(\S+)$/) {
    0          
318 0         0 $client = $1;
319 0   0     0 $js_clients->{$hostport}->{$client} ||= {};
320             }
321             elsif ($client && $line =~ /^\s+(\S+)\s+(\S*)\s+(\S+)$/) {
322 0         0 my ($job, $key, $address) = ($1, $2, $3);
323 0         0 $js_clients->{$hostport}->{$client}->{$job} = {
324             key => $key,
325             address => $address,
326             };
327             } ## end elsif ($client && $line =~...)
328             },
329             @_
330 0         0 );
331              
332 0         0 return $js_clients;
333             } ## end sub get_job_server_clients
334              
335             #
336             # _get_task_from_args
337             #
338             sub _get_task_from_args {
339 3     3   5 my $self = shift;
340 3         5 my $task;
341 3 100       8 if (ref $_[0]) {
342 1         2 $task = shift;
343 1 50       4 $task->isa("Gearman::Task")
344             || Carp::croak("Argument isn't a Gearman::Task");
345             }
346             else {
347 2         3 my $func = shift;
348 2         3 my $arg_p = shift;
349 2         3 my $opts = shift;
350 2 50       5 my $argref = ref $arg_p ? $arg_p : \$arg_p;
351 2 50       8 Carp::croak("Function argument must be scalar or scalarref")
352             unless ref $argref eq "SCALAR";
353              
354 2         14 $task = Gearman::Task->new($func, $argref, $opts);
355             } ## end else [ if (ref $_[0]) ]
356 3         6 return $task;
357              
358             } ## end sub _get_task_from_args
359              
360             =head2 do_task($task)
361              
362             =head2 do_task($funcname, $arg, \%options)
363              
364             given a (func, arg_p, opts?)
365              
366             B either undef (on fail) or scalarref of result
367              
368             =cut
369              
370             sub do_task {
371 0     0 1 0 my $self = shift;
372 0         0 my $task = $self->_get_task_from_args(@_);
373              
374 0         0 my $ret = undef;
375 0         0 my $did_err = 0;
376              
377             $task->{on_complete} = sub {
378 0     0   0 $ret = shift;
379 0         0 };
380              
381             $task->{on_fail} = sub {
382 0     0   0 $did_err = 1;
383 0         0 };
384              
385 0         0 my $ts = $self->new_task_set;
386 0         0 $ts->add_task($task);
387 0         0 $ts->wait(timeout => $task->timeout);
388              
389 0 0       0 return $did_err ? undef : $ret;
390             } ## end sub do_task
391              
392             =head2 dispatch_background($func, $arg_p, $opts)
393              
394             =head2 dispatch_background($task)
395              
396             dispatches job in background
397              
398             return the handle from the jobserver, or undef on failure
399              
400             =cut
401              
402             sub dispatch_background {
403 0     0 1 0 my $self = shift;
404 0         0 my $task = $self->_get_task_from_args(@_);
405              
406 0         0 $task->{background} = 1;
407              
408 0         0 my $ts = $self->new_task_set;
409 0         0 return $ts->add_task($task);
410             } ## end sub dispatch_background
411              
412             =head2 run_hook($name)
413              
414             run a hook callback if defined
415              
416             =cut
417              
418             sub run_hook {
419 1     1 1 1 my ($self, $hookname) = @_;
420 1 50       2 $hookname || return;
421              
422 1         2 my $hook = $self->{hooks}->{$hookname};
423 1 50       2 return unless $hook;
424              
425 1         1 eval { $hook->(@_) };
  1         2  
426              
427 1 50       211 warn "Gearman::Client hook '$hookname' threw error: $@\n" if $@;
428             } ## end sub run_hook
429              
430             =head2 add_hook($name, $cb)
431              
432             add a hook
433              
434             =cut
435              
436             sub add_hook {
437 2     2 1 246 my ($self, $hookname) = (shift, shift);
438 2 50       4 $hookname || return;
439              
440 2 100       4 if (@_) {
441 1         5 $self->{hooks}->{$hookname} = shift;
442             }
443             else {
444 1         6 delete $self->{hooks}->{$hookname};
445             }
446             } ## end sub add_hook
447              
448             =head2 get_status($handle)
449              
450             The Gearman Server will assign a scalar job handle when you request a
451             background job with dispatch_background. Save this scalar, and use it later in
452             order to request the status of this job.
453              
454             B L on success
455              
456             =cut
457              
458             sub get_status {
459 0     0 1 0 my ($self, $handle) = @_;
460 0 0       0 $handle || return;
461              
462 0         0 my ($hostport, $shandle) = split(m!//!, $handle);
463              
464             #TODO simple check for $hostport in job_server doesn't work if
465             # $hostport is not contained in job_servers
466             # job_servers = ["localhost:4730"]
467             # handle = 127.0.0.1:4730//H:...
468             #
469             # hopefully commit 58e2aa5 solves this TODO
470 0 0       0 return undef unless grep { $hostport eq $_ } @{ $self->{job_servers} };
  0         0  
  0         0  
471              
472 0 0       0 my $sock = $self->_get_js_sock($hostport)
473             or return undef;
474              
475 0         0 my $req = Gearman::Util::pack_req_command("get_status", $shandle);
476 0         0 my $len = length($req);
477 0         0 my $rv = $sock->write($req, $len);
478 0         0 my $err;
479 0         0 my $res = Gearman::Util::read_res_packet($sock, \$err);
480              
481 0 0 0     0 if ($res && $res->{type} eq "error") {
482 0         0 Carp::croak
483 0         0 "Error packet from server after get_status: ${$res->{blobref}}\n";
484             }
485              
486 0 0 0     0 return undef unless $res && $res->{type} eq "status_res";
487              
488 0         0 my @args = split(/\0/, ${ $res->{blobref} });
  0         0  
489              
490             #FIXME returns on '', 0
491 0 0       0 $args[0] || return;
492              
493 0         0 shift @args;
494 0         0 $self->_put_js_sock($hostport, $sock);
495              
496 0         0 return Gearman::JobStatus->new(@args);
497             } ## end sub get_status
498              
499             #
500             # _option_request($sock, $option)
501             #
502             sub _option_request {
503 0     0   0 my ($self, $sock, $option) = @_;
504              
505 0         0 my $req = Gearman::Util::pack_req_command("option_req", $option);
506 0         0 my $len = length($req);
507 0         0 my $rv = $sock->write($req, $len);
508              
509 0         0 my $err;
510             my $res = Gearman::Util::read_res_packet($sock, \$err,
511 0         0 $self->{command_timeout});
512              
513 0 0       0 return unless $res;
514              
515 0 0       0 return 0 if $res->{type} eq "error";
516 0 0       0 return 1 if $res->{type} eq "option_res";
517              
518 0         0 warn "Got unknown response to option request: $res->{type}\n";
519 0         0 return;
520             } ## end sub _option_request
521              
522             #
523             # _get_js_sock($hostport)
524             #
525             # returns a socket from the cache. it should be returned to the
526             # cache with _put_js_sock. the hostport isn't verified. the caller
527             # should verify that $hostport is in the set of jobservers.
528             sub _get_js_sock {
529 0     0   0 my ($self, $hostport) = @_;
530              
531 0 0       0 if (my $sock = delete $self->{sock_cache}{$hostport}) {
532 0 0       0 return $sock if $sock->connected;
533             }
534              
535 0   0     0 my $sockinfo = $self->{sock_info}{$hostport} ||= {};
536 0         0 my $disabled_until = $sockinfo->{disabled_until};
537 0 0 0     0 return if defined $disabled_until && $disabled_until > Time::HiRes::time();
538              
539 0         0 my $sock = $self->socket($hostport, 1);
540 0 0       0 unless ($sock) {
541 0         0 my $count = ++$sockinfo->{failed_connects};
542 0         0 my $disable_for = $count**2;
543 0         0 my $max = $self->{backoff_max};
544 0 0       0 $disable_for = $disable_for > $max ? $max : $disable_for;
545 0         0 $sockinfo->{disabled_until} = $disable_for + Time::HiRes::time();
546 0         0 return;
547             } ## end unless ($sock)
548              
549 0         0 $self->sock_nodelay($sock);
550 0         0 $sock->autoflush(1);
551              
552             # If exceptions support is to be requested, and the request fails, disable
553             # exceptions for this client.
554 0 0 0     0 if ($self->{exceptions} && !$self->_option_request($sock, 'exceptions')) {
555 0         0 warn "Exceptions support denied by server, disabling.\n";
556 0         0 $self->{exceptions} = 0;
557             }
558              
559 0         0 delete $sockinfo->{failed_connects}; # Success, mark the socket as such.
560 0         0 delete $sockinfo->{disabled_until};
561              
562 0         0 return $sock;
563             } ## end sub _get_js_sock
564              
565             #
566             # _put_js_sock($hostport, $sock)
567             #
568             # way for a caller to give back a socket it previously requested.
569             # the $hostport isn't verified, so the caller should verify the
570             # $hostport is still in the set of jobservers.
571             sub _put_js_sock {
572 0     0   0 my ($self, $hostport, $sock) = @_;
573              
574 0   0     0 $self->{sock_cache}{$hostport} ||= $sock;
575             }
576              
577             sub _get_random_js_sock {
578 10     10   10 my ($self, $getter) = @_;
579              
580 10 50       34 $self->{js_count} || return;
581              
582             $getter ||= sub {
583 0     0     my $hostport = shift;
584 0           return $self->_get_js_sock($hostport);
585 0   0       };
586              
587 0           my $ridx = int(rand($self->{js_count}));
588 0           for (my $try = 0; $try < $self->{js_count}; $try++) {
589 0           my $aidx = ($ridx + $try) % $self->{js_count};
590 0           my $hostport = $self->{job_servers}[$aidx];
591 0 0         my $sock = $getter->($hostport) or next;
592 0           return ($hostport, $sock);
593             } ## end for (my $try = 0; $try ...)
594 0           return ();
595             } ## end sub _get_random_js_sock
596              
597             1;
598             __END__