File Coverage

blib/lib/Gearman/Task.pm
Criterion Covered Total %
statement 138 147 93.8
branch 49 74 66.2
condition 14 26 53.8
subroutine 27 30 90.0
pod 20 20 100.0
total 248 297 83.5


line stmt bran cond sub pod time code
1             package Gearman::Task;
2 7     7   27 use version;
  7         7  
  7         44  
3             $Gearman::Task::VERSION = version->declare("2.003_002");
4              
5 7     7   518 use strict;
  7         8  
  7         109  
6 7     7   20 use warnings;
  7         41  
  7         241  
7              
8             =head1 NAME
9              
10             Gearman::Task - a task in Gearman, from the point of view of a client
11              
12             =head1 SYNOPSIS
13              
14             my $task = Gearman::Task->new("add", "1+2", {
15             ...
16             });
17              
18             $taskset->add_task($task);
19             $client->do_task($task);
20             $client->dispatch_background($task);
21              
22              
23             =head1 DESCRIPTION
24              
25             I is a Gearman::Client's representation of a task to be
26             done.
27              
28             =head1 USAGE
29              
30             =head2 Gearman::Task->new($func, $arg, \%options)
31              
32             Creates a new I object, and returns the object.
33              
34             I<$func> is the function name to be run. (that you have a worker registered to process)
35              
36             I<$arg> is an opaque scalar or scalarref representing the argument(s)
37             to pass to the distributed function. If you want to pass multiple
38             arguments, you must encode them somehow into this one. That's up to
39             you and your worker.
40              
41             I<%options> can contain:
42              
43             =over 4
44              
45             =item * uniq
46              
47             A key which indicates to the server that other tasks with the same
48             function name and key will be merged into one. That is, the task
49             will be run just once, but all the listeners waiting on that job
50             will get the response multiplexed back to them.
51              
52             Uniq may also contain the magic value "-" (a single hyphen) which
53             means the uniq key is the contents of the args.
54              
55             =item * on_complete
56              
57             A subroutine reference to be invoked when the task is completed. The
58             subroutine will be passed a reference to the return value from the worker
59             process.
60              
61             =item * on_fail
62              
63             A subroutine reference to be invoked when the task fails (or fails for
64             the last time, if retries were specified). The reason could be passed
65             to this callback as an argument. This callback won't be called after a
66             failure if more retries are still possible.
67              
68             =item * on_retry
69              
70             A subroutine reference to be invoked when the task fails, but is about
71             to be retried.
72              
73             Is passed one argument, what retry attempt number this is. (starts with 1)
74              
75             =item * on_status
76              
77             A subroutine reference to be invoked if the task emits status updates.
78             Arguments passed to the subref are ($numerator, $denominator), where those
79             are left up to the client and job to determine.
80              
81             =item * on_warning
82              
83             A subroutine reference to be invoked if the task emits status updates.
84             Arguments passed to the subref are ($numerator, $denominator), where those
85             are left up to the client and job to determine.
86              
87             =item * retry_count
88              
89             Number of times job will be retried if there are failures. Defaults to 0.
90              
91             =item * high_priority
92              
93             B. Use C<< priority => high >> instead.
94             Boolean, whether this job should take priority over other jobs already
95             enqueued.
96              
97             =item * priority
98              
99             valid value:
100              
101             =over
102              
103             =item
104              
105             high
106              
107             =item
108              
109             normal (defaul)
110              
111             =item
112              
113             low
114              
115             =back
116              
117             =item * timeout
118              
119             Automatically fail, calling your on_fail callback, after this many
120             seconds have elapsed without an on_fail or on_complete being
121             called. Defaults to 0, which means never. Bypasses any retry_count
122             remaining.
123              
124             =item * try_timeout
125              
126             Automatically fail, calling your on_retry callback (or on_fail if out of
127             retries), after this many seconds have elapsed. Defaults to 0, which means
128             never.
129              
130             =back
131              
132             =head1 METHODS
133              
134             =cut
135              
136 7     7   24 use Carp ();
  7         9  
  7         77  
137 7     7   2038 use Gearman::Util ();
  7         12  
  7         185  
138 7     7   60 use Scalar::Util ();
  7         11  
  7         92  
139 7     7   3285 use String::CRC32 ();
  7         1894  
  7         134  
140 7     7   1631 use Storable ();
  7         6641  
  7         268  
141              
142             use fields (
143              
144             # from client:
145 7         43 'func',
146             'argref',
147              
148             # opts from client:
149             'uniq',
150             'on_complete',
151             'on_data',
152             'on_fail',
153             'on_exception',
154             'on_warning',
155             'on_retry',
156             'on_status',
157             'on_post_hooks',
158              
159             # used internally,
160             # when other hooks are done running,
161             # prior to cleanup
162             'retry_count',
163             'timeout',
164             'try_timeout',
165             'high_priority',
166             'background',
167              
168             # from server:
169             'handle',
170              
171             # maintained by this module:
172             'retries_done',
173             'is_finished',
174             'taskset',
175              
176             # jobserver socket.
177             # shared by other tasks in the same taskset,
178             # but not w/ tasks in other tasksets using
179             # the same Gearman::Client
180             'jssock',
181              
182             # hookname -> coderef
183             'hooks',
184             'priority',
185 7     7   24 );
  7         8  
186              
187             # constructor, given: ($func, $argref, $opts);
188             sub new {
189 17     17 1 7288 my $self = shift;
190 17 50       41 unless (ref $self) {
191 17         53 $self = fields::new($self);
192             }
193              
194             $self->{func} = shift
195 17 50       4124 or Carp::croak("No function given");
196              
197 17   66     39 $self->{argref} = shift || do { my $empty = ""; \$empty; };
198 17 50       39 (ref $self->{argref} eq "SCALAR")
199             || Carp::croak("Argref not a scalar reference");
200              
201 17   100     31 my $opts = shift || {};
202              
203 17         160 $self->{$_} = delete $opts->{$_} for qw/
204             background
205             high_priority
206             on_complete
207             on_data
208             on_exception
209             on_fail
210             on_retry
211             on_status
212             on_warning
213             retry_count
214             timeout
215             try_timeout
216             uniq
217             /;
218              
219 17         43 $self->_priority(delete $opts->{priority});
220              
221 17   100     58 $self->{retry_count} ||= 0;
222              
223             # bool: if success or fail has been called yet on this.
224 17         19 $self->{is_finished} = 0;
225              
226 17 100       14 if (%{$opts}) {
  17         42  
227 1         25 Carp::croak("Unknown option(s): " . join(", ", sort keys %$opts));
228             }
229              
230 16         20 $self->{retries_done} = 0;
231              
232 16         32 return $self;
233             } ## end sub new
234              
235             =head2 run_hook($name)
236              
237             run a hook callback if defined
238              
239             =cut
240              
241             sub run_hook {
242 7     7 1 11 my ($self, $name) = (shift, shift);
243 7 100 33     38 ($name && $self->{hooks}->{$name}) || return;
244              
245 1         2 eval { $self->{hooks}->{$name}->(@_) };
  1         3  
246 1 50       5 warn "Gearman::Task hook '$name' threw error: $@\n" if $@;
247             } ## end sub run_hook
248              
249             =head2 add_hook($name, $cb)
250              
251             add a hook
252              
253             =cut
254              
255             sub add_hook {
256 2     2 1 2335 my ($self, $name) = (shift, shift);
257 2 50       6 $name || return;
258              
259 2 100       4 if (@_) {
260 1         6 $self->{hooks}->{$name} = shift;
261             }
262             else {
263 1         4 delete $self->{hooks}->{$name};
264             }
265             } ## end sub add_hook
266              
267             =head2 is_finished()
268              
269              
270             B bool: whether or not task is totally done (on_failure or
271             on_complete callback has been called)
272              
273             =cut
274              
275             sub is_finished {
276 0     0 1 0 return shift->{is_finished};
277             }
278              
279             =head2 taskset()
280              
281             getter
282              
283             =head2 taskset($ts)
284              
285             setter
286              
287             B Gearman::Taskset
288              
289             =cut
290              
291             sub taskset {
292 6     6 1 1487 my $self = shift;
293              
294             # getter
295 6 100       21 return $self->{taskset} unless @_;
296              
297             # setter
298 3         5 my $ts = shift;
299 3 100 66     32 (Scalar::Util::blessed($ts) && $ts->isa("Gearman::Taskset"))
300             || Carp::croak("argument is not an instance of Gearman::Taskset");
301 2         3 $self->{taskset} = $ts;
302              
303 2 50       5 if (my $hash_num = $self->hash()) {
304 2         7 $self->{jssock} = $ts->_get_hashed_sock($hash_num);
305             }
306             else {
307 0         0 $self->{jssock} = $ts->_get_default_sock;
308             }
309              
310 2         4 return $self->{taskset};
311             } ## end sub taskset
312              
313             =head2 hash()
314              
315             B undef on non-uniq packet, or the hash value (0-32767) if uniq
316              
317             =cut
318              
319             sub hash {
320 6     6 1 5 my $self = shift;
321             my $merge_on = $self->{uniq}
322 6 100 66     35 && $self->{uniq} eq "-" ? $self->{argref} : \$self->{uniq};
323 6 50       5 if (${$merge_on}) {
  6         10  
324 6         5 return (String::CRC32::crc32(${$merge_on}) >> 16) & 0x7fff;
  6         30  
325             }
326             else {
327 0         0 return;
328             }
329             } ## end sub hash
330              
331             =head2 pack_submit_packet($client)
332              
333             B Gearman::Util::pack_req_command(mode, func, uniq, argref)
334              
335             =cut
336              
337             sub pack_submit_packet {
338 0     0 1 0 my ($self, $client) = @_;
339             # $client should be optional for sake of Gearman::Client::Async
340             # see https://github.com/p-alik/perl-Gearman/issues/10
341 0 0       0 my $func = $client ? $client->func($self->{func}) : $self->{func};
342              
343             return Gearman::Util::pack_req_command(
344             $self->mode,
345             join(
346 0   0     0 "\0", $func || '', $self->{uniq} || '', ${ $self->{argref} } || ''
      0        
      0        
347             )
348             );
349             } ## end sub pack_submit_packet
350              
351             =head2 fail($reason)
352              
353             =cut
354              
355             sub fail {
356 5     5 1 1233 my ($self, $reason) = @_;
357 5 100       17 return if $self->{is_finished};
358              
359             # try to retry, if we can
360 4 100       13 if ($self->{retries_done} < $self->{retry_count}) {
361 1         2 $self->{retries_done}++;
362 1 50       6 $self->{on_retry}->($self->{retries_done}) if $self->{on_retry};
363 1         316 $self->handle(undef);
364 1         4 return $self->{taskset}->add_task($self);
365             } ## end if ($self->{retries_done...})
366              
367 3         10 $self->final_fail($reason);
368             } ## end sub fail
369              
370             =head2 final_fail($reason)
371              
372             run if !is_finished
373              
374             =over
375              
376             =item
377              
378             on_fail
379              
380             =item
381              
382             on_post_hooks
383              
384             =back
385              
386             =cut
387              
388             sub final_fail {
389 4     4 1 7 my ($self, $reason) = @_;
390              
391 4 50       12 return if $self->{is_finished};
392 4   50     12 $self->{is_finished} = $reason || 1;
393              
394 4         11 $self->run_hook('final_fail', $self);
395              
396 4 100       16 $self->{on_fail}->($reason) if $self->{on_fail};
397 4 50       1050 $self->{on_post_hooks}->() if $self->{on_post_hooks};
398 4         12 $self->wipe;
399              
400 4         7 return;
401             } ## end sub final_fail
402              
403             #FIXME obsolete?
404              
405             =head2 exception($exc_ref)
406              
407             $exc_ref may be a Storable serialized value
408              
409             run on_exception if defined
410              
411             =cut
412              
413             sub exception {
414 2     2 1 2415 my ($self, $exc_ref) = @_;
415              
416             #FIXME the only on_exception callback get dereferenced value
417             # could it be changed without damage?
418 2         8 my $exception = Storable::thaw($$exc_ref);
419 2 50       36 $self->{on_exception}->($$exception) if $self->{on_exception};
420 2         748 return;
421             } ## end sub exception
422              
423             =head2 complete($result)
424              
425             C<$result> a reference profided to on_complete cb
426              
427             =cut
428              
429             sub complete {
430 2     2 1 1059 my ($self, $result_ref) = @_;
431 2 50       15 return if $self->{is_finished};
432              
433 2         7 $self->{is_finished} = 'complete';
434              
435 2         8 $self->run_hook('complete', $self);
436              
437 2 50       10 $self->{on_complete}->($result_ref) if $self->{on_complete};
438 2 50       699 $self->{on_post_hooks}->() if $self->{on_post_hooks};
439 2         9 $self->wipe;
440             } ## end sub complete
441              
442             =head2 status()
443              
444             =cut
445              
446             sub status {
447 2     2 1 1148 my $self = shift;
448 2 50       7 return if $self->{is_finished};
449 2 50       6 return unless $self->{on_status};
450 2         4 my ($nu, $de) = @_;
451 2         7 $self->{on_status}->($nu, $de);
452             } ## end sub status
453              
454             =head2 data()
455              
456             invokes C callback if worker sends work_data notification.
457              
458             =cut
459              
460             sub data {
461 2     2 1 1488 my $self = shift;
462 2 50       9 return if $self->{is_finished};
463 2         3 my $result_ref = shift;
464              
465 2 50       10 $self->{on_data}->($result_ref) if $self->{on_data};
466             } ## end sub data
467              
468             =head2 warning($message)
469              
470             invokes C callback if worker sends work_warning notification.
471              
472             =cut
473              
474             sub warning {
475 1     1 1 1130 my $self = shift;
476 1 50       4 $self->{is_finished} && return;
477 1 50       4 $self->{on_warning} || return;
478              
479 1         2 my $msg = shift;
480              
481 1         3 $self->{on_warning}->($msg);
482             } ## end sub warning
483              
484             =head2 handle()
485              
486             getter
487              
488             =head2 handle($handle)
489              
490             setter for the fully-qualified handle of form "IP:port//shandle" where
491              
492             shandle is an opaque handle specific to the job server running on IP:port
493              
494             =cut
495              
496             sub handle {
497 2     2 1 1132 my $self = shift;
498 2 50       7 if (@_) {
499 2         4 $self->{handle} = shift;
500             }
501 2         6 return $self->{handle};
502             } ## end sub handle
503              
504             #FIXME obsolete?
505              
506             =head2 set_on_post_hooks($code)
507              
508             =cut
509              
510             sub set_on_post_hooks {
511 0     0 1 0 my ($self, $code) = @_;
512 0         0 $self->{on_post_hooks} = $code;
513             }
514              
515             =head2 wipe()
516              
517             cleanup
518              
519             =over
520              
521             =item
522              
523             on_post_hooks
524              
525             =item
526              
527             on_complete
528              
529             =item
530              
531             on_fail
532              
533             =item
534              
535             on_retry
536              
537             =item
538              
539             on_status
540              
541             =item
542              
543             hooks
544              
545             =back
546              
547             =cut
548              
549             sub wipe {
550 7     7 1 1148 my $self = shift;
551 7         17 my @h = qw/
552             on_post_hooks
553             on_complete
554             on_fail
555             on_retry
556             on_status
557             hooks
558             /;
559              
560 7         13 foreach my $f (@h) {
561 42         55 $self->{$f} = undef;
562             }
563             } ## end sub wipe
564              
565             =head2 func()
566              
567             =cut
568              
569             sub func {
570 1     1 1 381 my $self = shift;
571 1         4 return $self->{func};
572             }
573              
574             =head2 timeout()
575              
576             getter
577              
578             =head2 timeout($t)
579              
580             setter
581              
582             B timeout
583             =cut
584              
585             sub timeout {
586 1     1 1 1661 my $self = shift;
587 1 50       4 if (@_) {
588 0         0 $self->{timeout} = shift;
589             }
590 1         2 return $self->{timeout};
591             } ## end sub timeout
592              
593             =head2 mode()
594              
595             B mode in depends of background and priority
596              
597             =cut
598              
599             sub mode {
600 9     9 1 1813 my $self = shift;
601 9         7 my $mode = "submit_job";
602 9 100       18 if ($self->_priority() ne "normal") {
603 6         8 $mode .= "_" . $self->_priority();
604             }
605              
606 9 100       17 if ($self->{background}) {
607 4         5 $mode .= "_bg";
608             }
609              
610 9         34 return $mode;
611             } ## end sub mode
612              
613             #=head2 _priority($priority)
614             #
615             #set/get priority
616             #
617             #valid C<$priority> value
618             #
619             #=over
620             #
621             #=item
622             #
623             #high
624             #
625             #=item
626             #
627             #normal (default)
628             #
629             #=item
630             #
631             #low
632             #
633             #=back
634             #
635             #=cut
636              
637             sub _priority {
638 35     35   926 my ($self, $priority) = @_;
639 35 100       66 if ($self->{high_priority}) {
640 2         108 warn <<'HERE';
641             Gearman::Task key high_priority is deprecated.
642             Use priority => "high" instead
643             HERE
644 2         6 $self->{priority} = "high";
645 2         5 delete($self->{high_priority});
646             } ## end if ($self->{high_priority...})
647              
648 35 100       67 if ($priority) {
649 6 50       28 $priority =~ /^(high|normal|low)$/
650             || Carp::croak "unsupported priority value";
651 6         8 $self->{priority} = $priority;
652             }
653 35   100     75 $self->{priority} ||= "normal";
654              
655 35         61 return $self->{priority};
656             } ## end sub _priority
657              
658             1;
659             __END__