File Coverage

blib/lib/Gearman/Driver/Worker.pm
Criterion Covered Total %
statement 3 3 100.0
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 4 4 100.0


line stmt bran cond sub pod time code
1             package Gearman::Driver::Worker;
2              
3 1     1   1192 use base qw(MooseX::MethodAttributes::Inheritable Gearman::Driver::Worker::Base);
  1         2  
  1         860  
4             use Moose;
5              
6             =head1 NAME
7              
8             Gearman::Driver::Worker - Base class for workers
9              
10             =head1 SYNOPSIS
11              
12             package My::Worker;
13              
14             use base qw(Gearman::Driver::Worker);
15             use Moose;
16              
17             sub begin {
18             my ( $self, $job, $workload ) = @_;
19             # called before each job
20             }
21              
22             sub prefix {
23             # default: return ref(shift) . '::';
24             return join '_', split /::/, __PACKAGE__;
25             }
26              
27             sub do_something : Job : MinProcesses(2) : MaxProcesses(15) {
28             my ( $self, $job, $workload ) = @_;
29             # $job => Gearman::XS::Job instance
30             }
31              
32             sub end {
33             my ( $self, $job, $workload ) = @_;
34             # called after each job
35             }
36              
37             sub spread_work : Job {
38             my ( $self, $job, $workload ) = @_;
39              
40             my $gc = Gearman::XS::Client->new;
41             $gc->add_servers( $self->server );
42              
43             $gc->do_background( 'some_job_1' => $job->workload );
44             $gc->do_background( 'some_job_2' => $job->workload );
45             $gc->do_background( 'some_job_3' => $job->workload );
46             $gc->do_background( 'some_job_4' => $job->workload );
47             $gc->do_background( 'some_job_5' => $job->workload );
48             }
49              
50             1;
51              
52             =head1 ATTRIBUTES
53              
54             =head2 server
55              
56             L<Gearman::Driver> connects to the L<server|Gearman::Driver/server>
57             passed to its constructor. This value is also stored in this class.
58             This can be useful if a job uses L<Gearman::XS::Client> to add
59             another job. See 'spread_work' method in L</SYNOPSIS> above.
60              
61             =head1 METHODATTRIBUTES
62              
63             =head2 Job
64              
65             This will register the method with gearmand.
66              
67             =head2 MinProcesses
68              
69             Minimum number of processes working parallel on this job/method.
70              
71             =head2 MaxProcesses
72              
73             Maximum number of processes working parallel on this job/method.
74              
75             =head2 Encode
76              
77             This will automatically look for a method C<encode> in this object
78             which has to be defined in the subclass. It will call the C<encode>
79             method passing the return value from the job method. The return
80             value of the C<encode> method will be returned to the Gearman
81             client. This is useful to serialize Perl datastructures to JSON
82             before sending them back to the client.
83              
84             sub do_some_job : Job : Encode : Decode {
85             my ( $self, $job, $workload ) = @_;
86             return { message => 'OK', status => 1 };
87              
88             # calls 'encode' and returns JSON string: {"status":1,"message":"OK"}
89             }
90              
91             sub custom_encoder : Job : Encode(enc_yaml) : Decode(dec_yaml) {
92             my ( $self, $job, $workload ) = @_;
93             return { message => 'OK', status => 1 };
94              
95             # calls 'enc_yaml' and returns YAML string:
96             # ---
97             # message: OK
98             # status: 1
99             }
100              
101             sub encode {
102             my ( $self, $result ) = @_;
103             return JSON::XS::encode_json($result);
104             }
105              
106             sub decode {
107             my ( $self, $workload ) = @_;
108             return JSON::XS::decode_json($workload);
109             }
110              
111             sub enc_yaml {
112             my ( $self, $result ) = @_;
113             return YAML::XS::Dump($result);
114             }
115              
116             sub dec_yaml {
117             my ( $self, $workload ) = @_;
118             return YAML::XS::Load($workload);
119             }
120              
121              
122             =head2 Decode
123              
124             This will automatically look for a method C<decode> in this object
125             which has to be defined in the subclass. It will call the C<decode>
126             method passing the workload value (C<< $job->workload >>). The return
127             value of the C<decode> method will be passed as 3rd argument to the
128             job method. This is useful to deserialize JSON workload to Perl
129             datastructures for example. If this attribute is not set,
130             C<< $job->workload >> and C<$workload> is the same.
131              
132             Example, workload is this string: C<{"status":1,"message":"OK"}>
133              
134             sub decode {
135             my ( $self, $workload ) = @_;
136             return JSON::XS::decode_json($workload);
137             }
138              
139             sub job1 : Job {
140             my ( $self, $job, $workload ) = @_;
141             # $workload eq $job->workload eq '{"status":1,"message":"OK"}'
142             }
143              
144             sub job2 : Job : Decode {
145             my ( $self, $job, $workload ) = @_;
146             # $workload ne $job->workload
147             # $job->workload eq '{"status":1,"message":"OK"}'
148             # $workload = { status => 1, message => 'OK' }
149             }
150              
151             =head2 ProcessGroup
152              
153             Forking each job method in an own process may not always be the way
154             to go. It's possible to run many job methods in a single process by
155             defining C<ProcessGroup> attribute. This process group alias will
156             also show up in L<Gearman::Driver::Console> instead of the single
157             method names. The workers process name will also be affected.
158              
159             sub process_name {
160             my ( $self, $orig, $job_name ) = @_;
161             return "$orig ($job_name)";
162             }
163              
164             sub scale_image : Job : ProcessGroup(image_worker) {
165             my ( $self, $job, $workload ) = @_;
166             }
167              
168             sub convert_image : Job : ProcessGroup(image_worker) {
169             my ( $self, $job, $workload ) = @_;
170             }
171              
172             # $ ~/Gearman-Driver$ ps ux|grep image_worker
173             # plu 2608 0.0 0.1 2466720 4200 s001 S 12:46PM 0:00.01 script/gearman_driver.pl (XxX::image_worker)
174              
175             # $ ~/Gearman-Driver$ telnet localhost 47300
176             # Trying ::1...
177             # telnet: connect to address ::1: Connection refused
178             # Trying fe80::1...
179             # telnet: connect to address fe80::1: Connection refused
180             # Trying 127.0.0.1...
181             # Connected to localhost.
182             # Escape character is '^]'.
183             # status
184             # XxX::image_worker 1 1 1 1970-01-01T00:00:00 1970-01-01T00:00:00
185              
186             It's possible to combine C<ProcessGroup> and C<MinProcesses> +
187             C<MaxProcesses>. But there's one small caveat: Because one single
188             process shares many methods, you can only set the min/max process
189             amount once per C<ProcessGroup>:
190              
191             sub scale_image : Job : ProcessGroup(image_worker) : MinProcesses(5) : MaxProcesses(10) {
192             my ( $self, $job, $workload ) = @_;
193             }
194              
195             sub convert_image : Job : ProcessGroup(image_worker) {
196             my ( $self, $job, $workload ) = @_;
197             }
198              
199             If you do not obey this restriction, L<Gearman::Driver> will barf:
200              
201             sub scale_image : Job : ProcessGroup(image_worker) : MinProcesses(5) : MaxProcesses(10) {
202             my ( $self, $job, $workload ) = @_;
203             }
204              
205             sub convert_image : Job : ProcessGroup(image_worker) : MinProcesses(6) : MaxProcesses(12) {
206             my ( $self, $job, $workload ) = @_;
207             }
208              
209             C<MinProcesses redefined in ProcessGroup(image_worker) at XxX::convert_image at lib/Gearman/Driver.pm line 850.>
210              
211             =head1 METHODS
212              
213             =head2 prefix
214              
215             Having the same method name in two different classes would result
216             in a clash when registering it with gearmand. To avoid this,
217             all jobs are registered with the full package and method name
218             (e.g. C<My::Worker::some_job>). The default prefix is
219             C<ref(shift . '::')>, but this can be changed by overriding the
220             C<prefix> method in the subclass, see L</SYNOPSIS> above.
221              
222             =head2 begin
223              
224             This method is called before a job method is called. In this base
225             class this methods just does nothing, but can be overridden in a
226             subclass.
227              
228             The parameters are the same as in the job method:
229              
230             =over 4
231              
232             =item * C<$self>
233              
234             =item * C<$job>
235              
236             =back
237              
238             =head2 end
239              
240             This method is called after a job method has been called. In this
241             base class this methods just does nothing, but can be overridden
242             in a subclass.
243              
244             The parameters are the same as in the job method:
245              
246             =over 4
247              
248             =item * C<$self>
249              
250             =item * C<$job>
251              
252             =back
253              
254             =head2 process_name
255              
256             If this method is overridden in the subclass it will change the
257             process name after a job has been forked.
258              
259             The following parameters are passed to this method:
260              
261             =over 4
262              
263             =item * C<$self>
264              
265             =item * C<$orig> - the original process name ( C<$0> )
266              
267             =item * C<$job_name> - the name of the job
268              
269             =back
270              
271             Example:
272              
273             sub process_name {
274             my ( $self, $orig, $job_name ) = @_;
275             return "$orig ($job_name)";
276             }
277              
278             This may look like:
279              
280             plu 2034 0.0 1.7 22392 17948 pts/2 S 21:17 0:00 gearman_driver.pl (GDExamples::Convert::convert_to_jpeg)
281             plu 2035 0.0 1.7 22392 17944 pts/2 S 21:17 0:00 gearman_driver.pl (GDExamples::Convert::convert_to_gif)
282              
283             =head2 override_attributes
284              
285             If this method is overridden in the subclass it will change B<all>
286             attributes of your job methods. It must return a reference to a hash
287             containing valid L<attribute keys|/METHODATTRIBUTES>. E.g.:
288              
289             sub override_attributes {
290             return {
291             MinProcesses => 1,
292             MaxProcesses => 1,
293             }
294             }
295              
296             sub job1 : Job : MinProcesses(10) : MaxProcesses(20) {
297             my ( $self, $job, $workload ) = @_;
298             # This will get MinProcesses(1) MaxProcesses(1) from override_attributes
299             }
300              
301             =head2 default_attributes
302              
303             If this method is overridden in the subclass it can supply default
304             attributes which are added to all job methods. This is useful if
305             you want to Encode/Decode all your jobs:
306              
307             sub default_attributes {
308             return {
309             Encode => 'encode',
310             Decode => 'decode',
311             }
312             }
313              
314             sub decode {
315             my ( $self, $workload ) = @_;
316             return JSON::XS::decode_json($workload);
317             }
318              
319             sub encode {
320             my ( $self, $result ) = @_;
321             return JSON::XS::encode_json($result);
322             }
323              
324             sub job1 : Job {
325             my ( $self, $job, $workload ) = @_;
326             }
327              
328             =cut
329              
330             no Moose;
331              
332             __PACKAGE__->meta->make_immutable;
333              
334             =head1 AUTHOR
335              
336             See L<Gearman::Driver>.
337              
338             =head1 COPYRIGHT AND LICENSE
339              
340             See L<Gearman::Driver>.
341              
342             =head1 SEE ALSO
343              
344             =over 4
345              
346             =item * L<Gearman::Driver>
347              
348             =item * L<Gearman::Driver::Adaptor>
349              
350             =item * L<Gearman::Driver::Console>
351              
352             =item * L<Gearman::Driver::Console::Basic>
353              
354             =item * L<Gearman::Driver::Console::Client>
355              
356             =item * L<Gearman::Driver::Job>
357              
358             =item * L<Gearman::Driver::Job::Method>
359              
360             =item * L<Gearman::Driver::Loader>
361              
362             =item * L<Gearman::Driver::Observer>
363              
364             =item * L<Gearman::Driver::Worker::AttributeParser>
365              
366             =item * L<Gearman::Driver::Worker::Base>
367              
368             =back
369              
370             =cut
371              
372             1;