File Coverage

blib/lib/Gearman/Client.pm
Criterion Covered Total %
statement 75 201 37.3
branch 16 78 20.5
condition 0 21 0.0
subroutine 17 32 53.1
pod 10 10 100.0
total 118 342 34.5


line stmt bran cond sub pod time code
1             package Gearman::Client;
2 7     7   25369 use version ();
  7         6822  
  7         294  
3             $Gearman::Client::VERSION = version->declare("2.002.001"); #TRIAL
4              
5 7     7   37 use strict;
  7         7  
  7         126  
6 7     7   32 use warnings;
  7         8  
  7         375  
7              
8             =head1 NAME
9              
10             Gearman::Client - Client for gearman distributed job system
11              
12             =head1 SYNOPSIS
13              
14             use Gearman::Client;
15             my $client = Gearman::Client->new;
16             $client->job_servers(
17             '127.0.0.1',
18             {
19             ca_certs => ...,
20             cert_file => ...,
21             host => '10.0.0.1',
22             key_file => ...,
23             port => 4733,
24             socket_cb => sub {...},
25             use_ssl => 1,
26             }
27             );
28              
29             # running a single task
30             my $result_ref = $client->do_task("add", "1+2");
31             print "1 + 2 = $$result_ref\n";
32              
33             # waiting on a set of tasks in parallel
34             my $taskset = $client->new_task_set;
35             $taskset->add_task( "add" => "1+2", {
36             on_complete => sub { ... }
37             });
38             $taskset->add_task( "divide" => "5/0", {
39             on_fail => sub { print "divide by zero error!\n"; },
40             });
41             $taskset->wait;
42              
43              
44             =head1 DESCRIPTION
45              
46             I is a client class for the Gearman distributed job
47             system, providing a framework for sending jobs to one or more Gearman
48             servers. These jobs are then distributed out to a farm of workers.
49              
50             Callers instantiate a I object and from it dispatch
51             single tasks, sets of tasks, or check on the status of tasks.
52              
53             =head1 USAGE
54              
55             =head2 Gearman::Client->new(%options)
56              
57             Creates a new I object, and returns the object.
58              
59             If I<%options> is provided, initializes the new client object with the
60             settings in I<%options>, which can contain:
61              
62             =over 4
63              
64             =item * job_servers
65              
66             Calls I (see below) to initialize the list of job
67             servers. Value in this case should be an arrayref.
68              
69             =item * prefix
70              
71             Calls I (see below) to set the prefix / namespace.
72              
73             =back
74              
75             =head2 $client->job_servers(@servers)
76              
77             Initializes the client I<$client> with the list of job servers in I<@servers>.
78             I<@servers> should contain a list of IP addresses, with optional port
79             numbers. For example:
80              
81             $client->job_servers('127.0.0.1', '192.168.1.100:4730');
82              
83             If the port number is not provided, C<4730> is used as the default.
84              
85             =head2 $client-Edo_task($task)
86              
87             =head2 $client-Edo_task($funcname, $arg, \%options)
88              
89             Dispatches a task and waits on the results. May either provide a
90             L object, or the 3 arguments that the Gearman::Task
91             constructor takes.
92              
93             Returns a scalar reference to the result, or undef on failure.
94              
95             If you provide on_complete and on_fail handlers, they're ignored, as
96             this function currently overrides them.
97              
98             =head2 $client-Edispatch_background($task)
99              
100             =head2 $client-Edispatch_background($funcname, $arg, \%options)
101              
102             Dispatches a task and doesn't wait for the result. Return value
103             is an opaque scalar that can be used to refer to the task with get_status.
104              
105             =head2 $taskset = $client-Enew_task_set
106              
107             Creates and returns a new L object.
108              
109             =head2 $taskset-Eadd_task($task)
110              
111             =head2 $taskset-Eadd_task($funcname, $arg, $uniq)
112              
113             =head2 $taskset-Eadd_task($funcname, $arg, \%options)
114              
115             Adds a task to a taskset. Three different calling conventions are
116             available.
117              
118             =head2 $taskset-Ewait
119              
120             Waits for a response from the job server for any of the tasks listed
121             in the taskset. Will call the I handlers for each of the tasks
122             that have been completed, updated, etc. Doesn't return until
123             everything has finished running or failing.
124              
125             =head2 $client-Eprefix($prefix)
126              
127             Sets the namespace / prefix for the function names.
128              
129             See L for more details.
130              
131              
132             =head1 EXAMPLES
133              
134             =head2 Summation
135              
136             This is an example client that sends off a request to sum up a list of
137             integers.
138              
139             use Gearman::Client;
140             use Storable qw( freeze );
141             my $client = Gearman::Client->new;
142             $client->job_servers('127.0.0.1');
143             my $tasks = $client->new_task_set;
144             my $handle = $tasks->add_task(sum => freeze([ 3, 5 ]), {
145             on_complete => sub { print ${ $_[0] }, "\n" }
146             });
147             $tasks->wait;
148              
149             See the L documentation for the worker for the I
150             function.
151              
152             =cut
153              
154 7     7   29 use base 'Gearman::Objects';
  7         17  
  7         3012  
155              
156             use fields (
157 7         33 'sock_info', # hostport -> hashref
158             'hooks', # hookname -> coderef
159             'exceptions',
160             'backoff_max',
161              
162             # maximum time a gearman command should take to get a result (not a job timeout)
163             'command_timeout',
164 7     7   46 );
  7         14  
165              
166 7     7   504 use Carp;
  7         12  
  7         422  
167 7     7   3401 use Gearman::Task;
  7         16  
  7         207  
168 7     7   2544 use Gearman::Taskset;
  7         14  
  7         217  
169 7     7   3395 use Gearman::JobStatus;
  7         18  
  7         208  
170 7     7   40 use Time::HiRes;
  7         11  
  7         73  
171 7         14522 use Ref::Util qw/
172             is_plain_scalarref
173             is_ref
174 7     7   836 /;
  7         11  
175              
176             sub new {
177 8     8 1 16166 my ($self, %opts) = @_;
178 8 50       72 unless (is_ref($self)) {
179 8         44 $self = fields::new($self);
180             }
181              
182 8         10996 $self->SUPER::new(%opts);
183              
184 8         14 $self->{hooks} = {};
185 8         18 $self->{exceptions} = 0;
186 8         13 $self->{backoff_max} = 90;
187 8         13 $self->{command_timeout} = 30;
188              
189             $self->{exceptions} = delete $opts{exceptions}
190 8 50       25 if exists $opts{exceptions};
191              
192             $self->{backoff_max} = $opts{backoff_max}
193 8 50       23 if defined $opts{backoff_max};
194              
195             $self->{command_timeout} = $opts{command_timeout}
196 8 50       28 if defined $opts{command_timeout};
197              
198 8         23 return $self;
199             } ## end sub new
200              
201             =head1 METHODS
202              
203             =head2 new_task_set()
204              
205             B Gearman::Taskset
206              
207             =cut
208              
209             sub new_task_set {
210 1     1 1 2 my $self = shift;
211 1         9 my $taskset = Gearman::Taskset->new($self);
212 1         5 $self->run_hook('new_task_set', $self, $taskset);
213 1         5 return $taskset;
214             } ## end sub new_task_set
215              
216             #
217             # _job_server_status_command($command, $each_line_sub)
218             # $command e.g. "status\n".
219             # $each_line_sub A sub to be called on each line of response;
220             # takes $hostport and the $line as args.
221             #
222             sub _job_server_status_command {
223 0     0   0 my ($self, $command, $each_line_sub) = (shift, shift, shift);
224              
225 0 0       0 my $list
226             = scalar(@_)
227             ? $self->canonicalize_job_servers(@_)
228             : $self->job_servers();
229 0         0 my %js_map = map { $self->_js_str($_) => 1 } $self->job_servers();
  0         0  
230              
231 0         0 foreach my $js (@{$list}) {
  0         0  
232 0 0       0 defined($js_map{ $self->_js_str($js) }) || next;
233              
234 0 0       0 my $sock = $self->_get_js_sock($js)
235             or next;
236              
237 0         0 my $rv = $sock->write($command);
238              
239 0         0 my $err;
240 0         0 my @lines = Gearman::Util::read_text_status($sock, \$err);
241 0 0       0 if ($err) {
242              
243             #TODO warn
244 0         0 next;
245             }
246              
247 0         0 foreach my $l (@lines) {
248 0         0 $each_line_sub->($js, $l);
249             }
250              
251 0         0 $self->_sock_cache($js, $sock);
252             } ## end foreach my $js (@{$list})
253             } ## end sub _job_server_status_command
254              
255             =head2 get_job_server_status()
256              
257             B {job => {capable, queued, running}}
258              
259             =cut
260              
261             sub get_job_server_status {
262 0     0 1 0 my $self = shift;
263              
264 0         0 my $js_status = {};
265             $self->_job_server_status_command(
266             "status\n",
267             sub {
268 0     0   0 my ($hostport, $line) = @_;
269              
270 0 0       0 unless ($line =~ /^(\S+)\s+(\d+)\s+(\d+)\s+(\d+)$/) {
271 0         0 return;
272             }
273              
274 0         0 my ($job, $queued, $running, $capable) = ($1, $2, $3, $4);
275 0         0 $js_status->{$hostport}->{$job} = {
276             queued => $queued,
277             running => $running,
278             capable => $capable,
279             };
280             },
281             @_
282 0         0 );
283 0         0 return $js_status;
284             } ## end sub get_job_server_status
285              
286             =head2 get_job_server_jobs()
287              
288             supported only by L
289              
290             B {job => {address, listeners, key}}
291              
292             =cut
293              
294             sub get_job_server_jobs {
295 0     0 1 0 my $self = shift;
296 0         0 my $js_jobs = {};
297             $self->_job_server_status_command(
298             "jobs\n",
299             sub {
300 0     0   0 my ($hostport, $line) = @_;
301              
302             # Yes, the unique key is sometimes omitted.
303 0 0       0 return unless $line =~ /^(\S+)\s+(\S*)\s+(\S+)\s+(\d+)$/;
304              
305 0         0 my ($job, $key, $address, $listeners) = ($1, $2, $3, $4);
306 0         0 $js_jobs->{$hostport}->{$job} = {
307             key => $key,
308             address => $address,
309             listeners => $listeners,
310             };
311             },
312             @_
313 0         0 );
314 0         0 return $js_jobs;
315             } ## end sub get_job_server_jobs
316              
317             =head2 get_job_server_clients()
318              
319             supported only by L
320              
321             =cut
322              
323             sub get_job_server_clients {
324 0     0 1 0 my $self = shift;
325              
326 0         0 my $js_clients = {};
327 0         0 my $client;
328             $self->_job_server_status_command(
329             "clients\n",
330             sub {
331 0     0   0 my ($hostport, $line) = @_;
332              
333 0 0 0     0 if ($line =~ /^(\S+)$/) {
    0          
334 0         0 $client = $1;
335 0   0     0 $js_clients->{$hostport}->{$client} ||= {};
336             }
337             elsif ($client && $line =~ /^\s+(\S+)\s+(\S*)\s+(\S+)$/) {
338 0         0 my ($job, $key, $address) = ($1, $2, $3);
339 0         0 $js_clients->{$hostport}->{$client}->{$job} = {
340             key => $key,
341             address => $address,
342             };
343             } ## end elsif ($client && $line =~...)
344             },
345             @_
346 0         0 );
347              
348 0         0 return $js_clients;
349             } ## end sub get_job_server_clients
350              
351             #
352             # _get_task_from_args
353             #
354             sub _get_task_from_args {
355 3     3   26 my $self = shift;
356 3         3 my $task;
357 3 100       12 if (is_ref($_[0])) {
358 1         1 $task = shift;
359 1 50       7 $task->isa("Gearman::Task")
360             || Carp::croak("Argument isn't a Gearman::Task");
361             }
362             else {
363 2         4 my $func = shift;
364 2         2 my $arg_p = shift;
365 2         3 my $opts = shift;
366 2 50       5 my $argref = is_ref($arg_p) ? $arg_p : \$arg_p;
367 2 50       6 is_plain_scalarref($argref)
368             || Carp::croak("Function argument must be scalar or scalarref");
369              
370 2         13 $task = Gearman::Task->new($func, $argref, $opts);
371             } ## end else [ if (is_ref($_[0])) ]
372 3         7 return $task;
373              
374             } ## end sub _get_task_from_args
375              
376             =head2 do_task($task)
377              
378             =head2 do_task($funcname, $arg, \%options)
379              
380             given a (func, arg_p, opts?)
381              
382             B either undef (on fail) or scalarref of result
383              
384             =cut
385              
386             sub do_task {
387 0     0 1 0 my $self = shift;
388 0         0 my $task = $self->_get_task_from_args(@_);
389              
390 0         0 my $ret = undef;
391 0         0 my $did_err = 0;
392              
393             $task->{on_complete} = sub {
394 0     0   0 $ret = shift;
395 0         0 };
396              
397             $task->{on_fail} = sub {
398 0     0   0 $did_err = 1;
399 0         0 };
400              
401 0         0 my $ts = $self->new_task_set;
402 0         0 $ts->add_task($task);
403 0         0 $ts->wait(timeout => $task->timeout);
404              
405 0 0       0 return $did_err ? undef : $ret;
406             } ## end sub do_task
407              
408             =head2 dispatch_background($func, $arg_p, $opts)
409              
410             =head2 dispatch_background($task)
411              
412             dispatches job in background
413              
414             return the handle from the jobserver, or undef on failure
415              
416             =cut
417              
418             sub dispatch_background {
419 0     0 1 0 my $self = shift;
420 0         0 my $task = $self->_get_task_from_args(@_);
421              
422 0         0 $task->{background} = 1;
423              
424 0         0 my $ts = $self->new_task_set;
425 0         0 return $ts->add_task($task);
426             } ## end sub dispatch_background
427              
428             =head2 run_hook($name)
429              
430             run a hook callback if defined
431              
432             =cut
433              
434             sub run_hook {
435 1     1 1 2 my ($self, $hookname) = @_;
436 1 50       4 $hookname || return;
437              
438 1         2 my $hook = $self->{hooks}->{$hookname};
439 1 50       4 return unless $hook;
440              
441 1         1 eval { $hook->(@_) };
  1         4  
442              
443 1 50       338 warn "Gearman::Client hook '$hookname' threw error: $@\n" if $@;
444             } ## end sub run_hook
445              
446             =head2 add_hook($name, $cb)
447              
448             add a hook
449              
450             =cut
451              
452             sub add_hook {
453 2     2 1 445 my ($self, $hookname) = (shift, shift);
454 2 50       6 $hookname || return;
455              
456 2 100       6 if (@_) {
457 1         9 $self->{hooks}->{$hookname} = shift;
458             }
459             else {
460 1         7 delete $self->{hooks}->{$hookname};
461             }
462             } ## end sub add_hook
463              
464             =head2 get_status($handle)
465              
466             The Gearman Server will assign a scalar job handle when you request a
467             background job with dispatch_background. Save this scalar, and use it later in
468             order to request the status of this job.
469              
470             B L on success
471              
472             =cut
473              
474             sub get_status {
475 0     0 1 0 my ($self, $handle) = @_;
476 0 0       0 $handle || return;
477              
478 0         0 my ($js_str, $shandle) = split(m!//!, $handle);
479              
480             #TODO simple check for $js_str in job_server doesn't work if
481             # $js_str is not contained in job_servers
482             # job_servers = ["localhost:4730"]
483             # handle = 127.0.0.1:4730//H:...
484             #
485             # hopefully commit 58e2aa5 solves this TODO
486              
487 0         0 my $js = $self->_js($js_str);
488 0 0       0 $js || return;
489              
490 0         0 my $sock = $self->_get_js_sock($js);
491 0 0       0 $sock || return;
492              
493 0         0 my $req = Gearman::Util::pack_req_command("get_status", $shandle);
494 0         0 my $len = length($req);
495 0         0 my $rv = $sock->write($req, $len);
496 0         0 my $err;
497 0         0 my $res = Gearman::Util::read_res_packet($sock, \$err);
498              
499 0 0 0     0 if ($res && $res->{type} eq "error") {
500 0         0 Carp::croak
501 0         0 "Error packet from server after get_status: ${$res->{blobref}}\n";
502             }
503              
504 0 0 0     0 return undef unless $res && $res->{type} eq "status_res";
505              
506 0         0 my @args = split(/\0/, ${ $res->{blobref} });
  0         0  
507              
508             #FIXME returns on '', 0
509 0 0       0 $args[0] || return;
510              
511 0         0 shift @args;
512 0         0 $self->_sock_cache($js_str, $sock);
513              
514 0         0 return Gearman::JobStatus->new(@args);
515             } ## end sub get_status
516              
517             #
518             # _option_request($sock, $option)
519             #
520             sub _option_request {
521 0     0   0 my ($self, $sock, $option) = @_;
522              
523 0         0 my $req = Gearman::Util::pack_req_command("option_req", $option);
524 0         0 my $len = length($req);
525 0         0 my $rv = $sock->write($req, $len);
526              
527 0         0 my $err;
528             my $res = Gearman::Util::read_res_packet($sock, \$err,
529 0         0 $self->{command_timeout});
530              
531 0 0       0 return unless $res;
532              
533 0 0       0 return 0 if $res->{type} eq "error";
534 0 0       0 return 1 if $res->{type} eq "option_res";
535              
536 0         0 warn "Got unknown response to option request: $res->{type}\n";
537 0         0 return;
538             } ## end sub _option_request
539              
540             #
541             # _get_js_sock($js)
542             #
543             # returns a socket from the cache. it should be returned to the
544             # cache with _sock_cache($js, $sock).
545             # The hostport isn't verified. the caller
546             # should verify that $js is in the set of jobservers.
547             sub _get_js_sock {
548 0     0   0 my ($self, $js) = @_;
549 0 0       0 if (my $sock = $self->_sock_cache($js, undef, 1)) {
550 0 0       0 return $sock if $sock->connected;
551             }
552              
553 0   0     0 my $sockinfo = $self->{sock_info}{ $self->_js_str($js) } ||= {};
554 0         0 my $disabled_until = $sockinfo->{disabled_until};
555 0 0 0     0 return if defined $disabled_until && $disabled_until > Time::HiRes::time();
556              
557 0         0 my $sock = $self->socket($js, 1);
558 0 0       0 unless ($sock) {
559 0         0 my $count = ++$sockinfo->{failed_connects};
560 0         0 my $disable_for = $count**2;
561 0         0 my $max = $self->{backoff_max};
562 0 0       0 $disable_for = $disable_for > $max ? $max : $disable_for;
563 0         0 $sockinfo->{disabled_until} = $disable_for + Time::HiRes::time();
564 0         0 return;
565             } ## end unless ($sock)
566              
567 0         0 $self->sock_nodelay($sock);
568 0         0 $sock->autoflush(1);
569              
570             # If exceptions support is to be requested, and the request fails, disable
571             # exceptions for this client.
572 0 0 0     0 if ($self->{exceptions} && !$self->_option_request($sock, 'exceptions')) {
573 0         0 warn "Exceptions support denied by server, disabling.\n";
574 0         0 $self->{exceptions} = 0;
575             }
576              
577 0         0 delete $sockinfo->{failed_connects}; # Success, mark the socket as such.
578 0         0 delete $sockinfo->{disabled_until};
579              
580 0         0 return $sock;
581             } ## end sub _get_js_sock
582              
583             sub _get_random_js_sock {
584 10     10   13 my ($self, $getter) = @_;
585              
586 10 50       32 $self->{js_count} || return;
587              
588             $getter ||= sub {
589 0     0     my $js = shift;
590 0           return $self->_get_js_sock($js);
591 0   0       };
592              
593 0           my $ridx = int(rand($self->{js_count}));
594 0           for (my $try = 0; $try < $self->{js_count}; $try++) {
595 0           my $aidx = ($ridx + $try) % $self->{js_count};
596 0           my $js = $self->{job_servers}[$aidx];
597 0 0         my $sock = $getter->($js) or next;
598 0           return ($js, $sock);
599             } ## end for (my $try = 0; $try ...)
600 0           return ();
601             } ## end sub _get_random_js_sock
602              
603             1;
604             __END__