File Coverage

blib/lib/Gearman/Taskset.pm
Criterion Covered Total %
statement 138 253 54.5
branch 28 96 29.1
condition 9 38 23.6
subroutine 28 35 80.0
pod 8 8 100.0
total 211 430 49.0


line stmt bran cond sub pod time code
1             package Gearman::Taskset;
2 7     7   1919 use version ();
  7         2701  
  7         221  
3             $Gearman::Taskset::VERSION = version->declare("2.003_001");
4              
5              
6 7     7   26 use strict;
  7         6  
  7         113  
7 7     7   20 use warnings;
  7         11  
  7         241  
8              
9             =head1 NAME
10              
11             Gearman::Taskset - a taskset in Gearman, from the point of view of a client
12              
13             =head1 SYNOPSIS
14              
15             use Gearman::Client;
16             my $client = Gearman::Client->new;
17              
18             # waiting on a set of tasks in parallel
19             my $ts = $client->new_task_set;
20             $ts->add_task( "add" => "1+2", {...});
21             $ts->wait();
22              
23              
24             =head1 DESCRIPTION
25              
26             Gearman::Taskset is a Gearman::Client's representation of a queue of tasks in Gearman
27              
28             =head1 METHODS
29              
30             =cut
31              
32             use fields (
33              
34             # { handle => [Task, ...] }
35 7         34 'waiting',
36              
37             # Gearman::Client
38             'client',
39              
40             # arrayref
41             'need_handle',
42              
43             # default socket (non-merged requests)
44             'default_sock',
45              
46             # default socket's ip/port
47             'default_sockaddr',
48              
49             # { hostport => socket }
50             'loaned_sock',
51              
52             # bool, if taskset has been cancelled mid-processing
53             'cancelled',
54              
55             # hookname -> coderef
56             'hooks',
57 7     7   435 );
  7         1099  
58              
59 7     7   480 use Carp ();
  7         7  
  7         67  
60 7     7   714 use Gearman::Util ();
  7         8  
  7         105  
61 7     7   2422 use Gearman::ResponseParser::Taskset;
  7         11  
  7         138  
62              
63             # i thought about weakening taskset's client, but might be too weak.
64 7     7   23 use Scalar::Util ();
  7         8  
  7         67  
65 7     7   476 use Socket ();
  7         2567  
  7         77  
66 7     7   520 use Storable ();
  7         2073  
  7         76  
67 7     7   22 use Time::HiRes ();
  7         5  
  7         12824  
68              
69             =head2 new($client)
70              
71             =cut
72              
73             sub new {
74 5     5 1 1652 my ($self, $client) = @_;
75 5 100 66     80 (Scalar::Util::blessed($client) && $client->isa("Gearman::Client"))
76             || Carp::croak
77             "provided client argument is not a Gearman::Client reference";
78              
79 4 50       15 unless (ref $self) {
80 4         14 $self = fields::new($self);
81             }
82              
83 4         211 $self->{waiting} = {};
84 4         7 $self->{need_handle} = [];
85 4         8 $self->{client} = $client;
86 4         6 $self->{loaned_sock} = {};
87 4         5 $self->{cancelled} = 0;
88 4         8 $self->{hooks} = {};
89              
90 4         7 return $self;
91             } ## end sub new
92              
93             sub DESTROY {
94 2     2   772 my $self = shift;
95              
96             # During global cleanup this may be called out of order, and the client may not exist in the taskset.
97 2 50       8 return unless $self->{client};
98              
99 2 50       7 if ($self->{default_sock}) {
100             $self->client->_sock_cache($self->{default_sockaddr},
101 0         0 $self->{default_sock});
102             }
103              
104 2         2 while (my ($hp, $sock) = each %{ $self->{loaned_sock} }) {
  2         45  
105 0         0 $self->client->_sock_cache($hp, $sock);
106             }
107             } ## end sub DESTROY
108              
109             =head2 run_hook($name)
110              
111             run a hook callback if defined
112              
113             =cut
114              
115             sub run_hook {
116 1     1 1 3 my ($self, $name) = (shift, shift);
117 1 50 33     6 ($name && $self->{hooks}->{$name}) || return;
118              
119 0         0 eval { $self->{hooks}->{$name}->(@_) };
  0         0  
120              
121 0 0       0 warn "Gearman::Taskset hook '$name' threw error: $@\n" if $@;
122             } ## end sub run_hook
123              
124             =head2 add_hook($name, [$cb])
125              
126             add a hook
127              
128             =cut
129              
130             sub add_hook {
131 0     0 1 0 my ($self, $name, $cb) = @_;
132 0 0       0 $name || return;
133              
134 0 0       0 if ($cb) {
135 0         0 $self->{hooks}->{$name} = $cb;
136             }
137             else {
138 0         0 delete $self->{hooks}->{$name};
139             }
140             } ## end sub add_hook
141              
142             =head2 client ()
143              
144             this method is part of the "Taskset" interface, also implemented by
145             Gearman::Client::Async, where no tasksets make sense, so instead the
146             Gearman::Client::Async object itself is also its taskset. (the
147             client tracks all tasks). so don't change this, without being aware
148             of Gearman::Client::Async. similarly, don't access $ts->{client} without
149             going via this accessor.
150              
151             =cut
152              
153             sub client {
154 18     18 1 8046 return shift->{client};
155             }
156              
157             =head2 cancel()
158              
159             Close sockets, cleanup internals.
160              
161             =cut
162              
163             sub cancel {
164 0     0 1 0 my $self = shift;
165              
166 0         0 $self->{cancelled} = 1;
167              
168 0 0       0 if ($self->{default_sock}) {
169 0         0 close($self->{default_sock});
170 0         0 $self->{default_sock} = undef;
171             }
172              
173 0         0 while (my ($hp, $sock) = each %{ $self->{loaned_sock} }) {
  0         0  
174 0         0 $sock->close;
175             }
176              
177 0         0 $self->{waiting} = {};
178 0         0 $self->{need_handle} = [];
179 0         0 $self->{client} = undef;
180             } ## end sub cancel
181              
182             #
183             # _get_loaned_sock($js)
184             #
185              
186             sub _get_loaned_sock {
187 0     0   0 my ($self, $js) = @_;
188 0         0 my $js_str = $self->client()->_js_str($js);
189              
190 0 0       0 if (my $sock = $self->{loaned_sock}{$js_str}) {
191 0 0       0 return $sock if $sock->connected;
192 0         0 delete $self->{loaned_sock}{$js_str};
193             }
194              
195 0         0 my $sock = $self->client()->_get_js_sock($js);
196              
197 0         0 return $self->{loaned_sock}{$js_str} = $sock;
198             } ## end sub _get_loaned_sock
199              
200             =head2 wait(%opts)
201              
202             event loop for reading in replies
203              
204             =cut
205              
206             sub wait {
207 0     0 1 0 my ($self, %opts) = @_;
208 0         0 my $timeout;
209 0 0       0 if (exists $opts{timeout}) {
210 0         0 $timeout = delete $opts{timeout};
211 0 0       0 $timeout += Time::HiRes::time() if defined $timeout;
212             }
213              
214 0 0       0 Carp::carp "Unknown options: "
215             . join(',', keys %opts)
216             . " passed to Taskset->wait."
217             if keys %opts;
218              
219 0         0 my %parser; # fd -> Gearman::ResponseParser object
220              
221 0         0 my ($rin, $rout, $eout) = ('', '', '');
222 0         0 my %watching;
223              
224 0         0 for my $sock ($self->{default_sock}, values %{ $self->{loaned_sock} }) {
  0         0  
225 0 0       0 next unless $sock;
226 0 0       0 if (my $fd = $sock->fileno) {
227 0         0 vec($rin, $fd, 1) = 1;
228 0         0 $watching{$fd} = $sock;
229             }
230             } ## end for my $sock ($self->{default_sock...})
231              
232 0   0     0 while (!$self->{cancelled} && keys %{ $self->{waiting} }) {
  0         0  
233 0 0       0 my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;
234              
235             # TODO drop the eout.
236 0         0 my $nfound = select($rout = $rin, undef, $eout = $rin, $time_left);
237 0 0 0     0 if ($timeout && $time_left <= 0) {
238 0         0 $self->cancel;
239 0         0 return;
240             }
241 0 0       0 next if !$nfound;
242              
243 0         0 foreach my $fd (keys %watching) {
244 0 0       0 next unless vec($rout, $fd, 1);
245              
246             # TODO: deal with error vector
247 0         0 my $sock = $watching{$fd};
248 0   0     0 my $parser = $parser{$fd}
249             ||= Gearman::ResponseParser::Taskset->new(
250             source => $sock,
251             taskset => $self
252             );
253 0         0 eval { $parser->parse_sock($sock); };
  0         0  
254              
255 0 0       0 if ($@) {
256              
257             # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
258             # We're not in an accessible place here, so if all job servers fail we must die to prevent hanging.
259 0         0 Carp::croak("Job server failure: $@");
260             } ## end if ($@)
261             } ## end foreach my $fd (keys %watching)
262             } ## end while (!$self->{cancelled...})
263             } ## end sub wait
264              
265             =head2 add_task(Gearman::Task)
266              
267             =head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>)
268              
269             C<$opts_hr> see L<Gearman::Task>
270              
271             =cut
272              
273             sub add_task {
274 1     1 1 1 my $self = shift;
275 1         3 my $task = $self->client()->_get_task_from_args(@_);
276              
277 1         3 $task->taskset($self);
278              
279 1         3 $self->run_hook('add_task', $self, $task);
280              
281 1         2 my $jssock = $task->{jssock};
282              
283 1 50       5 return $task->fail("undefined jssock") unless ($jssock);
284              
285 0         0 my $req = $task->pack_submit_packet($self->client);
286 0         0 my $len = length($req);
287 0         0 my $rv = $jssock->syswrite($req, $len);
288 0   0     0 $rv ||= 0;
289 0 0       0 Carp::croak "Wrote $rv but expected to write $len" unless $rv == $len;
290              
291 0         0 push @{ $self->{need_handle} }, $task;
  0         0  
292 0         0 while (@{ $self->{need_handle} }) {
  0         0  
293             my $rv
294             = $self->_wait_for_packet($jssock,
295 0         0 $self->{client}->{command_timeout});
296 0 0       0 if (!$rv) {
297              
298             # ditch it, it failed.
299             # this will resubmit it if it failed.
300 0         0 shift @{ $self->{need_handle} };
  0         0  
301 0 0       0 return $task->fail(
302             join(' ',
303             "no rv on waiting for packet",
304             defined($rv) ? $rv : $!)
305             );
306             } ## end if (!$rv)
307             } ## end while (@{ $self->{need_handle...}})
308              
309 0         0 return $task->handle;
310             } ## end sub add_task
311              
312             #
313             # _get_default_sock()
314             # used in Gearman::Task->taskset only
315             #
316             sub _get_default_sock {
317 6     6   3761 my $self = shift;
318 6 50       15 return $self->{default_sock} if $self->{default_sock};
319              
320             my $getter = sub {
321 0     0   0 my $js = shift;
322             return $self->{loaned_sock}{$js}
323 0   0     0 || $self->{client}->_get_js_sock($js);
324 6         17 };
325              
326 6         13 my ($js, $jss) = $self->client()->_get_random_js_sock($getter);
327 6 50       31 return unless $jss;
328              
329 0         0 my $js_str = $self->client()->_js_str($js);
330 0   0     0 $self->{loaned_sock}{$js_str} ||= $jss;
331              
332 0         0 $self->{default_sock} = $jss;
333 0         0 $self->{default_sockaddr} = $js_str;
334              
335 0         0 return $jss;
336             } ## end sub _get_default_sock
337              
338             #
339             # _get_hashed_sock($hv)
340             #
341             # used in Gearman::Task->taskset only
342             #
343             # return a socket
344             sub _get_hashed_sock {
345 2     2   7 my $self = shift;
346 2         2 my $hv = shift;
347             my ($js_count, @job_servers)
348 2         6 = ($self->client()->{js_count}, $self->client()->job_servers());
349 2         3 my $sock;
350 2         6 for (my $off = 0; $off < $js_count; $off++) {
351 0         0 my $idx = ($hv + $off) % ($js_count);
352 0         0 $sock = $self->_get_loaned_sock($job_servers[$idx]);
353 0         0 last;
354             }
355              
356 2         5 return $sock;
357             } ## end sub _get_hashed_sock
358              
359             #
360             # _wait_for_packet($sock, $timeout)
361             #
362             # $sock socket to singularly read from
363             #
364             # returns boolean when given a sock to wait on.
365             # otherwise, return value is undefined.
366             sub _wait_for_packet {
367 0     0   0 my ($self, $sock, $timeout) = @_;
368              
369             #TODO check $err after read
370 0         0 my $err;
371 0         0 my $res = Gearman::Util::read_res_packet($sock, \$err, $timeout);
372              
373 0 0       0 return $res ? $self->process_packet($res, $sock) : 0;
374             } ## end sub _wait_for_packet
375              
376             #
377             # _ip_port($sock)
378             #
379             # return hostport || ipport
380             #
381             sub _ip_port {
382 1     1   2 my ($self, $sock) = @_;
383 1 50       3 $sock || return;
384              
385 0         0 my $pn = getpeername($sock);
386 0 0       0 $pn || return;
387              
388             # look for a hostport in loaned_sock
389 0         0 my $hostport;
390 0         0 while (my ($hp, $s) = each %{ $self->{loaned_sock} }) {
  0         0  
391 0 0       0 $s || next;
392 0 0       0 if ($sock == $s) {
393 0         0 $hostport = $hp;
394 0         0 last;
395             }
396             } ## end while (my ($hp, $s) = each...)
397              
398             # hopefully it solves client->get_status mismatch
399 0 0       0 $hostport && return $hostport;
400              
401 0         0 my $fam = Socket::sockaddr_family($pn);
402 0 0       0 my ($port, $iaddr)
403             = ($fam == Socket::AF_INET6)
404             ? Socket::sockaddr_in6($pn)
405             : Socket::sockaddr_in($pn);
406              
407 0         0 my $addr = Socket::inet_ntop($fam, $iaddr);
408              
409 0         0 return join ':', $addr, $port;
410             } ## end sub _ip_port
411              
412             #
413             # _fail_jshandle($shandle, $type, [$message])
414             #
415             # note the failure of a task given by its jobserver-specific handle
416             #
417             sub _fail_jshandle {
418 2     2   5 my ($self, $shandle, $type, $msg) = @_;
419 2 50       5 $shandle
420             or Carp::croak "_fail_jshandle() called without shandle parameter";
421              
422 2 50       6 my $task_list = $self->{waiting}{$shandle}
423             or Carp::croak "Uhhhh: got $type for unknown handle: $shandle";
424              
425 2         3 my $task = shift @{$task_list};
  2         3  
426 2 50 33     14 (Scalar::Util::blessed($task) && $task->isa("Gearman::Task"))
427             || Carp::croak
428             "Uhhhh: task_list is empty on $type for handle $shandle\n";
429              
430 2   50     14 $task->fail($msg || "jshandle fail");
431              
432 2 50       9 delete $self->{waiting}{$shandle} unless @$task_list;
433             } ## end sub _fail_jshandle
434              
435             =head2 process_packet($res, $sock)
436              
437             =cut
438              
439             sub process_packet {
440 14     14 1 2167 my ($self, $res, $sock) = @_;
441              
442 14         31 my $qr = qr/(.+?)\0/;
443             my %assert = (
444             task => sub {
445 7     7   7 my ($task, $msg) = @_;
446 7 100 66     71 (Scalar::Util::blessed($task) && $task->isa("Gearman::Task"))
447             || Carp::croak $msg;
448             }
449 14         51 );
450             my %type = (
451             job_created => sub {
452 2     2   4 my ($blob) = shift;
453 2         2 my $task = shift @{ $self->{need_handle} };
  2         4  
454             $assert{task}
455 2         4 ->($task, "Um, got an unexpected job_created notification");
456 1         1 my $shandle = $blob;
457 1         4 my $ipport = $self->_ip_port($sock);
458              
459             # did sock become disconnected in the meantime?
460 1 50       2 if (!$ipport) {
461 1         3 $self->_fail_jshandle($shandle, "job_created");
462 1         16 return 1;
463             }
464              
465 0         0 $task->handle("$ipport//$shandle");
466 0 0       0 return 1 if $task->{background};
467 0   0     0 push @{ $self->{waiting}{$shandle} ||= [] }, $task;
  0         0  
468 0         0 return 1;
469             },
470             work_complete => sub {
471 3     3   4 my ($blob) = shift;
472 3 100       35 ($blob =~ /^$qr/)
473             or Carp::croak "Bogus work_complete from server";
474 2         16 $blob =~ s/^$qr//;
475 2         4 my $shandle = $1;
476              
477 2         4 my $task_list = $self->{waiting}{$shandle};
478 2         3 my $task = shift @$task_list;
479 2         7 $assert{task}->(
480             $task,
481             "Uhhhh: task_list is empty on work_complete for handle $shandle"
482             );
483              
484 1         5 $task->complete(\$blob);
485 1 50       9 delete $self->{waiting}{$shandle} unless @$task_list;
486              
487 1         18 return 1;
488             },
489             work_data => sub {
490 3     3   5 my ($blob) = shift;
491 3 100       25 $blob =~ s/^(.+?)\0//
492             or Carp::croak "Bogus work_data from server";
493 2         4 my $shandle = $1;
494              
495 2         3 my $task_list = $self->{waiting}{$shandle};
496 2         3 my $task = $task_list->[0];
497 2         6 $assert{task}->(
498             $task,
499             "Uhhhh: task_list is empty on work_data for handle $shandle"
500             );
501              
502 1         4 $task->data(\$blob);
503              
504 1         382 return 1;
505             },
506             work_warning => sub {
507 0     0   0 my ($blob) = shift;
508 0 0       0 $blob =~ s/^(.+?)\0//
509             or Carp::croak "Bogus work_warning from server";
510 0         0 my $shandle = $1;
511              
512 0         0 my $task_list = $self->{waiting}{$shandle};
513 0         0 my $task = $task_list->[0];
514 0         0 $assert{task}->(
515             $task,
516             "Uhhhh: task_list is empty on work_warning for handle $shandle"
517             );
518              
519 0         0 $task->warning(\$blob);
520              
521 0         0 return 1;
522             },
523             work_exception => sub {
524 2     2   3 my ($blob) = shift;
525 2 100       31 ($blob =~ /^$qr/)
526             or Carp::croak "Bogus work_exception from server";
527 1         13 $blob =~ s/^$qr//;
528 1         3 my $shandle = $1;
529              
530 1         2 my $task_list = $self->{waiting}{$shandle};
531 1         1 my $task = $task_list->[0];
532 1         3 $assert{task}->(
533             $task,
534             "Uhhhh: task_list is empty on work_exception for handle $shandle"
535             );
536              
537             #FIXME we have to freeze $blob because Task->exception expects it in this form.
538             # The only reason I see for doing so is the Worker->work implementation: with Gearman::Server it uses nfreeze for the exception value.
539 1         10 $task->exception(\Storable::freeze(\$blob));
540              
541 1         19 return 1;
542             },
543             work_fail => sub {
544 1     1   2 my ($blob) = shift;
545 1         2 my ($shandle, $msg) = split(/\0/, $blob);
546 1   33     3 $shandle ||=$blob;
547 1         3 $self->_fail_jshandle($shandle, "work_fail", $msg);
548 1         20 return 1;
549             },
550             work_status => sub {
551 2     2   2 my ($blob) = shift;
552 2         5 my ($shandle, $nu, $de) = split(/\0/, $blob);
553 2         4 my $task_list = $self->{waiting}{$shandle};
554 2 100 50     6 ref($task_list) eq "ARRAY" && scalar(@{$task_list})
  2         19  
555             or Carp::croak
556             "Uhhhh: got work_status for unknown handle: $shandle";
557              
558             # FIXME: the server is (probably) sending a work_status packet for each
559             # interested client, even if the clients are the same, so probably need
560             # to fix the server not to do that. just put this FIXME here for now,
561             # though really it's a server issue.
562 1         2 foreach my $task (@$task_list) {
563 1         5 $task->status($nu, $de);
564             }
565              
566 1         726 return 1;
567             },
568 14         137 );
569              
570             defined($type{ $res->{type} })
571 14 100       39 || Carp::croak
572 1         14 "Unimplemented packet type: $res->{type} [${$res->{blobref}}]";
573              
574 13         9 return $type{ $res->{type} }->(${ $res->{blobref} });
  13         30  
575             } ## end sub process_packet
576              
577             1;