File Coverage

blib/lib/Gearman/Spawner/Client/AnyEvent.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             Gearman::Spawner::Client::AnyEvent - asynchronous AnyEvent client for Gearman::Spawner::Worker workers
4              
5             =head1 SYNOPSIS
6              
7             $client = Gearman::Spawner::Client::AnyEvent->new(
8             job_servers => ['localhost:4730']
9             );
10              
11             $client->run_method(
12             class => 'MyWorker',
13             method => 'sing',
14             arg => [qw( do re mi )],
15             success_cb => sub {
16             my $result = shift;
17             say "success! result is $result";
18             },
19             error_cb => sub {
20             my $reason = shift;
21             say "failed because $reason";
22             },
23             timeout => 3,
24             });
25              
26             =cut
27              
28             package Gearman::Spawner::Client::AnyEvent;
29              
30 8     8   5921 use strict;
  8         17  
  8         273  
31 8     8   47 use warnings;
  8         22  
  8         193  
32              
33 8     8   2942 use Any::Moose;
  0            
  0            
34              
35             extends 'AnyEvent::Gearman::Client';
36              
37             has cancel_timers => (
38             is => 'rw',
39             isa => 'HashRef',
40             );
41              
42             no Any::Moose;
43              
44             use AnyEvent;
45             use Gearman::Spawner::Util;
46              
47             use Carp qw( croak );
48             use Storable qw( nfreeze thaw );
49              
50             =head1 METHODS
51              
52             =over 4
53              
54             =item Gearman::Spawner::Client::AnyEvent->new(%options)
55              
56             Creates a new client object. Options:
57              
58             =over 4
59              
60             =item job_servers
61              
62             (Required) Arrayref of servers to connect to.
63              
64             =back
65              
66             =item $client->run_method(%options)
67              
68             Dispatches a foreground job to a worker. Options:
69              
70             =over 4
71              
72             =item class
73              
74             (Required) The name of the worker class.
75              
76             =item method
77              
78             (Required) The name of the method in I to call.
79              
80             =item success_cb
81              
82             (Required) The coderef to be called when the job completes successfully. The
83             first argument to it will be the deserialized result returned by the worker
84             method.
85              
86             =item error_cb
87              
88             (Required) The coderef to be called if the job does not complete. This may
89             occur for several reasons, including but not limited to: the worker code threw
90             an exception; the server did not respond before the timeout period; or the
91             number of job retries was exceeded.
92              
93             The first argument passed to I is a string providing the best
94             available information about the error.
95              
96             =item data
97              
98             (Optional) The job-specific data to pass to the worker. Any structure that can
99             be serialized with Storable is allowed. If omitted, undef is sent.
100              
101             =item timeout
102              
103             (Optional) If the job has not completed or failed within this amount of time,
104             I will be called. Even if the job subsequently completes,
105             I will not be called.
106              
107             =item unique
108              
109             (Optional) The opaque unique tag for coalescing jobs.
110              
111             =back
112              
113             =cut
114              
115             sub run_method {
116             my $self = shift;
117             my %params = @_;
118              
119             my $class = delete $params{class} || croak "need class";
120             my $method = delete $params{method} || croak "need method";
121             my $success_cb = delete $params{success_cb} || croak "need success_cb";
122             my $error_cb = delete $params{error_cb} || croak "need error_cb";
123             my $data = delete $params{data} || undef;
124             my $timeout = delete $params{timeout} || undef;
125             my $unique = delete $params{unique} || undef;
126              
127             croak "unknown parameters to run_method: @{[%params]}" if %params;
128              
129             my $function = Gearman::Spawner::Util::method2function($class, $method);
130              
131             my $serialized = nfreeze([$data]);
132              
133             my $timer;
134             my $cancel_timeout;
135             if (defined $timeout) {
136             $cancel_timeout = sub {
137             delete $self->{cancel_timers}{"$timer"};
138             undef $timer;
139             };
140             $timer = AE::timer($timeout, 0, sub {
141             $cancel_timeout->();
142             $error_cb->("timeout");
143             });
144             $self->{cancel_timers}{"$timer"} = $timer;
145             }
146              
147             my %options;
148              
149             $options{unique} = $unique if defined $unique;
150              
151             $options{on_complete} = sub {
152             return if defined $timeout && !$timer; # timeout already fired
153             $cancel_timeout->() if $cancel_timeout;
154              
155             my ($task, $frozen_retval) = @_;
156              
157             unless (defined $frozen_retval) {
158             return $error_cb->('no serialized return value from worker');
159             }
160              
161             my $rets = eval { thaw($frozen_retval) };
162             if ($@) {
163             return $error_cb->("deserialization error: $@");
164             }
165             elsif (ref $rets ne 'ARRAY') {
166             return $error_cb->("gearman function did not return an array");
167             }
168              
169             $success_cb->(@$rets);
170             };
171              
172             $options{on_fail} = sub {
173             return if defined $timeout && !$timer; # timeout already fired
174             my ($task, $reason) = @_;
175             $cancel_timeout->() if $cancel_timeout;
176             $error_cb->($reason);
177             };
178              
179             $self->add_task($function, $serialized, %options);
180             }
181              
182             =item run_method_background
183              
184             Dispatches a background job to a worker.
185              
186             Options:
187              
188             =over 4
189              
190             =item class
191              
192             (Required) The name of the worker class.
193              
194             =item method
195              
196             (Required) The name of the method in I to call.
197              
198             =item data
199              
200             (Optional) The job-specific data to pass to the worker. Any structure that can
201             be serialized with Storable is allowed. If omitted, undef is sent.
202              
203             =item unique
204              
205             (Optional) The opaque unique tag for coalescing jobs.
206              
207             =back
208              
209             =cut
210              
211             sub run_method_background {
212             my $self = shift;
213             my %params = @_;
214              
215             my $class = delete $params{class} || croak "need class";
216             my $method = delete $params{method} || croak "need method";
217             my $data = delete $params{data} || undef;
218             my $unique = delete $params{unique} || undef;
219              
220             croak "unknown parameters to run_method_background: @{[%params]}" if %params;
221              
222             my $function = Gearman::Spawner::Util::method2function($class, $method);
223              
224             my $serialized = nfreeze([$data]);
225              
226             my %options;
227              
228             $options{unique} = $unique if defined $unique;
229              
230             $self->add_task_bg($function => $serialized, %options);
231              
232             return;
233             }
234              
235             =back
236              
237             =cut
238              
239             1;