File Coverage

blib/lib/Protocol/Gearman/Worker.pm
Criterion Covered Total %
statement 63 63 100.0
branch 4 6 66.6
condition n/a
subroutine 20 20 100.0
pod 3 6 50.0
total 90 95 94.7


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014 -- leonerd@leonerd.org.uk
5              
6             package Protocol::Gearman::Worker;
7              
8 3     3   45263 use strict;
  3         5  
  3         110  
9 3     3   16 use warnings;
  3         16  
  3         127  
10              
11             our $VERSION = '0.04';
12              
13 3     3   13 use base qw( Protocol::Gearman );
  3         7  
  3         1284  
14              
15 3     3   14 use Carp;
  3         5  
  3         2269  
16              
17             =head1 NAME
18              
19             C<Protocol::Gearman::Worker> - implement a Gearman worker
20              
21             =head1 DESCRIPTION
22              
23             A base class that implements a complete Gearman worker. This abstract class
24             still requires the implementation methods as documented in
25             L<Protocol::Gearman>, but otherwise provides a full set of behaviour useful to
26             Gearman workers.
27              
28             As it is based on L<Future> it is suitable for both synchronous and
29             asynchronous use. When backed by an implementation capable of performing
30             asynchronously, this object fully supports asynchronous Gearman communication.
31             When backed by a synchronous implementation, it will still yield C<Future>
32             instances but the limitations of the synchronous implementation may limit how
33             much concurrency and asynchronous behaviour can be achieved.
34              
35             A simple concrete implementation suitable for synchronous use can be found in
36             L<Net::Gearman::Worker>.
37              
38             =cut
39              
40             =head1 METHODS
41              
42             =cut
43              
44             =head2 $worker->can_do( $name, %opts )
45              
46             Informs the server that the worker can perform a function of the given name.
47              
48             The following named options are recognised:
49              
50             =over 8
51              
52             =item timeout => INT
53              
54             If specified, the function is registered using the C<CAN_DO_TIMEOUT> variant,
55             which sets a timeout on the Gearman server after which the function ought to
56             have completed. The timeout is specified in seconds.
57              
58             =back
59              
60             =cut
61              
62             sub can_do  # Tell the server this worker can perform the function called $name.
63             {
64 2     2 1 2427 my $self = shift;
65 2         7 my ( $name, %opts ) = @_;  # %opts: only the 'timeout' key is read here
66              
67 2         5 my $timeout = $opts{timeout};
68              
69 2 100       6 if( defined $timeout ) {  # timeout supplied: register with the timed packet type
70 1         7 $self->send_packet( CAN_DO_TIMEOUT => $name, int $timeout );  # int() truncates to a whole number
71             }
72             else {
73 1         4 $self->send_packet( CAN_DO => $name );  # no timeout: plain registration
74             }
75             }
76              
77             =head2 $worker->grab_job ==> $job
78              
79             Returns a future that will eventually yield another job assignment from the
80             server as an instance of a job object; see below.
81              
82             =cut
83              
84             sub grab_job  # Ask the server for a job; returns a future that on_JOB_ASSIGN completes later.
85             {
86 3     3 1 5726 my $self = shift;
87              
88 3         29 my $state = $self->gearman_state;  # hashref from gearman_state (defined outside this file)
89              
90 3         9 push @{ $state->{gearman_assigns} }, my $f = $self->new_future;  # queue the pending future; push autovivifies the array
  3         24  
91              
92 3         45 $self->send_packet( GRAB_JOB => );  # GRAB_JOB is sent with no arguments
93              
94 3         156 return $f;
95             }
96              
97             sub on_JOB_ASSIGN  # Handler for a JOB_ASSIGN packet: resolves the oldest pending grab_job future.
98             {
99 3     3 0 28 my $self = shift;
100 3         9 my @args = @_;  # passed straight to Job->new, which unpacks them as ( handle, func, arg )
101              
102 3         44 my $state = $self->gearman_state;
103              
104 3         7 my $f = shift @{ $state->{gearman_assigns} };  # FIFO: grab_job pushes, this shifts
  3         6  
105 3         22 $f->done( Protocol::Gearman::Worker::Job->new( $self, @args ) );  # NOTE(review): $f is undef if nothing is queued, so ->done would die - confirm this cannot happen
106             }
107              
108             # Manage Gearman's slightly odd sleep/wakeup job request loop
109              
110             sub on_NO_JOB  # Handler for a NO_JOB packet: replies with PRE_SLEEP.
111             {
112 1     1 0 14 my $self = shift;
113              
114 1         5 $self->send_packet( PRE_SLEEP => );  # presumably asks the server to send NOOP when work arrives - confirm against the Gearman protocol
115             }
116              
117             sub on_NOOP  # Handler for a NOOP packet: re-requests a job if any grab_job future is still pending.
118             {
119 1     1 0 15 my $self = shift;
120              
121 1         4 my $state = $self->gearman_state;
122              
123 1 50       2 $self->send_packet( GRAB_JOB => ) if @{ $state->{gearman_assigns} };  # NOTE(review): rvalue deref dies under strict if gearman_assigns is undef - confirm it is always initialised by now
  1         9  
124             }
125              
126             =head2 $worker->job_finished( $job )
127              
128             Invoked by the C<complete> and C<fail> methods on a job object, after the
129             server has been informed of the final status of the job. By default this
130             method does nothing, but it is provided for subclasses to override, to be
131             informed when a job is finished.
132              
133             =cut
134              
135 2     2 1 6 sub job_finished { }  # intentionally empty hook; the job's complete() and fail() call it, subclasses may override
136              
137             package # hide from CPAN
138             Protocol::Gearman::Worker::Job;
139              
140             =head1 JOB OBJECTS
141              
142             Objects of this type are returned by the C<grab_job> method. They represent
143             individual job assignments from the server, and can be used to obtain details
144             of the work to perform, and report on its result.
145              
146             =cut
147              
148             sub new  # Construct a job object; called from on_JOB_ASSIGN with the worker and the packet fields.
149             {
150 3     3   7 my $class = shift;
151 3         5 my ( $worker, $handle, $func, $arg ) = @_;
152              
153 3         35 return bless {  # plain blessed hash; fields are read back by the accessors of the same names
154             worker => $worker,
155             handle => $handle,
156             func => $func,
157             arg => $arg,
158             }, $class;
159             }
160              
161             =head2 $worker = $job->worker
162              
163             Returns the C<Protocol::Gearman::Worker> object the job was received by.
164              
165             =head2 $handle = $job->handle
166              
167             Returns the job handle assigned by the server. Most implementations should not
168             need to use this directly.
169              
170             =head2 $func = $job->func
171              
172             =head2 $arg = $job->arg
173              
174             The function name and opaque argument data bytes sent by the requesting
175             client.
176              
177             =cut
178              
179 11     11   47 sub worker { $_[0]->{worker} }  # accessor: the worker object passed to new()
180 9     9   870 sub handle { $_[0]->{handle} }  # accessor: the job handle passed to new()
181 2     2   463 sub func { $_[0]->{func} }  # accessor: the function name passed to new()
182 2     2   11 sub arg { $_[0]->{arg} }  # accessor: the argument value passed to new()
183              
184             =head2 $job->data( $data )
185              
186             Sends more data back to the client. Intended for long-running jobs with
187             incremental output.
188              
189             =cut
190              
191             sub data  # Send a WORK_DATA packet carrying $data for this job's handle.
192             {
193 1     1   12 my $self = shift;
194 1         2 my ( $data ) = @_;
195              
196 1         3 $self->worker->send_packet( WORK_DATA => $self->handle, $data );  # sent via the owning worker
197             }
198              
199             =head2 $job->warning( $warning )
200              
201             Sends a warning to the client.
202              
203             =cut
204              
205             sub warning  # Send a WORK_WARNING packet carrying $warning for this job's handle.
206             {
207 1     1   9 my $self = shift;
208 1         2 my ( $warning ) = @_;
209              
210 1         4 $self->worker->send_packet( WORK_WARNING => $self->handle, $warning );  # sent via the owning worker
211             }
212              
213             =head2 $job->status( $numerator, $denominator )
214              
215             Sets the current progress of the job.
216              
217             =cut
218              
219             sub status  # Send a WORK_STATUS packet with numerator and denominator for this job's handle.
220             {
221 2     2   506 my $self = shift;
222 2         4 my ( $num, $denom ) = @_;
223              
224 2         7 $self->worker->send_packet( WORK_STATUS => $self->handle, $num, $denom );  # values are passed through unchecked
225             }
226              
227             =head2 $job->complete( $result )
228              
229             Informs the server that the job is now complete, and sets its result.
230              
231             =cut
232              
233             sub complete  # Send WORK_COMPLETE with $result, then notify the worker that the job is finished.
234             {
235 2     2   985 my $self = shift;
236 2         5 my ( $result ) = @_;
237              
238 2         8 $self->worker->send_packet( WORK_COMPLETE => $self->handle, $result );
239 2         55 $self->worker->job_finished( $self );  # hook is called after the packet has been handed to send_packet
240             }
241              
242             =head2 $job->fail( $exception )
243              
244             Informs the server that the job has failed.
245              
246             Optionally an exception value can be supplied; if given this will be sent to
247             the server using a C<WORK_EXCEPTION> message. Note that not all clients will
248             receive this; it is an optional feature.
249              
250             =cut
251              
252             sub fail  # Report job failure: optional WORK_EXCEPTION, then WORK_FAIL, then the job_finished hook.
253             {
254 1     1   29 my $self = shift;
255 1         3 my ( $exception ) = @_;
256              
257 1 50       6 if( defined $exception ) {  # the exception is optional; only sent when one was supplied
258 1         3 $self->worker->send_packet( WORK_EXCEPTION => $self->handle, $exception );
259             }
260              
261 1         10 $self->worker->send_packet( WORK_FAIL => $self->handle );  # always sent, whether or not an exception was given
262 1         8 $self->worker->job_finished( $self );
263             }
264              
265             =head1 AUTHOR
266              
267             Paul Evans <leonerd@leonerd.org.uk>
268              
269             =cut
270              
271             0x55AA;  # true value that ends the module so require/use succeeds