File Coverage

blib/lib/Gearman/Task.pm
Criterion Covered Total %
statement 120 124 96.7
branch 45 62 72.5
condition 16 27 59.2
subroutine 25 27 92.5
pod 18 18 100.0
total 224 258 86.8


line stmt bran cond sub pod time code
1             package Gearman::Task;
2 7     7   25 use version;
  7         8  
  7         38  
3             $Gearman::Task::VERSION = qv("2.001_001");
4              
5 7     7   478 use strict;
  7         9  
  7         104  
6 7     7   18 use warnings;
  7         7  
  7         222  
7              
8             =head1 NAME
9              
10             Gearman::Task - a task in Gearman, from the point of view of a client
11              
12             =head1 SYNOPSIS
13              
14             my $task = Gearman::Task->new("add", "1+2", {
15             ...
16             });
17              
18             $taskset->add_task($task);
19             $client->do_task($task);
20             $client->dispatch_background($task);
21              
22              
23             =head1 DESCRIPTION
24              
25             I is a Gearman::Client's representation of a task to be
26             done.
27              
28             =head1 USAGE
29              
30             =head2 Gearman::Task->new($func, $arg, \%options)
31              
32             Creates a new I object, and returns the object.
33              
34             I<$func> is the function name to be run. (that you have a worker registered to process)
35              
36             I<$arg> is an opaque scalar or scalarref representing the argument(s)
37             to pass to the distributed function. If you want to pass multiple
38             arguments, you must encode them somehow into this one. That's up to
39             you and your worker.
40              
41             I<%options> can contain:
42              
43             =over 4
44              
45             =item * uniq
46              
47             A key which indicates to the server that other tasks with the same
48             function name and key will be merged into one. That is, the task
49             will be run just once, but all the listeners waiting on that job
50             will get the response multiplexed back to them.
51              
52             Uniq may also contain the magic value "-" (a single hyphen) which
53             means the uniq key is the contents of the args.
54              
55             =item * on_complete
56              
57             A subroutine reference to be invoked when the task is completed. The
58             subroutine will be passed a reference to the return value from the worker
59             process.
60              
61             =item * on_fail
62              
63             A subroutine reference to be invoked when the task fails (or fails for
64             the last time, if retries were specified). The reason could be passed
65             to this callback as an argument. This callback won't be called after a
66             failure if more retries are still possible.
67              
68             =item * on_retry
69              
70             A subroutine reference to be invoked when the task fails, but is about
71             to be retried.
72              
73             Is passed one argument, what retry attempt number this is. (starts with 1)
74              
75             =item * on_status
76              
77             A subroutine reference to be invoked if the task emits status updates.
78             Arguments passed to the subref are ($numerator, $denominator), where those
79             are left up to the client and job to determine.
80              
81             =item * retry_count
82              
83             Number of times job will be retried if there are failures. Defaults to 0.
84              
85             =item * high_priority
86              
87             Boolean, whether this job should take priority over other jobs already
88             enqueued.
89              
90             =item * timeout
91              
92             Automatically fail, calling your on_fail callback, after this many
93             seconds have elapsed without an on_fail or on_complete being
94             called. Defaults to 0, which means never. Bypasses any retry_count
95             remaining.
96              
97             =item * try_timeout
98              
99             Automatically fail, calling your on_retry callback (or on_fail if out of
100             retries), after this many seconds have elapsed. Defaults to 0, which means
101             never.
102              
103             =back
104              
105             =head1 METHODS
106              
107             =cut
108              
109 7     7   19 use Carp ();
  7         49  
  7         74  
110 7     7   1543 use Gearman::Util ();
  7         8  
  7         101  
111 7     7   30 use Scalar::Util ();
  7         8  
  7         70  
112 7     7   2820 use String::CRC32 ();
  7         1967  
  7         154  
113 7     7   2085 use Storable ();
  7         8673  
  7         246  
114              
115             use fields (
116              
117             # from client:
118 7         35 'func',
119             'argref',
120              
121             # opts from client:
122             'uniq',
123             'on_complete',
124             'on_fail',
125             'on_exception',
126             'on_retry',
127             'on_status',
128             'on_post_hooks',
129              
130             # used internally,
131             # when other hooks are done running,
132             # prior to cleanup
133             'retry_count',
134             'timeout',
135             'try_timeout',
136             'high_priority',
137             'background',
138              
139             # from server:
140             'handle',
141              
142             # maintained by this module:
143             'retries_done',
144             'is_finished',
145             'taskset',
146              
147             # jobserver socket.
148             # shared by other tasks in the same taskset,
149             # but not w/ tasks in other tasksets using
150             # the same Gearman::Client
151             'jssock',
152              
153             # hookname -> coderef
154             'hooks',
155 7     7   27 );
  7         7  
156              
157             # constructor, given: ($func, $argref, $opts);
158             sub new {
159 4     4 1 2627 my $self = shift;
160 4 50       17 unless (ref $self) {
161 4         11 $self = fields::new($self);
162             }
163              
164             $self->{func} = shift
165 4 50       2892 or Carp::croak("No function given");
166              
167 4   33     13 $self->{argref} = shift || do { my $empty = ""; \$empty; };
168 4 50       14 (ref $self->{argref} eq "SCALAR")
169             || Carp::croak("Argref not a scalar reference");
170              
171 4   100     14 my $opts = shift || {};
172              
173 4         32 $self->{$_} = delete $opts->{$_} for qw/
174             background
175             high_priority
176             on_complete
177             on_exception
178             on_fail
179             on_retry
180             on_status
181             retry_count
182             timeout
183             try_timeout
184             uniq
185             /;
186              
187 4   100     16 $self->{retry_count} ||= 0;
188              
189             # bool: if success or fail has been called yet on this.
190 4         4 $self->{is_finished} = 0;
191              
192 4 100       2 if (%{$opts}) {
  4         11  
193 1         22 Carp::croak("Unknown option(s): " . join(", ", sort keys %$opts));
194             }
195              
196 3         4 $self->{retries_done} = 0;
197              
198 3         8 return $self;
199             } ## end sub new
200              
201             =head2 run_hook($name)
202              
203             run a hook callback if defined
204              
205             =cut
206              
207             sub run_hook {
208 5     5 1 10 my ($self, $name) = (shift, shift);
209 5 100 33     23 ($name && $self->{hooks}->{$name}) || return;
210              
211 1         1 eval { $self->{hooks}->{$name}->(@_) };
  1         3  
212 1 50       4 warn "Gearman::Task hook '$name' threw error: $@\n" if $@;
213             } ## end sub run_hook
214              
215             =head2 add_hook($name, $cb)
216              
217             add a hook
218              
219             =cut
220              
221             sub add_hook {
222 2     2 1 2421 my ($self, $name) = (shift, shift);
223 2 50       6 $name || return;
224              
225 2 100       5 if (@_) {
226 1         5 $self->{hooks}->{$name} = shift;
227             }
228             else {
229 1         4 delete $self->{hooks}->{$name};
230             }
231             } ## end sub add_hook
232              
233             =head2 is_finished()
234              
235              
236             B bool: whether or not task is totally done (on_failure or
237             on_complete callback has been called)
238              
239             =cut
240              
241             sub is_finished {
242 0     0 1 0 return shift->{is_finished};
243             }
244              
245             =head2 taskset()
246              
247             getter
248              
249             =head2 taskset($ts)
250              
251             setter
252              
253             B Gearman::Taskset
254              
255             =cut
256              
257             sub taskset {
258 7     7 1 1498 my $self = shift;
259              
260             # getter
261 7 100       25 return $self->{taskset} unless @_;
262              
263             # setter
264 4         5 my $ts = shift;
265 4 100 66     42 (Scalar::Util::blessed($ts) && $ts->isa("Gearman::Taskset"))
266             || Carp::croak("argument is not an instance of Gearman::Taskset");
267 3         9 $self->{taskset} = $ts;
268              
269 3 100       10 if (my $hash_num = $self->hash()) {
270 2         7 $self->{jssock} = $ts->_get_hashed_sock($hash_num);
271             }
272             else {
273 1         4 $self->{jssock} = $ts->_get_default_sock;
274             }
275              
276 3         8 return $self->{taskset};
277             } ## end sub taskset
278              
279             =head2 hash()
280              
281             B undef on non-uniq packet, or the hash value (0-32767) if uniq
282              
283             =cut
284              
285             sub hash {
286 7     7 1 8 my $self = shift;
287             my $merge_on = $self->{uniq}
288 7 100 100     41 && $self->{uniq} eq "-" ? $self->{argref} : \$self->{uniq};
289 7 100       5 if (${$merge_on}) {
  7         14  
290 6         5 return (String::CRC32::crc32(${$merge_on}) >> 16) & 0x7fff;
  6         28  
291             }
292             else {
293 1         4 return;
294             }
295             } ## end sub hash
296              
297             =head2 pack_submit_packet($client)
298              
299             B Gearman::Util::pack_req_command(mode, func, uniq, argref)
300              
301             =cut
302              
303             sub pack_submit_packet {
304 1     1 1 296 my ($self, $client) = @_;
305 1         2 my $func = $self->{func};
306              
307 1 50 33     6 if ($client && $client->prefix()) {
308 1         2 $func = join "\t", $client->prefix(), $self->{func};
309             }
310              
311             return Gearman::Util::pack_req_command(
312             $self->mode,
313             join(
314 1   50     2 "\0", $func || '', $self->{uniq} || '', ${ $self->{argref} } || ''
      50        
      50        
315             )
316             );
317             } ## end sub pack_submit_packet
318              
319             =head2 fail($reason)
320              
321             =cut
322              
323             sub fail {
324 4     4 1 1214 my ($self, $reason) = @_;
325 4 100       13 return if $self->{is_finished};
326              
327             # try to retry, if we can
328 3 100       10 if ($self->{retries_done} < $self->{retry_count}) {
329 1         2 $self->{retries_done}++;
330 1 50       4 $self->{on_retry}->($self->{retries_done}) if $self->{on_retry};
331 1         327 $self->handle(undef);
332 1         4 return $self->{taskset}->add_task($self);
333             } ## end if ($self->{retries_done...})
334              
335 2         7 $self->final_fail($reason);
336             } ## end sub fail
337              
338             =head2 final_fail($reason)
339              
340             run if !is_finished
341              
342             =over
343              
344             =item
345              
346             on_fail
347              
348             =item
349              
350             on_post_hooks
351              
352             =back
353              
354             =cut
355              
356             sub final_fail {
357 3     3 1 5 my ($self, $reason) = @_;
358              
359 3 50       9 return if $self->{is_finished};
360 3   50     9 $self->{is_finished} = $reason || 1;
361              
362 3         7 $self->run_hook('final_fail', $self);
363              
364 3 100       8 $self->{on_fail}->($reason) if $self->{on_fail};
365 3 50       375 $self->{on_post_hooks}->() if $self->{on_post_hooks};
366 3         8 $self->wipe;
367              
368 3         16 return;
369             } ## end sub final_fail
370              
371             #FIXME obsolete?
372              
373             =head2 exception($exc_ref)
374              
375             $exc_ref may be a Storable serialized value
376              
377             run on_exception if defined
378              
379             =cut
380              
381             sub exception {
382 1     1 1 2529 my ($self, $exc_ref) = @_;
383 1         4 my $exception = Storable::thaw($$exc_ref);
384 1 50       19 $self->{on_exception}->($$exception) if $self->{on_exception};
385 1         329 return;
386             } ## end sub exception
387              
388             =head2 complete($result)
389              
390             C<$result> a reference profided to on_complete cb
391              
392             =cut
393              
394             sub complete {
395 1     1 1 1138 my ($self, $result_ref) = @_;
396 1 50       4 return if $self->{is_finished};
397              
398 1         1 $self->{is_finished} = 'complete';
399              
400 1         3 $self->run_hook('complete', $self);
401              
402 1 50       5 $self->{on_complete}->($result_ref) if $self->{on_complete};
403 1 50       413 $self->{on_post_hooks}->() if $self->{on_post_hooks};
404 1         4 $self->wipe;
405             } ## end sub complete
406              
407             =head2 status()
408              
409             =cut
410              
411             sub status {
412 1     1 1 1266 my $self = shift;
413 1 50       4 return if $self->{is_finished};
414 1 50       4 return unless $self->{on_status};
415 1         1 my ($nu, $de) = @_;
416 1         4 $self->{on_status}->($nu, $de);
417             } ## end sub status
418              
419             =head2 handle()
420              
421             getter
422              
423             =head2 handle($handle)
424              
425             setter for the fully-qualified handle of form "IP:port//shandle" where
426              
427             shandle is an opaque handle specific to the job server running on IP:port
428              
429             =cut
430              
431             sub handle {
432 2     2 1 1544 my $self = shift;
433 2 50       6 if (@_) {
434 2         4 $self->{handle} = shift;
435             }
436 2         7 return $self->{handle};
437             } ## end sub handle
438              
439             #FIXME obsolete?
440              
441             =head2 set_on_post_hooks($code)
442              
443             =cut
444              
445             sub set_on_post_hooks {
446 0     0 1 0 my ($self, $code) = @_;
447 0         0 $self->{on_post_hooks} = $code;
448             }
449              
450             =head2 wipe()
451              
452             cleanup
453              
454             =over
455              
456             =item
457              
458             on_post_hooks
459              
460             =item
461              
462             on_complete
463              
464             =item
465              
466             on_fail
467              
468             =item
469              
470             on_retry
471              
472             =item
473              
474             on_status
475              
476             =item
477              
478             hooks
479              
480             =back
481              
482             =cut
483              
484             sub wipe {
485 5     5 1 1286 my $self = shift;
486 5         13 my @h = qw/
487             on_post_hooks
488             on_complete
489             on_fail
490             on_retry
491             on_status
492             hooks
493             /;
494              
495 5         10 foreach my $f (@h) {
496 30         41 $self->{$f} = undef;
497             }
498             } ## end sub wipe
499              
500             =head2 func()
501              
502             =cut
503              
504             sub func {
505 1     1 1 355 my $self = shift;
506 1         4 return $self->{func};
507             }
508              
509             =head2 timeout()
510              
511             getter
512              
513             =head2 timeout($t)
514              
515             setter
516              
517             B timeout
518             =cut
519              
520             sub timeout {
521 1     1 1 401 my $self = shift;
522 1 50       3 if (@_) {
523 0         0 $self->{timeout} = shift;
524             }
525 1         3 return $self->{timeout};
526             } ## end sub timeout
527              
528             =head2 mode()
529              
530             B mode in depends of background and hight_priority
531              
532             =cut
533              
534             sub mode {
535 5     5 1 4743 my $self = shift;
536             return $self->{background}
537             ? (
538             $self->{high_priority}
539             ? "submit_job_high_bg"
540             : "submit_job_bg"
541             )
542             : (
543             $self->{high_priority}
544 5 100       26 ? "submit_job_high"
    100          
    100          
545             : "submit_job"
546             );
547             } ## end sub mode
548              
549             1;
550             __END__