File Coverage

blib/lib/Gearman/Spawner/Client/Async.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             Gearman::Spawner::Client::Async - asynchronous Danga::Socket client for Gearman::Spawner::Worker workers
4              
5             =head1 SYNOPSIS
6              
7             $client = Gearman::Spawner::Client::Async->new(
8             job_servers => ['localhost:4730']
9             );
10              
11             $client->run_method(
12             class => 'MyWorker',
13             method => 'sing',
14             data => [qw( do re mi )],
15             success_cb => sub {
16             my $result = shift;
17             say "success! result is $result";
18             },
19             error_cb => sub {
20             my $reason = shift;
21             say "failed because $reason";
22             },
23             timeout => 3,
24             );
25              
26             =cut
27              
28             package Gearman::Spawner::Client::Async;
29              
30 9     9   6670 use strict;
  9         11  
  9         255  
31 9     9   40 use warnings;
  9         17  
  9         192  
32              
33 9     9   3334 use Gearman::Client::Async;
  0            
  0            
34             use base 'Gearman::Client::Async';
35              
36             use Carp qw( croak );
37             use Gearman::Spawner::Util;
38             use Storable qw( nfreeze thaw );
39              
40             =head1 METHODS
41              
42             =over 4
43              
44             =item Gearman::Spawner::Client::Async->new(%options)
45              
46             Creates a new client object. Options:
47              
48             =over 4
49              
50             =item job_servers
51              
52             (Required) Arrayref of servers to connect to.
53              
54             =back
55              
56             =cut
57              
58             sub new {    # constructor: forwards all options to the parent class constructor
59                 my $invocant = shift;
60                 my $package  = ref($invocant) || $invocant;    # accept either an instance or a class name
61              
62                 my Gearman::Spawner::Client::Async $self = fields::new($package)->SUPER::new(@_);    # allocate the fields-based object, then hand it to the parent's new()
63                 return $self;
64             }
65              
66             =item $client->run_method(%options)
67              
68             Dispatches a foreground job to a worker. Options:
69              
70             =over 4
71              
72             =item class
73              
74             (Required) The name of the worker class.
75              
76             =item method
77              
78             (Required) The name of the method in I<class> to call.
79              
80             =item success_cb
81              
82             (Required) The coderef to be called when the job completes successfully. The
83             first argument to it will be the deserialized result returned by the worker
84             method.
85              
86             =item error_cb
87              
88             (Required) The coderef to be called if the job does not complete. This may
89             occur for several reasons, including but not limited to: the worker code threw
90             an exception; the server did not respond before the timeout period; or the
91             number of job retries was exceeded.
92              
93             The first argument passed to I<error_cb> is a string providing the best
94             available information about the error.
95              
96             =item data
97              
98             (Optional) The job-specific data to pass to the worker. Any structure that can
99             be serialized with Storable is allowed. If omitted, undef is sent.
100              
101             =item timeout
102              
103             (Optional) If the job has not completed or failed within this amount of time,
104             I<error_cb> will be called. Even if the job subsequently completes,
105             I<success_cb> will not be called.
106              
107             =item unique
108              
109             (Optional) The opaque unique tag for coalescing jobs.
110              
111             =back
112              
113             =cut
114              
115             sub run_method {    # dispatch a foreground job; the outcome is reported through success_cb or error_cb
116                 my Gearman::Spawner::Client::Async $self = shift;
117                 my %params = @_;
118              
119                 my $class      = delete $params{class}      || croak "need class";
120                 my $method     = delete $params{method}     || croak "need method";
121                 my $success_cb = delete $params{success_cb} || croak "need success_cb";
122                 my $error_cb   = delete $params{error_cb}   || croak "need error_cb";
123                 my $data       = delete $params{data};    # taken verbatim: false-but-defined payloads (0, '') must reach the worker; undef when omitted
124                 my $timeout    = delete $params{timeout}    || undef;    # a false timeout is treated as not given
125                 my $unique     = delete $params{unique}     || undef;    # a false unique tag is treated as not given
126              
127                 croak "unknown parameters to run_method: @{[%params]}" if %params;    # anything left over was not a recognised option
128              
129                 my $function = Gearman::Spawner::Util::method2function($class, $method);
130              
131                 my $serialized = nfreeze([$data]);    # wrapped in an arrayref because Storable only freezes references
132              
133                 my %options;
134              
135                 $options{timeout} = $timeout if defined $timeout;
136                 $options{uniq}    = $unique  if defined $unique;
137              
138                 $options{on_complete} = sub {    # validate and deserialize the worker's reply before reporting success
139                     my $ref_to_frozen_retval = shift;
140              
141                     unless (defined $ref_to_frozen_retval) {
142                         return $error_cb->('no serialized return value from worker');
143                     }
144              
145                     if (ref($ref_to_frozen_retval) ne 'SCALAR') {    # also true for non-references, whose ref() is ''
146                         return $error_cb->('unexpected value type');
147                     }
148              
149                     my $rets = eval { thaw($$ref_to_frozen_retval) };
150                     if ($@) {
151                         return $error_cb->("deserialization error: $@");
152                     }
153                     elsif (ref $rets ne 'ARRAY') {    # also catches thaw() returning undef
154                         return $error_cb->("gearman function did not return an array");
155                     }
156              
157                     $success_cb->(@$rets);    # the thawed array is flattened into the callback's argument list
158                 };
159              
160                 $options{on_fail} = sub {    # forward only the first argument (the reason) to error_cb
161                     my $reason = shift;
162                     $error_cb->($reason);
163                 };
164              
165                 $self->add_task(Gearman::Task->new($function, \$serialized, \%options));
166             }
167              
168             =item $client->run_method_background(%options)
169              
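170             Dispatches a background job to a worker. Not currently supported: the asynchronous client has no dispatch_background, so this method always croaks.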
171              
172             Options:
173              
174             =over 4
175              
176             =item class
177              
178             (Required) The name of the worker class.
179              
180             =item method
181              
182             (Required) The name of the method in I<class> to call.
183              
184             =item data
185              
186             (Optional) The job-specific data to pass to the worker. Any structure that can
187             be serialized with Storable is allowed. If omitted, undef is sent.
188              
189             =item unique
190              
191             (Optional) The opaque unique tag for coalescing jobs.
192              
193             =back
194              
195             =cut
196              
197             sub run_method_background {    # validates its arguments, then always croaks: background dispatch is unsupported here
198                 my $self   = shift;
199                 my %params = @_;
200              
201                 my $worker_class  = delete $params{class}  || croak "need class";
202                 my $worker_method = delete $params{method} || croak "need method";
203                 my $payload       = delete $params{data}   || undef;
204                 my $uniq_tag      = delete $params{unique} || undef;
205              
206                 croak "unknown parameters to run_method_background: @{[%params]}" if %params;
207              
208                 my %options = defined $uniq_tag
209                     ? (uniq => $uniq_tag) : ();
210              
211                 my $gearman_function = Gearman::Spawner::Util::method2function($worker_class, $worker_method);
212              
213                 my $frozen = nfreeze([$payload]);
214              
215                 # XXX dispatch_background does not exist in Gearman::Client::Async
216                 croak "dispatch_background is not supported by Async client";
217                 $self->dispatch_background($gearman_function => \$frozen, \%options);    # unreachable: the croak above always throws
218              
219                 return;    # unreachable as well
220             }
221              
222             1;