File Coverage

blib/lib/Gearman/Taskset.pm
Criterion Covered Total %
statement 128 231 55.4
branch 39 108 36.1
condition 5 37 13.5
subroutine 23 26 88.4
pod 8 8 100.0
total 203 410 49.5


line stmt bran cond sub pod time code
1             package Gearman::Taskset;
2 7     7   2274 use version ();
  7         3393  
  7         331  
3             $Gearman::Taskset::VERSION = version->declare("2.002.001"); #TRIAL
4              
5              
6 7     7   35 use strict;
  7         10  
  7         168  
7 7     7   28 use warnings;
  7         8  
  7         379  
8              
9             =head1 NAME
10              
11             Gearman::Taskset - a taskset in Gearman, from the point of view of a client
12              
13             =head1 SYNOPSIS
14              
15             use Gearman::Client;
16             my $client = Gearman::Client->new;
17              
18             # waiting on a set of tasks in parallel
19             my $ts = $client->new_task_set;
20             $ts->add_task( "add" => "1+2", {...});
21             $ts->wait();
22              
23              
24             =head1 DESCRIPTION
25              
26             Gearman::Taskset is a Gearman::Client's representation of tasks queue t in Gearman
27              
28             =head1 METHODS
29              
30             =cut
31              
32             use fields (
33              
34             # { handle => [Task, ...] }
35 7         45 'waiting',
36              
37             # Gearman::Client
38             'client',
39              
40             # arrayref
41             'need_handle',
42              
43             # default socket (non-merged requests)
44             'default_sock',
45              
46             # default socket's ip/port
47             'default_sockaddr',
48              
49             # { hostport => socket }
50             'loaned_sock',
51              
52             # bool, if taskset has been cancelled mid-processing
53             'cancelled',
54              
55             # hookname -> coderef
56             'hooks',
57 7     7   508 );
  7         1281  
58              
59 7     7   659 use Carp ();
  7         15  
  7         105  
60 7     7   913 use Gearman::Util ();
  7         12  
  7         157  
61 7     7   3248 use Gearman::ResponseParser::Taskset;
  7         17  
  7         194  
62              
63             # i thought about weakening taskset's client, but might be too weak.
64 7     7   36 use Scalar::Util ();
  7         10  
  7         93  
65 7     7   590 use Socket ();
  7         3136  
  7         103  
66 7     7   26 use Time::HiRes ();
  7         11  
  7         14649  
67              
68             =head2 new($client)
69              
70             =cut
71              
72             sub new {
73 5     5 1 2308 my ($self, $client) = @_;
74 5 100 66     84 (Scalar::Util::blessed($client) && $client->isa("Gearman::Client"))
75             || Carp::croak
76             "provided client argument is not a Gearman::Client reference";
77              
78 4 50       16 unless (ref $self) {
79 4         27 $self = fields::new($self);
80             }
81              
82 4         283 $self->{waiting} = {};
83 4         10 $self->{need_handle} = [];
84 4         10 $self->{client} = $client;
85 4         9 $self->{loaned_sock} = {};
86 4         9 $self->{cancelled} = 0;
87 4         9 $self->{hooks} = {};
88              
89 4         12 return $self;
90             } ## end sub new
91              
92             sub DESTROY {
93 3     3   1235 my $self = shift;
94              
95             # During global cleanup this may be called out of order, and the client my not exist in the taskset.
96 3 50       21 return unless $self->{client};
97              
98 3 50       16 if ($self->{default_sock}) {
99             $self->client->_sock_cache($self->{default_sockaddr},
100 0         0 $self->{default_sock});
101             }
102              
103 3         7 while (my ($hp, $sock) = each %{ $self->{loaned_sock} }) {
  3         130  
104 0         0 $self->client->_sock_cache($hp, $sock);
105             }
106             } ## end sub DESTROY
107              
108             =head2 run_hook($name)
109              
110             run a hook callback if defined
111              
112             =cut
113              
114             sub run_hook {
115 3     3 1 6 my ($self, $name) = (shift, shift);
116 3 100 33     20 ($name && $self->{hooks}->{$name}) || return;
117              
118 1         2 eval { $self->{hooks}->{$name}->(@_) };
  1         4  
119              
120 1 50       7 warn "Gearman::Taskset hook '$name' threw error: $@\n" if $@;
121             } ## end sub run_hook
122              
123             =head2 add_hook($name, [$cb])
124              
125             add a hook
126              
127             =cut
128              
129             sub add_hook {
130 2     2 1 936 my ($self, $name, $cb) = @_;
131 2 50       8 $name || return;
132              
133 2 100       4 if ($cb) {
134 1         10 $self->{hooks}->{$name} = $cb;
135             }
136             else {
137 1         7 delete $self->{hooks}->{$name};
138             }
139             } ## end sub add_hook
140              
141             =head2 client ()
142              
143             this method is part of the "Taskset" interface, also implemented by
144             Gearman::Client::Async, where no tasksets make sense, so instead the
145             Gearman::Client::Async object itself is also its taskset. (the
146             client tracks all tasks). so don't change this, without being aware
147             of Gearman::Client::Async. similarly, don't access $ts->{client} without
148             going via this accessor.
149              
150             =cut
151              
152             sub client {
153 19     19 1 7164 return shift->{client};
154             }
155              
156             =head2 cancel()
157              
158             Close sockets, cleanup internals.
159              
160             =cut
161              
162             sub cancel {
163 1     1 1 1170 my $self = shift;
164              
165 1         3 $self->{cancelled} = 1;
166              
167 1 50       4 if ($self->{default_sock}) {
168 1         5 close($self->{default_sock});
169 1         2 $self->{default_sock} = undef;
170             }
171              
172 1         8 while (my ($hp, $sock) = each %{ $self->{loaned_sock} }) {
  2         29  
173 1         7 $sock->close;
174             }
175              
176 1         3 $self->{waiting} = {};
177 1         2 $self->{need_handle} = [];
178 1         3 $self->{client} = undef;
179             } ## end sub cancel
180              
181             #
182             # _get_loaned_sock($js)
183             #
184              
185             sub _get_loaned_sock {
186 0     0   0 my ($self, $js) = @_;
187 0         0 my $js_str = $self->client()->_js_str($js);
188              
189 0 0       0 if (my $sock = $self->{loaned_sock}{$js_str}) {
190 0 0       0 return $sock if $sock->connected;
191 0         0 delete $self->{loaned_sock}{$js_str};
192             }
193              
194 0         0 my $sock = $self->client()->_get_js_sock($js);
195              
196 0         0 return $self->{loaned_sock}{$js_str} = $sock;
197             } ## end sub _get_loaned_sock
198              
199             =head2 wait(%opts)
200              
201             event loop for reading in replies
202              
203             =cut
204              
205             sub wait {
206 0     0 1 0 my ($self, %opts) = @_;
207 0         0 my $timeout;
208 0 0       0 if (exists $opts{timeout}) {
209 0         0 $timeout = delete $opts{timeout};
210 0 0       0 $timeout += Time::HiRes::time() if defined $timeout;
211             }
212              
213 0 0       0 Carp::carp "Unknown options: "
214             . join(',', keys %opts)
215             . " passed to Taskset->wait."
216             if keys %opts;
217              
218 0         0 my %parser; # fd -> Gearman::ResponseParser object
219              
220 0         0 my ($rin, $rout, $eout) = ('', '', '');
221 0         0 my %watching;
222              
223 0         0 for my $sock ($self->{default_sock}, values %{ $self->{loaned_sock} }) {
  0         0  
224 0 0       0 next unless $sock;
225 0 0       0 if (my $fd = $sock->fileno) {
226 0         0 vec($rin, $fd, 1) = 1;
227 0         0 $watching{$fd} = $sock;
228             }
229             } ## end for my $sock ($self->{default_sock...})
230              
231 0   0     0 while (!$self->{cancelled} && keys %{ $self->{waiting} }) {
  0         0  
232 0 0       0 my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;
233              
234             # TODO drop the eout.
235 0         0 my $nfound = select($rout = $rin, undef, $eout = $rin, $time_left);
236 0 0 0     0 if ($timeout && $time_left <= 0) {
237 0         0 $self->cancel;
238 0         0 return;
239             }
240 0 0       0 next if !$nfound;
241              
242 0         0 foreach my $fd (keys %watching) {
243 0 0       0 next unless vec($rout, $fd, 1);
244              
245             # TODO: deal with error vector
246 0         0 my $sock = $watching{$fd};
247 0   0     0 my $parser = $parser{$fd}
248             ||= Gearman::ResponseParser::Taskset->new(
249             source => $sock,
250             taskset => $self
251             );
252 0         0 eval { $parser->parse_sock($sock); };
  0         0  
253              
254 0 0       0 if ($@) {
255              
256             # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
257             # We're not in an accessible place here, so if all job servers fail we must die to prevent hanging.
258 0         0 Carp::croak("Job server failure: $@");
259             } ## end if ($@)
260             } ## end foreach my $fd (keys %watching)
261             } ## end while (!$self->{cancelled...})
262             } ## end sub wait
263              
264             =head2 add_task(Gearman::Task)
265              
266             =head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>
267              
268             C<$opts_hr> see L
269              
270             =cut
271              
272             sub add_task {
273 3     3 1 812 my $self = shift;
274 3         8 my $task = $self->client()->_get_task_from_args(@_);
275              
276 2         9 $task->taskset($self);
277              
278 2         9 $self->run_hook('add_task', $self, $task);
279              
280 2         3 my $jssock = $task->{jssock};
281              
282 2 50       25 return $task->fail("undefined jssock") unless ($jssock);
283              
284 0         0 my $req = $task->pack_submit_packet($self->client);
285 0         0 my $len = length($req);
286 0         0 my $rv = $jssock->syswrite($req, $len);
287 0   0     0 $rv ||= 0;
288 0 0       0 Carp::croak "Wrote $rv but expected to write $len" unless $rv == $len;
289              
290 0         0 push @{ $self->{need_handle} }, $task;
  0         0  
291 0         0 while (@{ $self->{need_handle} }) {
  0         0  
292             my $rv
293             = $self->_wait_for_packet($jssock,
294 0         0 $self->{client}->{command_timeout});
295 0 0       0 if (!$rv) {
296              
297             # ditch it, it failed.
298             # this will resubmit it if it failed.
299 0         0 shift @{ $self->{need_handle} };
  0         0  
300 0 0       0 return $task->fail(
301             join(' ',
302             "no rv on waiting for packet",
303             defined($rv) ? $rv : $!)
304             );
305             } ## end if (!$rv)
306             } ## end while (@{ $self->{need_handle...}})
307              
308 0         0 return $task->handle;
309             } ## end sub add_task
310              
311             #
312             # _get_default_sock()
313             # used in Gearman::Task->taskset only
314             #
315             sub _get_default_sock {
316 10     10   5283 my $self = shift;
317 10 50       37 return $self->{default_sock} if $self->{default_sock};
318              
319             my $getter = sub {
320 0     0   0 my $js = shift;
321             return $self->{loaned_sock}{$js}
322 0   0     0 || $self->{client}->_get_js_sock($js);
323 10         25 };
324              
325 10         24 my ($js, $jss) = $self->client()->_get_random_js_sock($getter);
326 10 50       54 return unless $jss;
327              
328 0         0 my $js_str = $self->client()->_js_str($js);
329 0   0     0 $self->{loaned_sock}{$js_str} ||= $jss;
330              
331 0         0 $self->{default_sock} = $jss;
332 0         0 $self->{default_sockaddr} = $js_str;
333              
334 0         0 return $jss;
335             } ## end sub _get_default_sock
336              
337             #
338             # _get_hashed_sock($hv)
339             #
340             # only used in Gearman::Task->taskset only
341             #
342             # return a socket
343             sub _get_hashed_sock {
344 2     2   3 my $self = shift;
345 2         2 my $hv = shift;
346             my ($js_count, @job_servers)
347 2         6 = ($self->client()->{js_count}, $self->client()->job_servers());
348 2         3 my $sock;
349 2         6 for (my $off = 0; $off < $js_count; $off++) {
350 0         0 my $idx = ($hv + $off) % ($js_count);
351 0         0 $sock = $self->_get_loaned_sock($job_servers[$idx]);
352 0         0 last;
353             }
354              
355 2         7 return $sock;
356             } ## end sub _get_hashed_sock
357              
358             #
359             # _wait_for_packet($sock, $timeout)
360             #
361             # $sock socket to singularly read from
362             #
363             # returns boolean when given a sock to wait on.
364             # otherwise, return value is undefined.
365             sub _wait_for_packet {
366 1     1   625 my ($self, $sock, $timeout) = @_;
367              
368             #TODO check $err after read
369 1         2 my $err;
370 1         7 my $res = Gearman::Util::read_res_packet($sock, \$err, $timeout);
371              
372 0 0       0 return $res ? $self->process_packet($res, $sock) : 0;
373             } ## end sub _wait_for_packet
374              
375             #
376             # _is_port($sock)
377             #
378             # return hostport || ipport
379             #
380             sub _ip_port {
381 1     1   2 my ($self, $sock) = @_;
382 1 50       4 $sock || return;
383              
384 0         0 my $pn = getpeername($sock);
385 0 0       0 $pn || return;
386              
387             # look for a hostport in loaned_sock
388 0         0 my $hostport;
389 0         0 while (my ($hp, $s) = each %{ $self->{loaned_sock} }) {
  0         0  
390 0 0       0 $s || next;
391 0 0       0 if ($sock == $s) {
392 0         0 $hostport = $hp;
393 0         0 last;
394             }
395             } ## end while (my ($hp, $s) = each...)
396              
397             # hopefully it solves client->get_status mismatch
398 0 0       0 $hostport && return $hostport;
399              
400 0         0 my $fam = Socket::sockaddr_family($pn);
401 0 0       0 my ($port, $iaddr)
402             = ($fam == Socket::AF_INET6)
403             ? Socket::sockaddr_in6($pn)
404             : Socket::sockaddr_in($pn);
405              
406 0         0 my $addr = Socket::inet_ntop($fam, $iaddr);
407              
408 0         0 return join ':', $addr, $port;
409             } ## end sub _ip_port
410              
411             #
412             # _fail_jshandle($shandle)
413             #
414             # note the failure of a task given by its jobserver-specific handle
415             #
416             sub _fail_jshandle {
417 4     4   1290 my ($self, $shandle) = @_;
418 4 100       29 $shandle
419             or Carp::croak "_fail_jshandle() called without shandle parameter";
420              
421 3 50       54 my $task_list = $self->{waiting}{$shandle}
422             or Carp::croak "Uhhhh: got work_fail for unknown handle: $shandle";
423              
424 0         0 my $task = shift @$task_list;
425 0 0 0     0 ($task && ref($task) eq "Gearman::Task")
426             or Carp::croak
427             "Uhhhh: task_list is empty on work_fail for handle $shandle\n";
428              
429 0         0 $task->fail("jshandle fail");
430 0 0       0 delete $self->{waiting}{$shandle} unless @$task_list;
431             } ## end sub _fail_jshandle
432              
433             =head2 process_packet($res, $sock)
434              
435             =cut
436              
437             sub process_packet {
438 9     9 1 11 my ($self, $res, $sock) = @_;
439              
440 9 100       20 if ($res->{type} eq "job_created") {
441 2         3 my $task = shift @{ $self->{need_handle} };
  2         5  
442 2 100 66     25 ($task && ref($task) eq "Gearman::Task")
443             or Carp::croak "Um, got an unexpected job_created notification";
444 1         3 my $shandle = ${ $res->{'blobref'} };
  1         3  
445 1         4 my $ipport = $self->_ip_port($sock);
446              
447             # did sock become disconnected in the meantime?
448 1 50       3 if (!$ipport) {
449 1         3 $self->_fail_jshandle($shandle);
450 0         0 return 1;
451             }
452              
453 0         0 $task->handle("$ipport//$shandle");
454 0 0       0 return 1 if $task->{background};
455 0   0     0 push @{ $self->{waiting}{$shandle} ||= [] }, $task;
  0         0  
456 0         0 return 1;
457             } ## end if ($res->{type} eq "job_created")
458              
459 7 100       21 if ($res->{type} eq "work_fail") {
460 1         2 my $shandle = ${ $res->{'blobref'} };
  1         4  
461 1         4 $self->_fail_jshandle($shandle);
462 0         0 return 1;
463             }
464              
465 6         14 my $qr = qr/(.+?)\0/;
466              
467 6 100       13 if ($res->{type} eq "work_complete") {
468 2 100       2 (${ $res->{'blobref'} } =~ /^$qr/)
  2         44  
469             or Carp::croak "Bogus work_complete from server";
470 1         2 ${ $res->{'blobref'} } =~ s/^$qr//;
  1         19  
471 1         4 my $shandle = $1;
472              
473 1 50       18 my $task_list = $self->{waiting}{$shandle}
474             or Carp::croak
475             "Uhhhh: got work_complete for unknown handle: $shandle\n";
476              
477 0         0 my $task = shift @$task_list;
478 0 0 0     0 ($task && ref($task) eq "Gearman::Task")
479             or Carp::croak
480             "Uhhhh: task_list is empty on work_complete for handle $shandle\n";
481              
482 0         0 $task->complete($res->{'blobref'});
483 0 0       0 delete $self->{waiting}{$shandle} unless @$task_list;
484              
485 0         0 return 1;
486             } ## end if ($res->{type} eq "work_complete")
487              
488 4 100       10 if ($res->{type} eq "work_exception") {
489              
490             # ${ $res->{'blobref'} } =~ s/^(.+?)\0//
491             # or Carp::croak "Bogus work_exception from server";
492              
493 2 100       3 (${ $res->{'blobref'} } =~ /^$qr/)
  2         44  
494             or Carp::croak "Bogus work_exception from server";
495 1         2 ${ $res->{'blobref'} } =~ s/^$qr//;
  1         21  
496 1         5 my $shandle = $1;
497              
498 1 50       18 my $task_list = $self->{waiting}{$shandle}
499             or Carp::croak
500             "Uhhhh: got work_exception for unknown handle: $shandle\n";
501              
502 0         0 my $task = $task_list->[0];
503 0 0 0     0 ($task && ref($task) eq "Gearman::Task")
504             or Carp::croak
505             "Uhhhh: task_list is empty on work_exception for handle $shandle\n";
506              
507 0         0 $task->exception($res->{'blobref'});
508              
509 0         0 return 1;
510             } ## end if ($res->{type} eq "work_exception")
511              
512 2 100       6 if ($res->{type} eq "work_status") {
513 1         2 my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });
  1         6  
514              
515 1 50       17 my $task_list = $self->{waiting}{$shandle}
516             or Carp::croak
517             "Uhhhh: got work_status for unknown handle: $shandle\n";
518              
519             # FIXME: the server is (probably) sending a work_status packet for each
520             # interested client, even if the clients are the same, so probably need
521             # to fix the server not to do that. just put this FIXME here for now,
522             # though really it's a server issue.
523 0         0 foreach my $task (@$task_list) {
524 0         0 $task->status($nu, $de);
525             }
526              
527 0         0 return 1;
528             } ## end if ($res->{type} eq "work_status")
529              
530             Carp::croak
531 1         5 "Unknown/unimplemented packet type: $res->{type} [${$res->{blobref}}]";
  1         13  
532             } ## end sub process_packet
533              
534             1;