File Coverage

blib/lib/Gearman/Task.pm
Criterion Covered Total %
statement 138 148 93.2
branch 49 74 66.2
condition 14 26 53.8
subroutine 27 30 90.0
pod 20 20 100.0
total 248 298 83.2


line stmt bran cond sub pod time code
1             package Gearman::Task;
2 7     7   27 use version;
  7         8  
  7         44  
3             $Gearman::Task::VERSION = version->declare("2.003_001");
4              
5 7     7   535 use strict;
  7         10  
  7         103  
6 7     7   21 use warnings;
  7         36  
  7         230  
7              
8             =head1 NAME
9              
10             Gearman::Task - a task in Gearman, from the point of view of a client
11              
12             =head1 SYNOPSIS
13              
14             my $task = Gearman::Task->new("add", "1+2", {
15             ...
16             });
17              
18             $taskset->add_task($task);
19             $client->do_task($task);
20             $client->dispatch_background($task);
21              
22              
23             =head1 DESCRIPTION
24              
25             I is a Gearman::Client's representation of a task to be
26             done.
27              
28             =head1 USAGE
29              
30             =head2 Gearman::Task->new($func, $arg, \%options)
31              
32             Creates a new I object, and returns the object.
33              
34             I<$func> is the function name to be run. (that you have a worker registered to process)
35              
36             I<$arg> is an opaque scalar or scalarref representing the argument(s)
37             to pass to the distributed function. If you want to pass multiple
38             arguments, you must encode them somehow into this one. That's up to
39             you and your worker.
40              
41             I<%options> can contain:
42              
43             =over 4
44              
45             =item * uniq
46              
47             A key which indicates to the server that other tasks with the same
48             function name and key will be merged into one. That is, the task
49             will be run just once, but all the listeners waiting on that job
50             will get the response multiplexed back to them.
51              
52             Uniq may also contain the magic value "-" (a single hyphen) which
53             means the uniq key is the contents of the args.
54              
55             =item * on_complete
56              
57             A subroutine reference to be invoked when the task is completed. The
58             subroutine will be passed a reference to the return value from the worker
59             process.
60              
61             =item * on_fail
62              
63             A subroutine reference to be invoked when the task fails (or fails for
64             the last time, if retries were specified). The reason could be passed
65             to this callback as an argument. This callback won't be called after a
66             failure if more retries are still possible.
67              
68             =item * on_retry
69              
70             A subroutine reference to be invoked when the task fails, but is about
71             to be retried.
72              
73             Is passed one argument, what retry attempt number this is. (starts with 1)
74              
75             =item * on_status
76              
77             A subroutine reference to be invoked if the task emits status updates.
78             Arguments passed to the subref are ($numerator, $denominator), where those
79             are left up to the client and job to determine.
80              
81             =item * on_warning
82              
83             A subroutine reference to be invoked if the task emits status updates.
84             Arguments passed to the subref are ($numerator, $denominator), where those
85             are left up to the client and job to determine.
86              
87             =item * retry_count
88              
89             Number of times job will be retried if there are failures. Defaults to 0.
90              
91             =item * high_priority
92              
93             B. Use C<< priority => high >> instead.
94             Boolean, whether this job should take priority over other jobs already
95             enqueued.
96              
97             =item * priority
98              
99             valid value:
100              
101             =over
102              
103             =item
104              
105             high
106              
107             =item
108              
109             normal (defaul)
110              
111             =item
112              
113             low
114              
115             =back
116              
117             =item * timeout
118              
119             Automatically fail, calling your on_fail callback, after this many
120             seconds have elapsed without an on_fail or on_complete being
121             called. Defaults to 0, which means never. Bypasses any retry_count
122             remaining.
123              
124             =item * try_timeout
125              
126             Automatically fail, calling your on_retry callback (or on_fail if out of
127             retries), after this many seconds have elapsed. Defaults to 0, which means
128             never.
129              
130             =back
131              
132             =head1 METHODS
133              
134             =cut
135              
136 7     7   21 use Carp ();
  7         8  
  7         82  
137 7     7   1822 use Gearman::Util ();
  7         11  
  7         124  
138 7     7   57 use Scalar::Util ();
  7         7  
  7         82  
139 7     7   2917 use String::CRC32 ();
  7         1881  
  7         135  
140 7     7   1633 use Storable ();
  7         6524  
  7         260  
141              
142             use fields (
143              
144             # from client:
145 7         36 'func',
146             'argref',
147              
148             # opts from client:
149             'uniq',
150             'on_complete',
151             'on_data',
152             'on_fail',
153             'on_exception',
154             'on_warning',
155             'on_retry',
156             'on_status',
157             'on_post_hooks',
158              
159             # used internally,
160             # when other hooks are done running,
161             # prior to cleanup
162             'retry_count',
163             'timeout',
164             'try_timeout',
165             'high_priority',
166             'background',
167              
168             # from server:
169             'handle',
170              
171             # maintained by this module:
172             'retries_done',
173             'is_finished',
174             'taskset',
175              
176             # jobserver socket.
177             # shared by other tasks in the same taskset,
178             # but not w/ tasks in other tasksets using
179             # the same Gearman::Client
180             'jssock',
181              
182             # hookname -> coderef
183             'hooks',
184             'priority',
185 7     7   27 );
  7         5  
186              
187             # constructor, given: ($func, $argref, $opts);
188             sub new {
189 17     17 1 5895 my $self = shift;
190 17 50       32 unless (ref $self) {
191 17         41 $self = fields::new($self);
192             }
193              
194             $self->{func} = shift
195 17 50       3749 or Carp::croak("No function given");
196              
197 17   66     34 $self->{argref} = shift || do { my $empty = ""; \$empty; };
198 17 50       33 (ref $self->{argref} eq "SCALAR")
199             || Carp::croak("Argref not a scalar reference");
200              
201 17   100     30 my $opts = shift || {};
202              
203 17         123 $self->{$_} = delete $opts->{$_} for qw/
204             background
205             high_priority
206             on_complete
207             on_data
208             on_exception
209             on_fail
210             on_retry
211             on_status
212             on_warning
213             retry_count
214             timeout
215             try_timeout
216             uniq
217             /;
218              
219 17         39 $self->_priority(delete $opts->{priority});
220              
221 17   100     46 $self->{retry_count} ||= 0;
222              
223             # bool: if success or fail has been called yet on this.
224 17         16 $self->{is_finished} = 0;
225              
226 17 100       12 if (%{$opts}) {
  17         29  
227 1         25 Carp::croak("Unknown option(s): " . join(", ", sort keys %$opts));
228             }
229              
230 16         19 $self->{retries_done} = 0;
231              
232 16         26 return $self;
233             } ## end sub new
234              
235             =head2 run_hook($name)
236              
237             run a hook callback if defined
238              
239             =cut
240              
241             sub run_hook {
242 7     7 1 11 my ($self, $name) = (shift, shift);
243 7 100 33     32 ($name && $self->{hooks}->{$name}) || return;
244              
245 1         1 eval { $self->{hooks}->{$name}->(@_) };
  1         4  
246 1 50       7 warn "Gearman::Task hook '$name' threw error: $@\n" if $@;
247             } ## end sub run_hook
248              
249             =head2 add_hook($name, $cb)
250              
251             add a hook
252              
253             =cut
254              
255             sub add_hook {
256 2     2 1 2345 my ($self, $name) = (shift, shift);
257 2 50       6 $name || return;
258              
259 2 100       7 if (@_) {
260 1         9 $self->{hooks}->{$name} = shift;
261             }
262             else {
263 1         4 delete $self->{hooks}->{$name};
264             }
265             } ## end sub add_hook
266              
267             =head2 is_finished()
268              
269              
270             B bool: whether or not task is totally done (on_failure or
271             on_complete callback has been called)
272              
273             =cut
274              
275             sub is_finished {
276 0     0 1 0 return shift->{is_finished};
277             }
278              
279             =head2 taskset()
280              
281             getter
282              
283             =head2 taskset($ts)
284              
285             setter
286              
287             B Gearman::Taskset
288              
289             =cut
290              
291             sub taskset {
292 6     6 1 1480 my $self = shift;
293              
294             # getter
295 6 100       23 return $self->{taskset} unless @_;
296              
297             # setter
298 3         3 my $ts = shift;
299 3 100 66     35 (Scalar::Util::blessed($ts) && $ts->isa("Gearman::Taskset"))
300             || Carp::croak("argument is not an instance of Gearman::Taskset");
301 2         3 $self->{taskset} = $ts;
302              
303 2 50       6 if (my $hash_num = $self->hash()) {
304 2         7 $self->{jssock} = $ts->_get_hashed_sock($hash_num);
305             }
306             else {
307 0         0 $self->{jssock} = $ts->_get_default_sock;
308             }
309              
310 2         6 return $self->{taskset};
311             } ## end sub taskset
312              
313             =head2 hash()
314              
315             B undef on non-uniq packet, or the hash value (0-32767) if uniq
316              
317             =cut
318              
319             sub hash {
320 6     6 1 6 my $self = shift;
321             my $merge_on = $self->{uniq}
322 6 100 66     36 && $self->{uniq} eq "-" ? $self->{argref} : \$self->{uniq};
323 6 50       5 if (${$merge_on}) {
  6         8  
324 6         6 return (String::CRC32::crc32(${$merge_on}) >> 16) & 0x7fff;
  6         30  
325             }
326             else {
327 0         0 return;
328             }
329             } ## end sub hash
330              
331             =head2 pack_submit_packet($client)
332              
333             B Gearman::Util::pack_req_command(mode, func, uniq, argref)
334              
335             =cut
336              
337             sub pack_submit_packet {
338 0     0 1 0 my ($self, $client) = @_;
339 0 0       0 ref($client) or Carp::croak("client parameter missed");
340 0         0 my $func = $client->func($self->{func});
341              
342             return Gearman::Util::pack_req_command(
343             $self->mode,
344             join(
345 0   0     0 "\0", $func || '', $self->{uniq} || '', ${ $self->{argref} } || ''
      0        
      0        
346             )
347             );
348             } ## end sub pack_submit_packet
349              
350             =head2 fail($reason)
351              
352             =cut
353              
354             sub fail {
355 5     5 1 1176 my ($self, $reason) = @_;
356 5 100       17 return if $self->{is_finished};
357              
358             # try to retry, if we can
359 4 100       14 if ($self->{retries_done} < $self->{retry_count}) {
360 1         1 $self->{retries_done}++;
361 1 50       5 $self->{on_retry}->($self->{retries_done}) if $self->{on_retry};
362 1         317 $self->handle(undef);
363 1         5 return $self->{taskset}->add_task($self);
364             } ## end if ($self->{retries_done...})
365              
366 3         8 $self->final_fail($reason);
367             } ## end sub fail
368              
369             =head2 final_fail($reason)
370              
371             run if !is_finished
372              
373             =over
374              
375             =item
376              
377             on_fail
378              
379             =item
380              
381             on_post_hooks
382              
383             =back
384              
385             =cut
386              
387             sub final_fail {
388 4     4 1 8 my ($self, $reason) = @_;
389              
390 4 50       8 return if $self->{is_finished};
391 4   50     11 $self->{is_finished} = $reason || 1;
392              
393 4         9 $self->run_hook('final_fail', $self);
394              
395 4 100       11 $self->{on_fail}->($reason) if $self->{on_fail};
396 4 50       1103 $self->{on_post_hooks}->() if $self->{on_post_hooks};
397 4         11 $self->wipe;
398              
399 4         8 return;
400             } ## end sub final_fail
401              
402             #FIXME obsolete?
403              
404             =head2 exception($exc_ref)
405              
406             $exc_ref may be a Storable serialized value
407              
408             run on_exception if defined
409              
410             =cut
411              
412             sub exception {
413 2     2 1 2428 my ($self, $exc_ref) = @_;
414              
415             #FIXME the only on_exception callback get dereferenced value
416             # could it be changed without damage?
417 2         9 my $exception = Storable::thaw($$exc_ref);
418 2 50       59 $self->{on_exception}->($$exception) if $self->{on_exception};
419 2         684 return;
420             } ## end sub exception
421              
422             =head2 complete($result)
423              
424             C<$result> a reference profided to on_complete cb
425              
426             =cut
427              
428             sub complete {
429 2     2 1 1059 my ($self, $result_ref) = @_;
430 2 50       14 return if $self->{is_finished};
431              
432 2         3 $self->{is_finished} = 'complete';
433              
434 2         8 $self->run_hook('complete', $self);
435              
436 2 50       11 $self->{on_complete}->($result_ref) if $self->{on_complete};
437 2 50       693 $self->{on_post_hooks}->() if $self->{on_post_hooks};
438 2         7 $self->wipe;
439             } ## end sub complete
440              
441             =head2 status()
442              
443             =cut
444              
445             sub status {
446 2     2 1 1184 my $self = shift;
447 2 50       6 return if $self->{is_finished};
448 2 50       7 return unless $self->{on_status};
449 2         4 my ($nu, $de) = @_;
450 2         7 $self->{on_status}->($nu, $de);
451             } ## end sub status
452              
453             =head2 data()
454              
455             invokes C callback if worker sends work_data notification.
456              
457             =cut
458              
459             sub data {
460 2     2 1 1467 my $self = shift;
461 2 50       9 return if $self->{is_finished};
462 2         3 my $result_ref = shift;
463              
464 2 50       10 $self->{on_data}->($result_ref) if $self->{on_data};
465             } ## end sub data
466              
467             =head2 warning($message)
468              
469             invokes C callback if worker sends work_warning notification.
470              
471             =cut
472              
473             sub warning {
474 1     1 1 1135 my $self = shift;
475 1 50       5 $self->{is_finished} && return;
476 1 50       3 $self->{on_warning} || return;
477              
478 1         2 my $msg = shift;
479              
480 1         4 $self->{on_warning}->($msg);
481             } ## end sub warning
482              
483             =head2 handle()
484              
485             getter
486              
487             =head2 handle($handle)
488              
489             setter for the fully-qualified handle of form "IP:port//shandle" where
490              
491             shandle is an opaque handle specific to the job server running on IP:port
492              
493             =cut
494              
495             sub handle {
496 2     2 1 1465 my $self = shift;
497 2 50       5 if (@_) {
498 2         5 $self->{handle} = shift;
499             }
500 2         6 return $self->{handle};
501             } ## end sub handle
502              
503             #FIXME obsolete?
504              
505             =head2 set_on_post_hooks($code)
506              
507             =cut
508              
509             sub set_on_post_hooks {
510 0     0 1 0 my ($self, $code) = @_;
511 0         0 $self->{on_post_hooks} = $code;
512             }
513              
514             =head2 wipe()
515              
516             cleanup
517              
518             =over
519              
520             =item
521              
522             on_post_hooks
523              
524             =item
525              
526             on_complete
527              
528             =item
529              
530             on_fail
531              
532             =item
533              
534             on_retry
535              
536             =item
537              
538             on_status
539              
540             =item
541              
542             hooks
543              
544             =back
545              
546             =cut
547              
548             sub wipe {
549 7     7 1 1180 my $self = shift;
550 7         20 my @h = qw/
551             on_post_hooks
552             on_complete
553             on_fail
554             on_retry
555             on_status
556             hooks
557             /;
558              
559 7         11 foreach my $f (@h) {
560 42         56 $self->{$f} = undef;
561             }
562             } ## end sub wipe
563              
564             =head2 func()
565              
566             =cut
567              
568             sub func {
569 1     1 1 228 my $self = shift;
570 1         5 return $self->{func};
571             }
572              
573             =head2 timeout()
574              
575             getter
576              
577             =head2 timeout($t)
578              
579             setter
580              
581             B timeout
582             =cut
583              
584             sub timeout {
585 1     1 1 830 my $self = shift;
586 1 50       4 if (@_) {
587 0         0 $self->{timeout} = shift;
588             }
589 1         2 return $self->{timeout};
590             } ## end sub timeout
591              
592             =head2 mode()
593              
594             B mode in depends of background and priority
595              
596             =cut
597              
598             sub mode {
599 9     9 1 1400 my $self = shift;
600 9         7 my $mode = "submit_job";
601 9 100       13 if ($self->_priority() ne "normal") {
602 6         7 $mode .= "_" . $self->_priority();
603             }
604              
605 9 100       17 if ($self->{background}) {
606 4         4 $mode .= "_bg";
607             }
608              
609 9         31 return $mode;
610             } ## end sub mode
611              
612             #=head2 _priority($priority)
613             #
614             #set/get priority
615             #
616             #valid C<$priority> value
617             #
618             #=over
619             #
620             #=item
621             #
622             #high
623             #
624             #=item
625             #
626             #normal (default)
627             #
628             #=item
629             #
630             #low
631             #
632             #=back
633             #
634             #=cut
635              
636             sub _priority {
637 35     35   700 my ($self, $priority) = @_;
638 35 100       54 if ($self->{high_priority}) {
639 2         68 warn <<'HERE';
640             Gearman::Task key high_priority is deprecated.
641             Use priority => "high" instead
642             HERE
643 2         5 $self->{priority} = "high";
644 2         3 delete($self->{high_priority});
645             } ## end if ($self->{high_priority...})
646              
647 35 100       46 if ($priority) {
648 6 50       23 $priority =~ /^(high|normal|low)$/
649             || Carp::croak "unsupported priority value";
650 6         8 $self->{priority} = $priority;
651             }
652 35   100     61 $self->{priority} ||= "normal";
653              
654 35         52 return $self->{priority};
655             } ## end sub _priority
656              
657             1;
658             __END__