File Coverage

blib/lib/Protocol/Gearman.pm
Criterion Covered Total %
statement 88 88 100.0
branch 22 32 68.7
condition 5 15 33.3
subroutine 20 20 100.0
pod 14 15 93.3
total 149 170 87.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014 -- leonerd@leonerd.org.uk
5              
6             package Protocol::Gearman;
7              
8 9     9   156534 use strict;
  9         20  
  9         341  
9 9     9   52 use warnings;
  9         16  
  9         372  
10              
11             our $VERSION = '0.04';
12              
13 9     9   61 use Carp;
  9         18  
  9         839  
14 9     9   48 use Scalar::Util qw( reftype );
  9         15  
  9         2529  
15              
16             =head1 NAME
17              
18             C<Protocol::Gearman> - abstract base class for both client and worker
19              
20             =head1 DESCRIPTION
21              
22             This base class is used by both L<Protocol::Gearman::Client> and
23             L<Protocol::Gearman::Worker>. It shouldn't be used directly by end-user
24             implementations. It is documented here largely to explain what methods an end
25             implementation needs to provide in order to create a Gearman client or worker.
26              
27             For implementing a Gearman client or worker, see the modules
28              
29             =over 2
30              
31             =item *
32              
33             L<Protocol::Gearman::Client>
34              
35             =item *
36              
37             L<Protocol::Gearman::Worker>
38              
39             =back
40              
41             For a simple synchronous Gearman client or worker module for use during
42             testing or similar, see
43              
44             =over 2
45              
46             =item *
47              
48             L<Net::Gearman::Client>
49              
50             =item *
51              
52             L<Net::Gearman::Worker>
53              
54             =back
55              
56             =cut
57              
58             =head1 REQUIRED METHODS
59              
60             The implementation should provide the following methods:
61              
62             =cut
63              
64             =head2 $f = $gearman->new_future
65              
66             Return a new L<Future> subclass instance, for request methods to use. This
67             instance should support awaiting appropriately.
68              
69             =cut
70              
71             sub new_future
72             {
73 1     1 1 276 my $self = shift;
74 1 50 33     12 reftype $self eq "HASH" and ref( my $code = $self->{gearman_method_new_future} ) eq "CODE" or
75             croak "Can't locate object method \"new_future\" via package ".ref($self).", or it is not a prototypical object";
76              
77 1         5 $code->( $self, @_ );
78             }
79              
80             =head2 $gearman->send( $bytes )
81              
82             Send the given bytes to the server.
83              
84             =cut
85              
86             sub send
87             {
88 1     1 1 3 my $self = shift;
89 1 50 33     15 reftype $self eq "HASH" and ref( my $code = $self->{gearman_method_send} ) eq "CODE" or
90             croak "Can't locate object method \"send\" via package ".ref($self).", or it is not a prototypical object";
91              
92 1         4 $code->( $self, @_ );
93             }
94              
95             =head2 $h = $gearman->gearman_state
96              
97             Return a HASH reference for the Gearman-related code to store its state on.
98             If not implemented, a default method will be provided which uses C<$gearman>
99             itself, for the common case of HASH-based objects. All the Gearman-related
100             state will be stored in keys whose names are prefixed by C<gearman_>, to avoid
101             clashes with other object state.
102              
103             =cut
104              
105 25     25 1 50 sub gearman_state { shift }
106              
107             # These are used internally but not exported
108             use constant {
109 9         13210 MAGIC_REQUEST => "\0REQ",
110             MAGIC_RESPONSE => "\0RES",
111 9     9   57 };
  9         24  
112              
113             my %CONSTANTS = (
114             TYPE_CAN_DO => 1,
115             TYPE_CANT_DO => 2,
116             TYPE_RESET_ABILITIES => 3,
117             TYPE_PRE_SLEEP => 4,
118             TYPE_NOOP => 6,
119             TYPE_SUBMIT_JOB => 7,
120             TYPE_JOB_CREATED => 8,
121             TYPE_GRAB_JOB => 9,
122             TYPE_NO_JOB => 10,
123             TYPE_JOB_ASSIGN => 11,
124             TYPE_WORK_STATUS => 12,
125             TYPE_WORK_COMPLETE => 13,
126             TYPE_WORK_FAIL => 14,
127             TYPE_GET_STATUS => 15,
128             TYPE_ECHO_REQ => 16,
129             TYPE_ECHO_RES => 17,
130             TYPE_SUBMIT_JOB_BG => 18,
131             TYPE_ERROR => 19,
132             TYPE_STATUS_RES => 20,
133             TYPE_SUBMIT_JOB_HIGH => 21,
134             TYPE_SET_CLIENT_ID => 22,
135             TYPE_CAN_DO_TIMEOUT => 23,
136             TYPE_ALL_YOURS => 24,
137             TYPE_WORK_EXCEPTION => 25,
138             TYPE_OPTION_REQ => 26,
139             TYPE_OPTION_RES => 27,
140             TYPE_WORK_DATA => 28,
141             TYPE_WORK_WARNING => 29,
142             TYPE_GRAB_JOB_UNIQ => 30,
143             TYPE_JOB_ASSIGN_UNIQ => 31,
144             TYPE_SUBMIT_JOB_HIGH_BG => 32,
145             TYPE_SUBMIT_JOB_LOW => 33,
146             TYPE_SUBMIT_JOB_LOW_BG => 34,
147             );
148              
149             require constant;
150             constant->import( $_, $CONSTANTS{$_} ) for keys %CONSTANTS;
151              
152             =head1 INTERNAL METHODS
153              
154             These methods are provided for the client and worker subclasses to use; it is
155             unlikely these will be of interest to other users but they are documented here
156             for completeness.
157              
158             =cut
159              
160             # All Gearman packet bodies follow a standard format, of a fixed number of
161             # string arguments (given by the packet type), separated by a single NUL byte.
162             # All but the final argument may not contain embedded NULs.
163              
164             my %TYPENAMES = map { m/^TYPE_(.*)$/ ? ( $CONSTANTS{$_} => $1 ) : () } keys %CONSTANTS;
165              
166             my %ARGS_FOR_TYPE = (
167             # In order from doc/PROTOCOL
168             # common
169             ECHO_REQ => 1,
170             ECHO_RES => 1,
171             ERROR => 2,
172             # client->server
173             SUBMIT_JOB => 3,
174             SUBMIT_JOB_BG => 3,
175             SUBMIT_JOB_HIGH => 3,
176             SUBMIT_JOB_HIGH_BG => 3,
177             SUBMIT_JOB_LOW => 3,
178             SUBMIT_JOB_LOW_BG => 3,
179             GET_STATUS => 1,
180             OPTION_REQ => 1,
181             # server->client
182             JOB_CREATED => 1,
183             STATUS_RES => 5,
184             OPTION_RES => 1,
185             # worker->server
186             CAN_DO => 1,
187             CAN_DO_TIMEOUT => 2,
188             CANT_DO => 1,
189             RESET_ABILITIES => 0,
190             PRE_SLEEP => 0,
191             GRAB_JOB => 0,
192             GRAB_JOB_UNIQ => 0,
193             WORK_DATA => 2,
194             WORK_WARNING => 2,
195             WORK_STATUS => 3,
196             WORK_COMPLETE => 2,
197             WORK_FAIL => 1,
198             WORK_EXCEPTION => 2,
199             SET_CLIENT_ID => 1,
200             ALL_YOURS => 0,
201             # server->worker
202             NOOP => 0,
203             NO_JOB => 0,
204             JOB_ASSIGN => 3,
205             JOB_ASSIGN_UNIQ => 4,
206             );
207              
208             =head2 ( $type, $body ) = $gearman->pack_packet( $name, @args )
209              
210             Given a name of a packet type (specified as a string as the name of one of the
211             C<TYPE_*> constants, without the leading C<TYPE_> prefix; case insignificant)
212             returns the type value and the arguments for the packet packed into a body
213             string. This is intended for passing directly into C<build_packet> or
214             C<send_packet>:
215              
216             send_packet $fh, pack_packet( SUBMIT_JOB => $func, $id, $arg );
217              
218             =cut
219              
220             sub pack_packet
221             {
222 10     10 1 2196 shift;
223 10         32 my ( $typename, @args ) = @_;
224              
225 10 100       342 my $typefn = __PACKAGE__->can( "TYPE_\U$typename" ) or
226             croak "Unrecognised packet type '$typename'";
227              
228 9         31 my $n_args = $ARGS_FOR_TYPE{uc $typename};
229              
230 9 100       165 @args == $n_args or croak "Expected '\U$typename\E' to take $n_args args";
231             $args[$_] =~ m/\0/ and croak "Non-final argument [$_] of '\U$typename\E' cannot contain a \\0"
232 8   33     69 for 0 .. $n_args-2;
233              
234 8         27 my $type = $typefn->();
235 8         36 return ( $type, join "\0", @args );
236             }
237              
238             =head2 ( $name, @args ) = $gearman->unpack_packet( $type, $body )
239              
240             Given a type code and body string, returns the type name and unpacked
241             arguments from the body. This function is the reverse of C<pack_packet> and is
242             intended to be used on the result of C<parse_packet> or C<recv_packet>:
243              
244             The returned C<$name> will always be a fully-capitalised type name, as one of
245             the C<TYPE_*> constants without the leading C<TYPE_> prefix.
246              
247             This is intended for a C<given/when> control block, or dynamic method
248             dispatch:
249              
250             my ( $name, @args ) = unpack_packet( recv_packet $fh );
251              
252             $self->${\"handle_$name"}( @args )
253              
254             =cut
255              
256             sub unpack_packet
257             {
258 9     9 1 2332 shift;
259 9         18 my ( $type, $body ) = @_;
260              
261 9 100       182 my $typename = $TYPENAMES{$type} or
262             croak "Unrecognised packet type $type";
263              
264 8         23 my $n_args = $ARGS_FOR_TYPE{$typename};
265              
266 8 50       31 return ( $typename ) if $n_args == 0;
267 8         69 return ( $typename, split m/\0/, $body, $n_args );
268             }
269              
270             =head2 ( $name, @args ) = $gearman->parse_packet_from_string( $bytes )
271              
272             Attempts to parse a complete message packet from the given byte string. If it
273             succeeds, it returns the type name and arguments. If it fails it returns an
274             empty list.
275              
276             If successful, it will remove the bytes of the packet from the C<$bytes>
277             scalar, which must therefore be mutable.
278              
279             If the byte string begins with some bytes that are not recognised as the
280             Gearman packet magic for a response, the function will immediately throw an
281             exception before modifying the string.
282              
283             =cut
284              
285             sub parse_packet_from_string
286             {
287 10     10 1 224 my $self = shift;
288              
289 10 100       43 return unless length $_[0] >= 4;
290 7 100       243 croak "Expected to find 'RES' magic in packet" unless
291             unpack( "a4", $_[0] ) eq MAGIC_RESPONSE;
292              
293 6 50       32 return unless length $_[0] >= 12;
294              
295 6         21 my $bodylen = unpack( "x8 N", $_[0] );
296 6 50       26 return unless length $_[0] >= 12 + $bodylen;
297              
298             # Now committed to extracting it
299 6         29 my ( $type ) = unpack( "x4 N x4", substr $_[0], 0, 12, "" );
300 6         24 my $body = substr $_[0], 0, $bodylen, "";
301              
302 6         47 return $self->unpack_packet( $type, $body );
303             }
304              
305             =head2 ( $name, @args ) = $gearman->recv_packet_from_fh( $fh )
306              
307             Attempts to read a complete packet from the given filehandle, blocking until
308             it is available. The results are undefined if this function is called on a
309             non-blocking filehandle.
310              
311             If an IO error happens, an exception is thrown. If the first four bytes read
312             are not recognised as the Gearman packet magic for a response, the function
313             will immediately throw an exception. If either of these conditions happen, the
314             filehandle should be considered no longer valid and should be closed.
315              
316             =cut
317              
318             sub recv_packet_from_fh
319             {
320 2     2 1 6824 my $self = shift;
321 2         5 my ( $fh ) = @_;
322              
323 2 50       12 $fh->read( my $magic, 4 ) or croak "Cannot read header - $!";
324 2 100       231 croak "Expected to find 'RES' magic in packet" unless
325             $magic eq MAGIC_RESPONSE;
326              
327 1 50       30 $fh->read( my $header, 8 ) or croak "Cannot read header - $!";
328 1         17 my ( $type, $bodylen ) = unpack( "N N", $header );
329              
330 1         3 my $body = "";
331 1 50 33     6 $fh->read( $body, $bodylen ) or croak "Cannot read body - $!" if $bodylen;
332              
333 1         13 return $self->unpack_packet( $type, $body );
334             }
335              
336             =head2 $bytes = $gearman->build_packet_to_string( $name, @args )
337              
338             Returns a byte string containing a complete packet with the given fields.
339              
340             =cut
341              
342             sub build_packet_to_string
343             {
344 7     7 1 2297 my $self = shift;
345 7         46 my ( $type, $body ) = $self->pack_packet( @_ );
346              
347 7         100 return pack "a4 N N a*", MAGIC_REQUEST, $type, length $body, $body;
348             }
349              
350             =head2 $gearman->send_packet_to_fh( $fh, $name, @args )
351              
352             Sends a complete packet to the given filehandle. If an IO error happens, an
353             exception is thrown.
354              
355             =cut
356              
357             sub send_packet_to_fh
358             {
359 1     1 1 583 my $self = shift;
360 1         2 my $fh = shift;
361 1 50       4 $fh->print( $self->build_packet_to_string( @_ ) ) or croak "Cannot send packet - $!";
362             }
363              
364             =head2 $gearman->send_packet( $typename, @args )
365              
366             Packs a packet from a list of arguments then sends it; a combination of
367             C<pack_packet> and C<build_packet>. Uses the implementation's C<send> method.
368              
369             =cut
370              
371             sub send_packet
372             {
373 5     5 1 1004 my $self = shift;
374 5         44 $self->send( $self->build_packet_to_string( @_ ) );
375             }
376              
377             =head2 $gearman->on_recv( $buffer )
378              
379             The implementation should call this method when more bytes of data have been
380             received. It parses and unpacks packets from the buffer, then dispatches to
381             the appropriately named C<on_*> method. A combination of C<parse_packet> and
382             C<unpack_packet>.
383              
384             The C<$buffer> scalar may be modified; if it still contains bytes left over
385             after the call these should be preserved by the implementation for the next
386             time it is called.
387              
388             =cut
389              
390             sub on_recv
391             {
392 4     4 1 1005 my $self = shift;
393              
394 4         34 while( my ( $type, @args ) = $self->parse_packet_from_string( $_[0] ) ) {
395 5         9 $self->${\"on_$type"}( @args );
  5         50  
396             }
397             }
398              
399             *on_read = \&on_recv;
400              
401             =head2 $gearman->on_ERROR( $name, $message )
402              
403             Default handler for the C<TYPE_ERROR> packet. This method should be overridden
404             by subclasses to change the behaviour.
405              
406             =cut
407              
408             sub on_ERROR
409             {
410 1     1 1 2 my $self = shift;
411 1         2 my ( $name, $message ) = @_;
412              
413 1         23 die "Received Gearman error '$name' (\"$message\")\n";
414             }
415              
416             =head2 $gearman->echo_request( $payload ) ==> ( $payload )
417              
418             Sends an C<ECHO_REQ> packet to the Gearman server, and returns a future that
419             will eventually yield the payload when the server responds.
420              
421             =cut
422              
423             sub echo_request
424             {
425 1     1 1 415 my $self = shift;
426 1         3 my ( $payload ) = @_;
427              
428 1         9 my $state = $self->gearman_state;
429              
430 1         2 push @{ $state->{gearman_echos} }, my $f = $self->new_future;
  1         9  
431              
432 1         19 $self->send_packet( ECHO_REQ => $payload );
433              
434 1         58 return $f;
435             }
436              
437             sub on_ECHO_RES
438             {
439 1     1 0 700 my $self = shift;
440 1         2 my ( $payload ) = @_;
441              
442 1         3 my $state = $self->gearman_state;
443              
444 1         3 ( shift @{ $state->{gearman_echos} } )->done( $payload );
  1         10  
445             }
446              
447             =head1 PROTOTYPICAL OBJECTS
448              
449             An alternative option to subclassing to provide the missing methods, is to use
450             C<Protocol::Gearman> (or rather, one of the client or worker subclasses) as a
451             prototypical object, passing in CODE references for the missing methods to a
452             special constructor that creates a concrete object.
453              
454             This may be more convenient to use in smaller one-shot cases (like unit tests
455             or small scripts) instead of creating a subclass.
456              
457             my $socket = ...;
458              
459             my $client = Protocol::Gearman::Client->new_prototype(
460             send => sub { $socket->print( $_[1] ); },
461             new_future => sub { My::Future::Subclass->new },
462             );
463              
464             =head2 $gearman = Protocol::Gearman->new_prototype( %methods )
465              
466             Returns a new prototypical object constructed using the given methods. The
467             named arguments must give values for the C<send> and C<new_future> methods.
468              
469             =cut
470              
471             sub new_prototype
472             {
473 1     1 1 26 my $class = shift;
474 1         5 my %methods = @_;
475              
476 1         5 my $self = bless {}, $class;
477              
478 1         3 foreach (qw( send new_future )) {
479 2 50 33     16 defined $methods{$_} and ref $methods{$_} eq "CODE" or
480             croak "Expected to receive a CODE reference for '$_'";
481              
482 2         17 $self->{"gearman_method_$_"} = $methods{$_};
483             }
484              
485 1         5 return $self;
486             }
487              
488             =head1 AUTHOR
489              
490             Paul Evans <leonerd@leonerd.org.uk>
491              
492             =cut
493              
494             0x55AA;