File Coverage

blib/lib/Gearman/Taskset.pm
Criterion Covered Total %
statement 128 231 55.4
branch 39 106 36.7
condition 5 37 13.5
subroutine 23 26 88.4
pod 8 8 100.0
total 203 408 49.7


line stmt bran cond sub pod time code
1             package Gearman::Taskset;
2 7     7   1768 use version;
  7         2662  
  7         27  
3             $Gearman::Taskset::VERSION = qv("2.001_001");
4              
5 7     7   507 use strict;
  7         9  
  7         121  
6 7     7   21 use warnings;
  7         6  
  7         275  
7              
8             =head1 NAME
9              
10             Gearman::Taskset - a taskset in Gearman, from the point of view of a client
11              
12             =head1 SYNOPSIS
13              
14             use Gearman::Client;
15             my $client = Gearman::Client->new;
16              
17             # waiting on a set of tasks in parallel
18             my $ts = $client->new_task_set;
19             $ts->add_task( "add" => "1+2", {...});
20             $ts->wait();
21              
22              
23             =head1 DESCRIPTION
24              
25             Gearman::Taskset is a Gearman::Client's representation of tasks queue t in Gearman
26              
27             =head1 METHODS
28              
29             =cut
30              
31             use fields (
32              
33             # { handle => [Task, ...] }
34 7         36 'waiting',
35              
36             # Gearman::Client
37             'client',
38              
39             # arrayref
40             'need_handle',
41              
42             # default socket (non-merged requests)
43             'default_sock',
44              
45             # default socket's ip/port
46             'default_sockaddr',
47              
48             # { hostport => socket }
49             'loaned_sock',
50              
51             # bool, if taskset has been cancelled mid-processing
52             'cancelled',
53              
54             # hookname -> coderef
55             'hooks',
56 7     7   398 );
  7         1068  
57              
58 7     7   494 use Carp ();
  7         7  
  7         75  
59 7     7   758 use Gearman::Util ();
  7         8  
  7         99  
60 7     7   2434 use Gearman::ResponseParser::Taskset;
  7         12  
  7         132  
61              
62             # i thought about weakening taskset's client, but might be too weak.
63 7     7   25 use Scalar::Util ();
  7         8  
  7         69  
64 7     7   490 use Socket ();
  7         2540  
  7         75  
65 7     7   18 use Time::HiRes ();
  7         6  
  7         10906  
66              
67             =head2 new($client)
68              
69             =cut
70              
71             sub new {
72 5     5 1 1390 my ($self, $client) = @_;
73 5 100 66     69 (Scalar::Util::blessed($client) && $client->isa("Gearman::Client"))
74             || Carp::croak
75             "provided client argument is not a Gearman::Client reference";
76              
77 4 50       15 unless (ref $self) {
78 4         13 $self = fields::new($self);
79             }
80              
81 4         236 $self->{waiting} = {};
82 4         9 $self->{need_handle} = [];
83 4         9 $self->{client} = $client;
84 4         6 $self->{loaned_sock} = {};
85 4         8 $self->{cancelled} = 0;
86 4         7 $self->{hooks} = {};
87              
88 4         9 return $self;
89             } ## end sub new
90              
91             sub DESTROY {
92 3     3   641 my $self = shift;
93              
94             # During global cleanup this may be called out of order, and the client my not exist in the taskset.
95 3 50       15 return unless $self->{client};
96              
97 3 50       11 if ($self->{default_sock}) {
98             $self->{client}
99 0         0 ->_put_js_sock($self->{default_sockaddr}, $self->{default_sock});
100             }
101              
102 3         5 while (my ($hp, $sock) = each %{ $self->{loaned_sock} }) {
  3         95  
103 0         0 $self->{client}->_put_js_sock($hp, $sock);
104             }
105             } ## end sub DESTROY
106              
107             =head2 run_hook($name)
108              
109             run a hook callback if defined
110              
111             =cut
112              
113             sub run_hook {
114 3     3 1 7 my ($self, $name) = (shift, shift);
115 3 100 33     17 ($name && $self->{hooks}->{$name}) || return;
116              
117 1         3 eval { $self->{hooks}->{$name}->(@_) };
  1         4  
118              
119 1 50       8 warn "Gearman::Taskset hook '$name' threw error: $@\n" if $@;
120             } ## end sub run_hook
121              
122             =head2 add_hook($name, [$cb])
123              
124             add a hook
125              
126             =cut
127              
128             sub add_hook {
129 2     2 1 612 my ($self, $name, $cb) = @_;
130 2 50       7 $name || return;
131              
132 2 100       6 if ($cb) {
133 1         10 $self->{hooks}->{$name} = $cb;
134             }
135             else {
136 1         7 delete $self->{hooks}->{$name};
137             }
138             } ## end sub add_hook
139              
140             =head2 client ()
141              
142             this method is part of the "Taskset" interface, also implemented by
143             Gearman::Client::Async, where no tasksets make sense, so instead the
144             Gearman::Client::Async object itself is also its taskset. (the
145             client tracks all tasks). so don't change this, without being aware
146             of Gearman::Client::Async. similarly, don't access $ts->{client} without
147             going via this accessor.
148              
149             =cut
150              
151             sub client {
152 7     7 1 5113 return shift->{client};
153             }
154              
155             =head2 cancel()
156              
157             =cut
158              
159             sub cancel {
160 1     1 1 909 my $self = shift;
161              
162 1         2 $self->{cancelled} = 1;
163              
164 1 50       6 if ($self->{default_sock}) {
165 1         2 close($self->{default_sock});
166 1         1 $self->{default_sock} = undef;
167             }
168              
169 1         7 while (my ($hp, $sock) = each %{ $self->{loaned_sock} }) {
  2         21  
170 1         6 $sock->close;
171             }
172              
173 1         2 $self->{waiting} = {};
174 1         2 $self->{need_handle} = [];
175 1         2 $self->{client} = undef;
176             } ## end sub cancel
177              
178             #=head2 _get_loaned_sock($hostport)
179             #
180             #=cut
181              
182             sub _get_loaned_sock {
183 0     0   0 my ($self, $hostport) = @_;
184              
185 0 0       0 if (my $sock = $self->{loaned_sock}{$hostport}) {
186 0 0       0 return $sock if $sock->connected;
187 0         0 delete $self->{loaned_sock}{$hostport};
188             }
189              
190 0         0 my $sock = $self->{client}->_get_js_sock($hostport);
191 0         0 return $self->{loaned_sock}{$hostport} = $sock;
192             } ## end sub _get_loaned_sock
193              
194             =head2 wait(%opts)
195              
196             event loop for reading in replies
197              
198             =cut
199              
200             sub wait {
201 0     0 1 0 my ($self, %opts) = @_;
202 0         0 my $timeout;
203 0 0       0 if (exists $opts{timeout}) {
204 0         0 $timeout = delete $opts{timeout};
205 0 0       0 $timeout += Time::HiRes::time() if defined $timeout;
206             }
207              
208 0 0       0 Carp::carp "Unknown options: "
209             . join(',', keys %opts)
210             . " passed to Taskset->wait."
211             if keys %opts;
212              
213 0         0 my %parser; # fd -> Gearman::ResponseParser object
214              
215 0         0 my ($rin, $rout, $eout) = ('', '', '');
216 0         0 my %watching;
217              
218 0         0 for my $sock ($self->{default_sock}, values %{ $self->{loaned_sock} }) {
  0         0  
219 0 0       0 next unless $sock;
220 0         0 my $fd = $sock->fileno;
221 0         0 vec($rin, $fd, 1) = 1;
222 0         0 $watching{$fd} = $sock;
223             } ## end for my $sock ($self->{default_sock...})
224              
225 0         0 my $tries = 0;
226 0   0     0 while (!$self->{cancelled} && keys %{ $self->{waiting} }) {
  0         0  
227 0         0 $tries++;
228              
229 0 0       0 my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;
230 0         0 my $nfound = select($rout = $rin, undef, $eout = $rin, $time_left)
231             ; # TODO drop the eout.
232 0 0 0     0 if ($timeout && $time_left <= 0) {
233 0         0 $self->cancel;
234 0         0 return;
235             }
236 0 0       0 next if !$nfound;
237              
238 0         0 foreach my $fd (keys %watching) {
239 0 0       0 next unless vec($rout, $fd, 1);
240              
241             # TODO: deal with error vector
242              
243 0         0 my $sock = $watching{$fd};
244 0   0     0 my $parser = $parser{$fd}
245             ||= Gearman::ResponseParser::Taskset->new(
246             source => $sock,
247             taskset => $self
248             );
249 0         0 eval { $parser->parse_sock($sock); };
  0         0  
250              
251 0 0       0 if ($@) {
252              
253             # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
254             # We're not in an accessible place here, so if all job servers fail we must die to prevent hanging.
255 0         0 Carp::croak("Job server failure: $@");
256             } ## end if ($@)
257             } ## end foreach my $fd (keys %watching)
258              
259             } ## end while (!$self->{cancelled...})
260             } ## end sub wait
261              
262             =head2 add_task(Gearman::Task)
263              
264             =head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>
265              
266             C<$opts_hr> see L
267              
268             =cut
269              
270             sub add_task {
271 3     3 1 647 my $self = shift;
272 3         11 my $task = $self->client()->_get_task_from_args(@_);
273              
274 2         9 $task->taskset($self);
275              
276 2         5 $self->run_hook('add_task', $self, $task);
277              
278 2         4 my $jssock = $task->{jssock};
279              
280 2 50       10 return $task->fail("undefined jssock") unless ($jssock);
281              
282 0         0 my $req = $task->pack_submit_packet($self->client);
283 0         0 my $len = length($req);
284 0         0 my $rv = $jssock->syswrite($req, $len);
285 0   0     0 $rv ||= 0;
286 0 0       0 Carp::croak "Wrote $rv but expected to write $len" unless $rv == $len;
287              
288 0         0 push @{ $self->{need_handle} }, $task;
  0         0  
289 0         0 while (@{ $self->{need_handle} }) {
  0         0  
290             my $rv
291             = $self->_wait_for_packet($jssock,
292 0         0 $self->{client}->{command_timeout});
293 0 0       0 if (!$rv) {
294             # ditch it, it failed.
295             # this will resubmit it if it failed.
296 0         0 shift @{ $self->{need_handle} };
  0         0  
297 0 0       0 return $task->fail(
298             join(' ',
299             "no rv on waiting for packet",
300             defined($rv) ? $rv : $!)
301             );
302             } ## end if (!$rv)
303             } ## end while (@{ $self->{need_handle...}})
304              
305 0         0 return $task->handle;
306             } ## end sub add_task
307              
308             #
309             # _get_default_sock()
310             # used in Gearman::Task->taskset only
311             #
312             sub _get_default_sock {
313 10     10   3855 my $self = shift;
314 10 50       25 return $self->{default_sock} if $self->{default_sock};
315              
316             my $getter = sub {
317 0     0   0 my $hostport = shift;
318             return $self->{loaned_sock}{$hostport}
319 0   0     0 || $self->{client}->_get_js_sock($hostport);
320 10         28 };
321              
322 10         35 my ($jst, $jss) = $self->{client}->_get_random_js_sock($getter);
323 10 50       55 return unless $jss;
324 0   0     0 $self->{loaned_sock}{$jst} ||= $jss;
325              
326 0         0 $self->{default_sock} = $jss;
327 0         0 $self->{default_sockaddr} = $jst;
328              
329 0         0 return $jss;
330             } ## end sub _get_default_sock
331              
332             #
333             # _get_hashed_sock($hv)
334             #
335             # only used in Gearman::Task->taskset only
336             #
337             # return a socket
338             sub _get_hashed_sock {
339 2     2   2 my $self = shift;
340 2         2 my $hv = shift;
341              
342 2         3 my $cl = $self->client;
343 2         2 my $sock;
344 2         6 for (my $off = 0; $off < $cl->{js_count}; $off++) {
345 0         0 my $idx = ($hv + $off) % ($cl->{js_count});
346 0         0 $sock = $self->_get_loaned_sock($cl->{job_servers}[$idx]);
347 0         0 last;
348             }
349              
350 2         4 return $sock;
351             } ## end sub _get_hashed_sock
352              
353             #
354             # _wait_for_packet($sock, $timeout)
355             #
356             # $sock socket to singularly read from
357             #
358             # returns boolean when given a sock to wait on.
359             # otherwise, return value is undefined.
360             sub _wait_for_packet {
361 1     1   541 my ($self, $sock, $timeout) = @_;
362              
363             #TODO check $err after read
364 1         2 my $err;
365 1         7 my $res = Gearman::Util::read_res_packet($sock, \$err, $timeout);
366              
367 0 0       0 return $res ? $self->process_packet($res, $sock) : 0;
368             } ## end sub _wait_for_packet
369              
370             #
371             # _is_port($sock)
372             #
373             # return hostport || ipport
374             #
375             sub _ip_port {
376 1     1   1 my ($self, $sock) = @_;
377 1 50       4 $sock || return;
378              
379 0         0 my $pn = getpeername($sock);
380 0 0       0 $pn || return;
381              
382             # look for a hostport in loaned_sock
383 0         0 my $hostport;
384 0         0 while (my ($hp, $s) = each %{ $self->{loaned_sock} }) {
  0         0  
385 0 0       0 $s || next;
386 0 0       0 if ($sock == $s) {
387 0         0 $hostport = $hp;
388 0         0 last;
389             }
390             } ## end while (my ($hp, $s) = each...)
391              
392             # hopefully it solves client->get_status mismatch
393 0 0       0 $hostport && return $hostport;
394              
395 0         0 my $fam = Socket::sockaddr_family($pn);
396 0 0       0 my ($port, $iaddr)
397             = ($fam == Socket::AF_INET6)
398             ? Socket::sockaddr_in6($pn)
399             : Socket::sockaddr_in($pn);
400              
401 0         0 my $addr = Socket::inet_ntop($fam, $iaddr);
402              
403 0         0 return join ':', $addr, $port;
404             } ## end sub _ip_port
405              
406             #
407             # _fail_jshandle($shandle)
408             #
409             # note the failure of a task given by its jobserver-specific handle
410             #
411             sub _fail_jshandle {
412 4     4   1046 my ($self, $shandle) = @_;
413 4 100       25 $shandle
414             or Carp::croak "_fail_jshandle() called without shandle parameter";
415              
416 3 50       44 my $task_list = $self->{waiting}{$shandle}
417             or Carp::croak "Uhhhh: got work_fail for unknown handle: $shandle";
418              
419 0         0 my $task = shift @$task_list;
420 0 0 0     0 ($task && ref($task) eq "Gearman::Task")
421             or Carp::croak
422             "Uhhhh: task_list is empty on work_fail for handle $shandle\n";
423              
424 0         0 $task->fail("jshandle fail");
425 0 0       0 delete $self->{waiting}{$shandle} unless @$task_list;
426             } ## end sub _fail_jshandle
427              
428             =head2 process_packet($res, $sock)
429              
430             =cut
431              
432             sub process_packet {
433 9     9 1 16 my ($self, $res, $sock) = @_;
434              
435 9 100       23 if ($res->{type} eq "job_created") {
436 2         3 my $task = shift @{ $self->{need_handle} };
  2         5  
437 2 100 66     25 ($task && ref($task) eq "Gearman::Task")
438             or Carp::croak "Um, got an unexpected job_created notification";
439 1         1 my $shandle = ${ $res->{'blobref'} };
  1         3  
440 1         3 my $ipport = $self->_ip_port($sock);
441              
442             # did sock become disconnected in the meantime?
443 1 50       6 if (!$ipport) {
444 1         3 $self->_fail_jshandle($shandle);
445 0         0 return 1;
446             }
447              
448 0         0 $task->handle("$ipport//$shandle");
449 0 0       0 return 1 if $task->{background};
450 0   0     0 push @{ $self->{waiting}{$shandle} ||= [] }, $task;
  0         0  
451 0         0 return 1;
452             } ## end if ($res->{type} eq "job_created")
453              
454 7 100       17 if ($res->{type} eq "work_fail") {
455 1         2 my $shandle = ${ $res->{'blobref'} };
  1         2  
456 1         3 $self->_fail_jshandle($shandle);
457 0         0 return 1;
458             }
459              
460 6         17 my $qr = qr/(.+?)\0/;
461              
462 6 100       16 if ($res->{type} eq "work_complete") {
463 2 100       2 (${ $res->{'blobref'} } =~ /^$qr/)
  2         56  
464             or Carp::croak "Bogus work_complete from server";
465 1         1 ${ $res->{'blobref'} } =~ s/^$qr//;
  1         24  
466 1         4 my $shandle = $1;
467              
468 1 50       20 my $task_list = $self->{waiting}{$shandle}
469             or Carp::croak
470             "Uhhhh: got work_complete for unknown handle: $shandle\n";
471              
472 0         0 my $task = shift @$task_list;
473 0 0 0     0 ($task && ref($task) eq "Gearman::Task")
474             or Carp::croak
475             "Uhhhh: task_list is empty on work_complete for handle $shandle\n";
476              
477 0         0 $task->complete($res->{'blobref'});
478 0 0       0 delete $self->{waiting}{$shandle} unless @$task_list;
479              
480 0         0 return 1;
481             } ## end if ($res->{type} eq "work_complete")
482              
483 4 100       11 if ($res->{type} eq "work_exception") {
484              
485             # ${ $res->{'blobref'} } =~ s/^(.+?)\0//
486             # or Carp::croak "Bogus work_exception from server";
487              
488 2 100       3 (${ $res->{'blobref'} } =~ /^$qr/)
  2         45  
489             or Carp::croak "Bogus work_exception from server";
490 1         2 ${ $res->{'blobref'} } =~ s/^$qr//;
  1         22  
491 1         4 my $shandle = $1;
492              
493 1 50       19 my $task_list = $self->{waiting}{$shandle}
494             or Carp::croak
495             "Uhhhh: got work_exception for unknown handle: $shandle\n";
496              
497 0         0 my $task = $task_list->[0];
498 0 0 0     0 ($task && ref($task) eq "Gearman::Task")
499             or Carp::croak
500             "Uhhhh: task_list is empty on work_exception for handle $shandle\n";
501              
502 0         0 $task->exception($res->{'blobref'});
503              
504 0         0 return 1;
505             } ## end if ($res->{type} eq "work_exception")
506              
507 2 100       7 if ($res->{type} eq "work_status") {
508 1         3 my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });
  1         5  
509              
510 1 50       29 my $task_list = $self->{waiting}{$shandle}
511             or Carp::croak
512             "Uhhhh: got work_status for unknown handle: $shandle\n";
513              
514             # FIXME: the server is (probably) sending a work_status packet for each
515             # interested client, even if the clients are the same, so probably need
516             # to fix the server not to do that. just put this FIXME here for now,
517             # though really it's a server issue.
518 0         0 foreach my $task (@$task_list) {
519 0         0 $task->status($nu, $de);
520             }
521              
522 0         0 return 1;
523             } ## end if ($res->{type} eq "work_status")
524              
525             Carp::croak
526 1         4 "Unknown/unimplemented packet type: $res->{type} [${$res->{blobref}}]";
  1         16  
527             } ## end sub process_packet
528              
529             1;