File Coverage

blib/lib/Protocol/Gearman/Client.pm
Criterion Covered Total %
statement 89 89 100.0
branch 12 18 66.6
condition 7 14 50.0
subroutine 17 17 100.0
pod 2 10 20.0
total 127 148 85.8


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014 -- leonerd@leonerd.org.uk
5              
6             package Protocol::Gearman::Client;
7              
8 3     3   35880 use strict;
  3         6  
  3         125  
9 3     3   15 use warnings;
  3         16  
  3         83  
10 3     3   79 use 5.010; # //
  3         12  
  3         176  
11              
12             our $VERSION = '0.04';
13              
14 3     3   16 use base qw( Protocol::Gearman );
  3         5  
  3         1537  
15              
16 3     3   20 use Carp;
  3         6  
  3         209  
17              
18 3     3   2585 use Struct::Dumb;
  3         3304  
  3         19  
19              
20             struct Job => [qw( f on_data on_warning on_status exception )];
21              
22             =head1 NAME
23              
24             C<Protocol::Gearman::Client> - implement a Gearman client
25              
26             =head1 DESCRIPTION
27              
28             A base class that implements a complete Gearman client. This abstract class
29             still requires the implementation methods as documented in
30             L<Protocol::Gearman>, but otherwise provides a full set of behaviour useful to
31             Gearman clients.
32              
33             As it is based on L<Future> it is suitable for both synchronous and
34             asynchronous use. When backed by an implementation capable of performing
35             asynchronously, this object fully supports asynchronous Gearman communication.
36             When backed by a synchronous implementation, it will still yield C<Future>
37             instances but the limitations of the synchronous implementation may limit how
38             much concurrency and asynchronous behaviour can be achieved.
39              
40             A simple concrete implementation suitable for synchronous use can be found in
41             L<Net::Gearman::Client>.
42              
43             =cut
44              
45             =head1 METHODS
46              
47             =cut
48              
49             =head2 $client->option_request( $option ) ==> ()
50              
51             Requests that the Gearman server enable the named option for this connection.
52              
53             The following options are defined by Gearman:
54              
55             =over 8
56              
57             =item * C<exceptions>
58              
59             Enables the use of the C<WORK_EXCEPTION> packet; meaning the client is happy
60             to receive them.
61              
62             =back
63              
64             =cut
65              
66             sub option_request
67             {
68 1     1 1 41 my $self = shift;
69 1         4 my ( $option ) = @_;
70              
71 1         14 my $state = $self->gearman_state;
72              
73 1         5 my $request_f = $self->new_future;
74 1         28 $state->{gearman_option_reqs}{$option} = $request_f;
75              
76 1         5 $self->send_packet( OPTION_REQ => $option );
77              
78 1         41 return $request_f;
79             }
80              
81             sub on_OPTION_RES
82             {
83 1     1 0 747 my $self = shift;
84 1         2 my ( $option ) = @_;
85              
86 1         4 my $state = $self->gearman_state;
87 1         6 ( delete $state->{gearman_option_reqs}{$option} )->done();
88             }
89              
90             =head2 $client->submit_job( %args ) ==> $result
91              
92             Submits a job request to the Gearman server, and returns a future that will
93             eventually yield the result of the job or its failure.
94              
95             Takes the following required arguments:
96              
97             =over 8
98              
99             =item func => STRING
100              
101             The name of the function to call
102              
103             =item arg => STRING
104              
105             An opaque bytestring containing the argument data for the function. Its exact
106             format should be specified by the registered function.
107              
108             =back
109              
110             Takes the following optional arguments:
111              
112             =over 8
113              
114             =item background => BOOL
115              
116             If true, the job is submitted as a background request. Such a request will not
117             yield any status or completion information from the server. Once submitted the
118             server will not communicate about it further. In this case, the returned
119             future will complete with an empty result as soon as the job is accepted by
120             the server.
121              
122             =item priority => "high" | "low" | ""
123              
124             Alters the job priority on the server. If present, must be either C<"high">,
125             C<"low"> or the empty string.
126              
127             =item on_data => CODE
128              
129             Invoked on receipt of more incremental data from the worker.
130              
131             $on_data->( $data )
132              
133             =item on_warning => CODE
134              
135             Invoked on receipt of a warning from the worker.
136              
137             $on_warning->( $warning )
138              
139             =item on_status => CODE
140              
141             Invoked on a status update from the worker.
142              
143             $on_status->( $numerator, $denominator )
144              
145             =back
146              
147             =cut
148              
149             sub submit_job
150             {
151 5     5 1 3696 my $self = shift;
152 5         43 my %args = @_;
153              
154 5   33     17 my $func = $args{func} // croak "Required 'func' is missing in submit_job";
155 5   33     15 my $arg = $args{arg} // croak "Required 'arg' is missing in submit_job";
156              
157 5         8 my $bg = !!$args{background};
158              
159 5   100     26 my $prio = $args{priority} // "";
160 5 50 66     20 $prio eq "" or $prio eq "high" or $prio eq "low" or
      33        
161             croak "Unrecognised 'priority' of '$prio'";
162              
163 5         26 my $state = $self->gearman_state;
164              
165 5         18 my $submit_f = $self->new_future;
166 5         33 push @{ $state->{gearman_submits} }, $submit_f;
  5         17  
167              
168 5         14 my $f = $self->new_future;
169             $submit_f->on_done( sub {
170 5     5   169 my ( $job_handle ) = @_;
171              
172 5 100       11 if( $bg ) {
173 1         6 $f->done;
174             }
175             else {
176 4         32 $state->{gearman_job}{$job_handle} = Job(
177             $f, $args{on_data}, $args{on_warning}, $args{on_status}, undef
178             );
179             }
180 5         54 });
181              
182 5         36 my $type = "SUBMIT_JOB";
183              
184 5 50       14 $type .= "_LOW" if $prio eq "low";
185 5 100       12 $type .= "_HIGH" if $prio eq "high";
186              
187 5 100       12 $type .= "_BG" if $bg;
188              
189 5         30 $self->send_packet( $type => $func, $state->{gearman_next_id}++, $arg );
190              
191 5         216 return $f;
192             }
193              
194             sub on_JOB_CREATED
195             {
196 5     5 0 1376 my $self = shift;
197 5         8 my ( $job_handle ) = @_;
198              
199 5         12 my $state = $self->gearman_state;
200              
201 5         7 my $f = shift @{ $state->{gearman_submits} };
  5         9  
202 5         19 $f->done( $job_handle );
203             }
204              
205             sub on_WORK_DATA
206             {
207 1     1 0 26 my $self = shift;
208 1         2 my ( $job_handle, $data ) = @_;
209              
210 1         3 my $state = $self->gearman_state;
211              
212 1         2 my $job = $state->{gearman_job}{$job_handle};
213              
214 1 50       5 $job->on_data->( $data ) if $job->on_data;
215             }
216              
217             sub on_WORK_WARNING
218             {
219 1     1 0 12 my $self = shift;
220 1         2 my ( $job_handle, $warning ) = @_;
221              
222 1         3 my $state = $self->gearman_state;
223              
224 1         2 my $job = $state->{gearman_job}{$job_handle};
225              
226 1 50       4 $job->on_warning->( $warning ) if $job->on_warning;
227             }
228              
229             sub on_WORK_STATUS
230             {
231 2     2 0 52 my $self = shift;
232 2         4 my ( $job_handle, $num, $denom ) = @_;
233              
234 2         6 my $state = $self->gearman_state;
235              
236 2         5 my $job = $state->{gearman_job}{$job_handle};
237              
238 2 50       5 $job->on_status->( $num, $denom ) if $job->on_status;
239             }
240              
241             sub on_WORK_COMPLETE
242             {
243 3     3 0 31 my $self = shift;
244 3         7 my ( $job_handle, $result ) = @_;
245              
246 3         18 my $state = $self->gearman_state;
247              
248 3         9 my $job = delete $state->{gearman_job}{$job_handle};
249              
250 3         14 $job->f->done( $result );
251             }
252              
253             sub on_WORK_EXCEPTION
254             {
255 1     1 0 22 my $self = shift;
256 1         2 my ( $job_handle, $exception ) = @_;
257              
258 1         3 my $state = $self->gearman_state;
259              
260 1         2 my $job = $state->{gearman_job}{$job_handle};
261 1         4 $job->exception = $exception;
262             }
263              
264             sub on_WORK_FAIL
265             {
266 1     1 0 10 my $self = shift;
267 1         2 my ( $job_handle ) = @_;
268              
269 1         3 my $state = $self->gearman_state;
270              
271 1         4 my $job = delete $state->{gearman_job}{$job_handle};
272              
273 1         10 my $exception = $job->exception;
274 1 50       5 $job->f->fail( "Work failed", gearman => ( defined $exception ? ( $exception ) : () ) );
275             }
276              
277             =head1 AUTHOR
278              
279             Paul Evans
280              
281             =cut
282              
283             0x55AA;