File Coverage

blib/lib/Gearman/Client.pm
Criterion Covered Total %
statement 72 195 36.9
branch 16 76 21.0
condition 0 21 0.0
subroutine 16 30 53.3
pod 10 10 100.0
total 114 332 34.3


line stmt bran cond sub pod time code
1             package Gearman::Client;
2 7     7   20823 use version ();
  7         5459  
  7         234  
3             $Gearman::Client::VERSION = version->declare("2.003_001");
4              
5 7     7   30 use strict;
  7         10  
  7         119  
6 7     7   25 use warnings;
  7         8  
  7         306  
7              
8             =head1 NAME
9              
10             Gearman::Client - Client for gearman distributed job system
11              
12             =head1 SYNOPSIS
13              
14             use Gearman::Client;
15             my $client = Gearman::Client->new;
16             $client->job_servers(
17             '127.0.0.1',
18             {
19             ca_certs => ...,
20             cert_file => ...,
21             host => '10.0.0.1',
22             key_file => ...,
23             port => 4733,
24             socket_cb => sub {...},
25             use_ssl => 1,
26             }
27             );
28              
29             # running a single task
30             my $result_ref = $client->do_task("add", "1+2");
31             print "1 + 2 = $$result_ref\n";
32              
33             # waiting on a set of tasks in parallel
34             my $taskset = $client->new_task_set;
35             $taskset->add_task( "add" => "1+2", {
36             on_complete => sub { ... }
37             });
38             $taskset->add_task( "divide" => "5/0", {
39             on_fail => sub { print "divide by zero error!\n"; },
40             });
41             $taskset->wait;
42              
43              
44             =head1 DESCRIPTION
45              
46             I is a client class for the Gearman distributed job
47             system, providing a framework for sending jobs to one or more Gearman
48             servers. These jobs are then distributed out to a farm of workers.
49              
50             Callers instantiate a I object and from it dispatch
51             single tasks, sets of tasks, or check on the status of tasks.
52              
53             =head1 USAGE
54              
55             =head2 Gearman::Client->new(%options)
56              
57             Creates a new I object, and returns the object.
58              
59             If I<%options> is provided, initializes the new client object with the
60             settings in I<%options>, which can contain:
61              
62             =over 4
63              
64             =item * job_servers
65              
66             Calls I (see below) to initialize the list of job
67             servers. Value in this case should be an arrayref.
68              
69             =item * prefix
70              
71             Calls I (see below) to set the prefix / namespace.
72              
73             =back
74              
75             =head2 $client->job_servers(@servers)
76              
77             Initializes the client I<$client> with the list of job servers in I<@servers>.
78             I<@servers> should contain a list of IP addresses, with optional port
79             numbers. For example:
80              
81             $client->job_servers('127.0.0.1', '192.168.1.100:4730');
82              
83             If the port number is not provided, C<4730> is used as the default.
84              
85             =head2 $client-Edo_task($task)
86              
87             =head2 $client-Edo_task($funcname, $arg, \%options)
88              
89             Dispatches a task and waits on the results. May either provide a
90             L object, or the 3 arguments that the Gearman::Task
91             constructor takes.
92              
93             Returns a scalar reference to the result, or undef on failure.
94              
95             If you provide on_complete and on_fail handlers, they're ignored, as
96             this function currently overrides them.
97              
98             =head2 $client-Edispatch_background($task)
99              
100             =head2 $client-Edispatch_background($funcname, $arg, \%options)
101              
102             Dispatches a task and doesn't wait for the result. Return value
103             is an opaque scalar that can be used to refer to the task with get_status.
104              
105             =head2 $taskset = $client-Enew_task_set
106              
107             Creates and returns a new L object.
108              
109             =head2 $taskset-Eadd_task($task)
110              
111             =head2 $taskset-Eadd_task($funcname, $arg, $uniq)
112              
113             =head2 $taskset-Eadd_task($funcname, $arg, \%options)
114              
115             Adds a task to a taskset. Three different calling conventions are
116             available.
117              
118             =head2 $taskset-Ewait
119              
120             Waits for a response from the job server for any of the tasks listed
121             in the taskset. Will call the I handlers for each of the tasks
122             that have been completed, updated, etc. Doesn't return until
123             everything has finished running or failing.
124              
125             =head2 $client-Eprefix($prefix)
126              
127             Sets the namespace / prefix for the function names.
128              
129             See L for more details.
130              
131              
132             =head1 EXAMPLES
133              
134             =head2 Summation
135              
136             This is an example client that sends off a request to sum up a list of
137             integers.
138              
139             use Gearman::Client;
140             use Storable qw( freeze );
141             my $client = Gearman::Client->new;
142             $client->job_servers('127.0.0.1');
143             my $tasks = $client->new_task_set;
144             my $handle = $tasks->add_task(sum => freeze([ 3, 5 ]), {
145             on_complete => sub { print ${ $_[0] }, "\n" }
146             });
147             $tasks->wait;
148              
149             See the L documentation for the worker for the I
150             function.
151              
152             =cut
153              
154 7     7   21 use base 'Gearman::Objects';
  7         11  
  7         2463  
155              
156             use fields (
157 7         27 'sock_info', # hostport -> hashref
158             'hooks', # hookname -> coderef
159             'exceptions',
160             'backoff_max',
161              
162             # maximum time a gearman command should take to get a result (not a job timeout)
163             'command_timeout',
164 7     7   35 );
  7         10  
165              
166 7     7   362 use Carp;
  7         9  
  7         381  
167 7     7   2738 use Gearman::Task;
  7         12  
  7         163  
168 7     7   1925 use Gearman::Taskset;
  7         11  
  7         153  
169 7     7   2287 use Gearman::JobStatus;
  7         11  
  7         167  
170 7     7   31 use Time::HiRes;
  7         8  
  7         40  
171              
172             sub new {
173 6     6 1 9383 my ($self, %opts) = @_;
174 6 50       29 unless (ref $self) {
175 6         17 $self = fields::new($self);
176             }
177              
178 6         7464 $self->SUPER::new(%opts);
179              
180 6         6 $self->{hooks} = {};
181 6         9 $self->{exceptions} = 0;
182 6         7 $self->{backoff_max} = 90;
183 6         8 $self->{command_timeout} = 30;
184              
185             $self->{exceptions} = delete $opts{exceptions}
186 6 50       13 if exists $opts{exceptions};
187              
188             $self->{backoff_max} = $opts{backoff_max}
189 6 50       12 if defined $opts{backoff_max};
190              
191             $self->{command_timeout} = $opts{command_timeout}
192 6 50       13 if defined $opts{command_timeout};
193              
194 6         12 return $self;
195             } ## end sub new
196              
197             =head1 METHODS
198              
199             =head2 new_task_set()
200              
201             B Gearman::Taskset
202              
203             =cut
204              
205             sub new_task_set {
206 1     1 1 2 my $self = shift;
207 1         8 my $taskset = Gearman::Taskset->new($self);
208 1         2 $self->run_hook('new_task_set', $self, $taskset);
209 1         3 return $taskset;
210             } ## end sub new_task_set
211              
212             #
213             # _job_server_status_command($command, $each_line_sub)
214             # $command e.g. "status\n".
215             # $each_line_sub A sub to be called on each line of response;
216             # takes $hostport and the $line as args.
217             #
218             sub _job_server_status_command {
219 0     0   0 my ($self, $command, $each_line_sub) = (shift, shift, shift);
220              
221 0 0       0 my $list
222             = scalar(@_)
223             ? $self->canonicalize_job_servers(@_)
224             : $self->job_servers();
225 0         0 my %js_map = map { $self->_js_str($_) => 1 } $self->job_servers();
  0         0  
226              
227 0         0 foreach my $js (@{$list}) {
  0         0  
228 0 0       0 defined($js_map{ $self->_js_str($js) }) || next;
229              
230 0 0       0 my $sock = $self->_get_js_sock($js)
231             or next;
232              
233 0         0 my $rv = $sock->write($command);
234              
235 0         0 my $err;
236 0         0 my @lines = Gearman::Util::read_text_status($sock, \$err);
237 0 0       0 if ($err) {
238              
239             #TODO warn
240 0         0 next;
241             }
242              
243 0         0 foreach my $l (@lines) {
244 0         0 $each_line_sub->($js, $l);
245             }
246              
247 0         0 $self->_sock_cache($js, $sock);
248             } ## end foreach my $js (@{$list})
249             } ## end sub _job_server_status_command
250              
251             =head2 get_job_server_status()
252              
253             B {job => {capable, queued, running}}
254              
255             =cut
256              
257             sub get_job_server_status {
258 0     0 1 0 my $self = shift;
259              
260 0         0 my $js_status = {};
261             $self->_job_server_status_command(
262             "status\n",
263             sub {
264 0     0   0 my ($hostport, $line) = @_;
265              
266 0 0       0 unless ($line =~ /^(\S+)\s+(\d+)\s+(\d+)\s+(\d+)$/) {
267 0         0 return;
268             }
269              
270 0         0 my ($job, $queued, $running, $capable) = ($1, $2, $3, $4);
271 0         0 $js_status->{$hostport}->{$job} = {
272             queued => $queued,
273             running => $running,
274             capable => $capable,
275             };
276             },
277             @_
278 0         0 );
279 0         0 return $js_status;
280             } ## end sub get_job_server_status
281              
282             =head2 get_job_server_jobs()
283              
284             supported only by L
285              
286             B {job => {address, listeners, key}}
287              
288             =cut
289              
290             sub get_job_server_jobs {
291 0     0 1 0 my $self = shift;
292 0         0 my $js_jobs = {};
293             $self->_job_server_status_command(
294             "jobs\n",
295             sub {
296 0     0   0 my ($hostport, $line) = @_;
297              
298             # Yes, the unique key is sometimes omitted.
299 0 0       0 return unless $line =~ /^(\S+)\s+(\S*)\s+(\S+)\s+(\d+)$/;
300              
301 0         0 my ($job, $key, $address, $listeners) = ($1, $2, $3, $4);
302 0         0 $js_jobs->{$hostport}->{$job} = {
303             key => $key,
304             address => $address,
305             listeners => $listeners,
306             };
307             },
308             @_
309 0         0 );
310 0         0 return $js_jobs;
311             } ## end sub get_job_server_jobs
312              
313             =head2 get_job_server_clients()
314              
315             supported only by L
316              
317             =cut
318              
319             sub get_job_server_clients {
320 0     0 1 0 my $self = shift;
321              
322 0         0 my $js_clients = {};
323 0         0 my $client;
324             $self->_job_server_status_command(
325             "clients\n",
326             sub {
327 0     0   0 my ($hostport, $line) = @_;
328              
329 0 0 0     0 if ($line =~ /^(\S+)$/) {
    0          
330 0         0 $client = $1;
331 0   0     0 $js_clients->{$hostport}->{$client} ||= {};
332             }
333             elsif ($client && $line =~ /^\s+(\S+)\s+(\S*)\s+(\S+)$/) {
334 0         0 my ($job, $key, $address) = ($1, $2, $3);
335 0         0 $js_clients->{$hostport}->{$client}->{$job} = {
336             key => $key,
337             address => $address,
338             };
339             } ## end elsif ($client && $line =~...)
340             },
341             @_
342 0         0 );
343              
344 0         0 return $js_clients;
345             } ## end sub get_job_server_clients
346              
347             #
348             # _get_task_from_args
349             #
350             sub _get_task_from_args {
351 7     7   10 my $self = shift;
352 7         6 my $task;
353 7 100       17 if (ref $_[0]) {
354 1         2 $task = shift;
355 1 50       4 $task->isa("Gearman::Task")
356             || Carp::croak("Argument isn't a Gearman::Task");
357             }
358             else {
359 6         6 my $func = shift;
360 6         5 my $arg_p = shift;
361 6         4 my $opts = shift;
362 6 50       10 my $argref = ref $arg_p ? $arg_p : \$arg_p;
363 6 50       18 Carp::croak("Function argument must be scalar or scalarref")
364             unless ref $argref eq "SCALAR";
365              
366 6         21 $task = Gearman::Task->new($func, $argref, $opts);
367             } ## end else [ if (ref $_[0]) ]
368 7         24 return $task;
369              
370             } ## end sub _get_task_from_args
371              
372             =head2 do_task($task)
373              
374             =head2 do_task($funcname, $arg, \%options)
375              
376             given a (func, arg_p, opts?)
377              
378             B scalarref of WORK_COMPLETE result
379              
380             =cut
381              
382             sub do_task {
383 0     0 1 0 my $self = shift;
384 0         0 my $task = $self->_get_task_from_args(@_);
385 0         0 my $ret = undef;
386              
387             $task->{on_complete} = sub {
388 0     0   0 $ret = shift;
389 0         0 };
390              
391 0         0 my $ts = $self->new_task_set;
392 0         0 $ts->add_task($task);
393 0         0 $ts->wait(timeout => $task->timeout);
394              
395 0         0 return $ret;
396             } ## end sub do_task
397              
398             =head2 dispatch_background($func, $arg_p, $opts)
399              
400             =head2 dispatch_background($task)
401              
402             dispatches job in background
403              
404             return the handle from the jobserver, or undef on failure
405              
406             =cut
407              
408             sub dispatch_background {
409 0     0 1 0 my $self = shift;
410 0         0 my $task = $self->_get_task_from_args(@_);
411              
412 0         0 $task->{background} = 1;
413              
414 0         0 my $ts = $self->new_task_set;
415 0         0 return $ts->add_task($task);
416             } ## end sub dispatch_background
417              
418             =head2 run_hook($name)
419              
420             run a hook callback if defined
421              
422             =cut
423              
424             sub run_hook {
425 1     1 1 2 my ($self, $hookname) = @_;
426 1 50       2 $hookname || return;
427              
428 1         1 my $hook = $self->{hooks}->{$hookname};
429 1 50       2 return unless $hook;
430              
431 1         1 eval { $hook->(@_) };
  1         2  
432              
433 1 50       195 warn "Gearman::Client hook '$hookname' threw error: $@\n" if $@;
434             } ## end sub run_hook
435              
436             =head2 add_hook($name, $cb)
437              
438             add a hook
439              
440             =cut
441              
442             sub add_hook {
443 2     2 1 236 my ($self, $hookname) = (shift, shift);
444 2 50       4 $hookname || return;
445              
446 2 100       5 if (@_) {
447 1         6 $self->{hooks}->{$hookname} = shift;
448             }
449             else {
450 1         4 delete $self->{hooks}->{$hookname};
451             }
452             } ## end sub add_hook
453              
454             =head2 get_status($handle)
455              
456             The Gearman Server will assign a scalar job handle when you request a
457             background job with dispatch_background. Save this scalar, and use it later in
458             order to request the status of this job.
459              
460             B L on success
461              
462             =cut
463              
464             sub get_status {
465 0     0 1 0 my ($self, $handle) = @_;
466 0 0       0 $handle || return;
467              
468 0         0 my ($js_str, $shandle) = split(m!//!, $handle);
469              
470             #TODO simple check for $js_str in job_server doesn't work if
471             # $js_str is not contained in job_servers
472             # job_servers = ["localhost:4730"]
473             # handle = 127.0.0.1:4730//H:...
474             #
475             # hopefully commit 58e2aa5 solves this TODO
476              
477 0         0 my $js = $self->_js($js_str);
478 0 0       0 $js || return;
479              
480 0         0 my $sock = $self->_get_js_sock($js);
481 0 0       0 $sock || return;
482              
483 0         0 my $req = Gearman::Util::pack_req_command("get_status", $shandle);
484 0         0 my $len = length($req);
485 0         0 my $rv = $sock->write($req, $len);
486 0         0 my $err;
487 0         0 my $res = Gearman::Util::read_res_packet($sock, \$err);
488              
489 0 0 0     0 if ($res && $res->{type} eq "error") {
490 0         0 Carp::croak
491 0         0 "Error packet from server after get_status: ${$res->{blobref}}\n";
492             }
493              
494 0 0 0     0 return undef unless $res && $res->{type} eq "status_res";
495              
496 0         0 my @args = split(/\0/, ${ $res->{blobref} });
  0         0  
497              
498             #FIXME returns on '', 0
499 0 0       0 $args[0] || return;
500              
501 0         0 shift @args;
502 0         0 $self->_sock_cache($js_str, $sock);
503              
504 0         0 return Gearman::JobStatus->new(@args);
505             } ## end sub get_status
506              
507             #
508             # _option_request($sock, $option)
509             #
510             sub _option_request {
511 0     0   0 my ($self, $sock, $option) = @_;
512              
513 0         0 my $req = Gearman::Util::pack_req_command("option_req", $option);
514 0         0 my $len = length($req);
515 0         0 my $rv = $sock->write($req, $len);
516              
517 0         0 my $err;
518             my $res = Gearman::Util::read_res_packet($sock, \$err,
519 0         0 $self->{command_timeout});
520              
521 0 0       0 return unless $res;
522              
523 0 0       0 return 0 if $res->{type} eq "error";
524 0 0       0 return 1 if $res->{type} eq "option_res";
525              
526 0         0 warn "Got unknown response to option request: $res->{type}\n";
527 0         0 return;
528             } ## end sub _option_request
529              
530             #
531             # _get_js_sock($js)
532             #
533             # returns a socket from the cache. it should be returned to the
534             # cache with _sock_cache($js, $sock).
535             # The hostport isn't verified. the caller
536             # should verify that $js is in the set of jobservers.
537             sub _get_js_sock {
538 0     0   0 my ($self, $js) = @_;
539 0 0       0 if (my $sock = $self->_sock_cache($js, undef, 1)) {
540 0 0       0 return $sock if $sock->connected;
541             }
542              
543 0   0     0 my $sockinfo = $self->{sock_info}{ $self->_js_str($js) } ||= {};
544 0         0 my $disabled_until = $sockinfo->{disabled_until};
545 0 0 0     0 return if defined $disabled_until && $disabled_until > Time::HiRes::time();
546              
547 0         0 my $sock = $self->socket($js, 1);
548 0 0       0 unless ($sock) {
549 0         0 my $count = ++$sockinfo->{failed_connects};
550 0         0 my $disable_for = $count**2;
551 0         0 my $max = $self->{backoff_max};
552 0 0       0 $disable_for = $disable_for > $max ? $max : $disable_for;
553 0         0 $sockinfo->{disabled_until} = $disable_for + Time::HiRes::time();
554 0         0 return;
555             } ## end unless ($sock)
556              
557 0         0 $self->sock_nodelay($sock);
558 0         0 $sock->autoflush(1);
559              
560             # If exceptions support is to be requested, and the request fails, disable
561             # exceptions for this client.
562 0 0 0     0 if ($self->{exceptions} && !$self->_option_request($sock, 'exceptions')) {
563 0         0 warn "Exceptions support denied by server, disabling.\n";
564 0         0 $self->{exceptions} = 0;
565             }
566              
567 0         0 delete $sockinfo->{failed_connects}; # Success, mark the socket as such.
568 0         0 delete $sockinfo->{disabled_until};
569              
570 0         0 return $sock;
571             } ## end sub _get_js_sock
572              
573             sub _get_random_js_sock {
574 6     6   7 my ($self, $getter) = @_;
575              
576 6 50       19 $self->{js_count} || return;
577              
578             $getter ||= sub {
579 0     0     my $js = shift;
580 0           return $self->_get_js_sock($js);
581 0   0       };
582              
583 0           my $ridx = int(rand($self->{js_count}));
584 0           for (my $try = 0; $try < $self->{js_count}; $try++) {
585 0           my $aidx = ($ridx + $try) % $self->{js_count};
586 0           my $js = $self->{job_servers}[$aidx];
587 0 0         my $sock = $getter->($js) or next;
588 0           return ($js, $sock);
589             } ## end for (my $try = 0; $try ...)
590 0           return ();
591             } ## end sub _get_random_js_sock
592              
593             1;
594             __END__