File Coverage

blib/lib/RPC/ExtDirect/Client/Async.pm
Criterion Covered Total %
statement 152 158 96.2
branch 44 60 73.3
condition 9 12 75.0
subroutine 31 35 88.5
pod 7 9 77.7
total 243 274 88.6


line stmt bran cond sub pod time code
1             package RPC::ExtDirect::Client::Async;
2              
3 12     12   836853 use strict;
  12         28  
  12         1021  
4 12     12   65 use warnings;
  12         22  
  12         448  
5              
6 12     12   61 use Carp;
  12         24  
  12         850  
7 12     12   67 use File::Spec;
  12         16  
  12         282  
8 12     12   1622 use AnyEvent::HTTP;
  12         81186  
  12         787  
9              
10 12     12   5029 use RPC::ExtDirect::Util::Accessor;
  12         6822  
  12         360  
11 12     12   4999 use RPC::ExtDirect::Config;
  12         205220  
  12         407  
12 12     12   6335 use RPC::ExtDirect::API;
  12         44346  
  12         95  
13 12     12   5510 use RPC::ExtDirect;
  12         56646  
  12         83  
14 12     12   9133 use RPC::ExtDirect::Client;
  12         83724  
  12         488  
15              
16 12     12   118 use base 'RPC::ExtDirect::Client';
  12         20  
  12         7047  
17              
18             #
19             # This module is not compatible with RPC::ExtDirect < 3.0
20             #
21              
22             croak __PACKAGE__." requires RPC::ExtDirect 3.0+"
23             if $RPC::ExtDirect::VERSION lt '3.0';
24              
25             ### PACKAGE GLOBAL VARIABLE ###
26             #
27             # Module version
28             #
29              
30             our $VERSION = '1.20';
31              
32             ### PUBLIC INSTANCE METHOD ###
33             #
34             # Call specified Action's Method asynchronously
35             #
36              
37 48     48 1 427344 sub call_async { shift->async_request('call', @_) }
38              
39             ### PUBLIC INSTANCE METHOD ###
40             #
41             # Submit a form to specified Action's Method asynchronously
42             #
43              
44 8     8 1 5301 sub submit_async { shift->async_request('form', @_) }
45              
46             ### PUBLIC INSTANCE METHOD ###
47             #
48             # Upload a file using POST form. Same as submit_async()
49             #
50              
51             *upload_async = *submit_async;
52              
53             ### PUBLIC INSTANCE METHOD ###
54             #
55             # Poll server for events asynchronously
56             #
57              
58 8     8 1 1780 sub poll_async { shift->async_request('poll', @_) }
59              
60             #
61             # This is to prevent mistakes leading to hard-to-find bugs
62             #
63              
64 0     0 1 0 sub call { croak "Use call_async instead" }
65 0     0 1 0 sub submit { croak "Use submit_async instead" }
66 0     0 1 0 sub upload { croak "Use upload_async instead" }
67 0     0 1 0 sub poll { croak "Use poll_async instead" }
68              
69             ### PUBLIC INSTANCE METHOD ###
70             #
71             # Run a specified request type asynchronously
72             #
73              
74             sub async_request {
75 64     64 0 131 my $self = shift;
76 64         159 my $type = shift;
77            
78 64         283 my $tr_class = $self->transaction_class;
79            
80             # We try to avoid action-at-a-distance here, so we will
81             # call all the stuff that could die() up front, to pass
82             # the exception on to the caller immediately rather than
83             # blowing up later on.
84             # The only case when that may happen realistically is
85             # when the caller forgot to specify a callback coderef;
86             # anything else is passed as an $error to the callback
87             # (which is hard to do when it's missing).
88 64         92 eval {
89 64         452 my $transaction = $tr_class->new(@_);
90 62         196 $self->_async_request($type, $transaction);
91             };
92            
93 64 50       173 if ($@) { croak 'ARRAY' eq ref($@) ? $@->[0] : $@ };
  2 100       580  
94            
95             # Stay positive
96 62         166 return 1;
97             }
98              
99             ### PUBLIC INSTANCE METHOD ###
100             #
101             # Return the name of the Transaction class
102             #
103              
104 64     64 0 154 sub transaction_class { 'RPC::ExtDirect::Client::Async::Transaction' }
105              
106             ### PUBLIC INSTANCE METHOD ###
107             #
108             # Read-write accessor
109             #
110              
111             RPC::ExtDirect::Util::Accessor->mk_accessor(
112             simple => [qw/ api_ready exception request_queue /],
113             );
114              
115             ############## PRIVATE METHODS BELOW ##############
116              
117             ### PRIVATE INSTANCE METHOD ###
118             #
119             # Initialize API declaration
120             #
121              
122             sub _init_api {
123 12     12   621008 my ($self, $api) = @_;
124            
125             # If we're passed a local API instance, init immediately
126             # and don't bother with request queue - we won't need it anyway.
127 12 100       115 if ($api) {
128 1         23 my $cv = $self->cv;
129 1         26 my $api_cb = $self->api_cb;
130            
131 1 50       101 $cv->begin if $cv;
132            
133 1         12 $self->_assign_api($api);
134 1         309 $self->api_ready(1);
135            
136 1 50       14 $api_cb->($self, 1) if $api_cb;
137            
138 1 50       12 $cv->end if $cv;
139             }
140             else {
141            
142             # We want to be truly asynchronous, so instead of blocking
143             # on API retrieval, we create a request queue and return
144             # immediately. If any call/form/poll requests happen before
145             # we've got the API result back, we push them in the queue
146             # and wait for the API to arrive, then re-run the requests.
147             # After the API declaration has been retrieved, all subsequent
148             # requests run without queuing.
149 11         462 $self->request_queue([]);
150              
151             $self->_get_api(sub {
152 11     11   248 my ($success, $api_js, $error) = @_;
153            
154 11 100       82 if ( $success ) {
155 10         238 $self->_import_api($api_js);
156 10         56504 $self->api_ready(1);
157             }
158             else {
159 1         41 $self->exception($error);
160             }
161            
162 11 100       542 $self->api_cb->($self, $success, $error) if $self->api_cb;
163            
164 11         4563 my $queue = $self->request_queue;
165 11         99 delete $self->{request_queue}; # A bit quirky
166            
167 11         334 $_->($success, $error) for @$queue;
168 11         281 });
169             }
170            
171 12         125 return 1;
172             }
173              
174             ### PRIVATE INSTANCE METHOD ###
175             #
176             # Request API declaration from the specified server asynchronously
177             # and pass the outcome (success, content, error) to the callback
178             #
179              
180             sub _get_api {
181 11     11   30 my ($self, $cb) = @_;
182              
183             # Run additional checks before firing the curried callback
184             my $api_cb = sub {
185 11     11   319405 my ($content, $headers) = @_;
186              
187 11         42 my $status = $headers->{Status};
188 12     12   8688 my $content_length = do { use bytes; length $content; };
  12         121  
  12         57  
  11         24  
  11         30  
189 11   66     171 my $success = $status eq '200' && $content_length > 0;
190 11         24 my $error;
191            
192 11 100       58 if ( !$success ) {
193 1 50       5 if ( $status ne '200' ) {
    0          
194 1         5 $error = "Can't download API declaration: $status";
195             }
196             elsif ( !$content_length ) {
197 0         0 $error = "Empty API declaration received";
198             }
199             }
200            
201 11         533 my $cv = $self->cv;
202 11 50       205 $cv->end if $cv;
203            
204 11         173 $self->{api_guard} = undef;
205            
206 11         619 $cb->($success, $content, $error);
207 11         110 };
208            
209 11         438 my $cv = $self->cv;
210 11         335 my $uri = $self->_get_uri('api');
211 11         1735 my $params = $self->{http_params};
212            
213 11 50       1101 $cv->begin if $cv;
214            
215             #
216             # Note that we're passing a falsy value to the `persistent` option
217             # here; that's because without it, GET requests will generate some
218             # weird 596 error code responses for every request after the very
219             # first one, if a condvar is used.
220             #
221             # I can surmise that it has something to do with AnyEvent::HTTP
222             # having procedural interface without any clear way to separate
223             # requests. Probably something within the (very tangled) bowels
224             # of AnyEvent::HTTP::http_request is erroneously confusing condvars;
225             # in any case, turning off persistent connections seems to cure that.
226             #
227             # You can override that by passing `persistent => 1` to the Client
228             # constructor, but don't try to do that if you are not ready to
229             # spend HOURS untangling the callback hell inside http_request.
230             # I was not, hence the "fix".
231             #
232             # Also store the "cancellation guard" to prevent it being destroyed,
233             # which would end the request prematurely.
234             #
235 11         269 $self->{api_guard} = AnyEvent::HTTP::http_request(
236             GET => $uri,
237             persistent => !1,
238             %$params,
239             $api_cb,
240             );
241            
242 11         154187 return 1;
243             }
244              
245             ### PRIVATE INSTANCE METHOD ###
246             #
247             # Queue asynchronous request(s)
248             #
249              
250             sub _queue_request {
251 38     38   43 my $self = shift;
252            
253 38         57 my $queue = $self->{request_queue};
254            
255 38         78 push @$queue, @_;
256             }
257              
258             ### PRIVATE INSTANCE METHOD ###
259             #
260             # Make an HTTP request in asynchronous fashion
261             #
262              
263             sub _async_request {
264 62     62   95 my ($self, $type, $transaction) = @_;
265            
266             # Transaction should be primed *before* the request has been
267             # dispatched. This way we ensure that requests don't get stuck
268             # in the queue if something goes wrong (API retrieval fails, etc).
269             # Also if we're passed a cv this will prime it enough times so
270             # that any blocking later on won't end prematurely before *all*
271             # queued requests have had a chance to run.
272 62         158 $transaction->start;
273            
274             # The parameters to this sub ($api_success, $api_error) mean
275             # success of the API retrieval operation, and an error that caused
276             # the failure, if any. This should NOT be confused with success
277             # of the HTTP request below.
278             my $request_closure = sub {
279 62     62   138 my ($api_success, $api_error) = @_;
280            
281             # If request was queued and API retrieval failed,
282             # transaction still has to finish.
283 62 100       164 return $transaction->finish(undef, $api_success, $api_error)
284             unless $api_success;
285            
286 61         171 my $prepare = "_prepare_${type}_request";
287 61 100       173 my $method = $type eq 'poll' ? 'GET' : 'POST';
288              
289             # We can't allow an exception to be thrown - there is no
290             # enveloping code to handle it. So we catch it here instead,
291             # and pass it to the transaction to be treated as an error.
292             # Note that the transaction itself has already been started
293             # before the request closure was executed.
294             my ($uri, $request_content, $http_params, $request_options)
295 61         91 = eval { $self->$prepare($transaction) };
  61         473  
296            
297 61 100       12478 if ( my $xcpt = $@ ) {
298 29 50       95 my $err = 'ARRAY' eq ref($xcpt) ? $xcpt->[0] : $xcpt;
299            
300 29         137 return $transaction->finish(undef, !1, $err);
301             }
302            
303 32         57 my $request_headers = $request_options->{headers};
304              
305             # TODO Handle errors
306 32         134 my $guard = AnyEvent::HTTP::http_request(
307             $method, $uri,
308             headers => $request_headers,
309             body => $request_content,
310             persistent => !1,
311             %$http_params,
312             $self->_curry_response_cb($type, $transaction),
313             );
314            
315 32         33070 $transaction->guard($guard);
316            
317 32         1075 return 1;
318 62         651 };
319            
320             # If a fatal exception has occurred before this point in time
321             # (API retrieval failed) run the request closure immediately
322             # with an error. This will fall through and finish the
323             # transaction, passing the error to the callback subroutine.
324 62 50       1509 if ( my $fatal_exception = $self->exception ) {
    100          
325 0         0 $request_closure->(!1, $fatal_exception);
326             }
327            
328             # If API is ready, run the request closure immediately with the
329             # success flag set to true.
330             elsif ( $self->api_ready ) {
331 24         984 $request_closure->(1);
332             }
333            
334             # If API is not ready, queue the request closure to be run
335             # at a later time, when the result of API retrieval operation
336             # will be known.
337             else {
338 38         1036 $self->_queue_request($request_closure);
339             }
340            
341 62         511 return 1;
342             }
343              
344             ### PRIVATE INSTANCE METHOD ###
345             #
346             # Parse cookies if provided, creating Cookie header
347             #
348              
349             sub _parse_cookies {
350 34     34   33385 my ($self, $to, $from) = @_;
351            
352 34         212 $self->SUPER::_parse_cookies($to, $from);
353            
354             # This results in Cookie header being a hashref,
355             # but we need a string for AnyEvent::HTTP::http_request
356 34 100 66     642 if ( $to->{headers} && (my $cookies = $to->{headers}->{Cookie}) ) {
357 8         35 $to->{headers}->{Cookie} = join '; ', @$cookies;
358             }
359             }
360              
361             ### PRIVATE INSTANCE METHOD ###
362             #
363             # Generate result handling callback
364             #
365              
366             sub _curry_response_cb {
367 32     32   49 my ($self, $type, $transaction) = @_;
368            
369             return sub {
370 32     32   314171 my ($data, $headers) = @_;
371            
372 32         107 my $status = $headers->{Status};
373 32         83 my $success = $status eq '200';
374            
375             # No sense in trying to decode the response if request failed
376 32 50       133 return $transaction->finish(undef, !1, $headers->{Reason})
377             unless $success;
378            
379 32         82 local $@;
380 32         108 my $handler = "_handle_${type}_response";
381 32 50       117 my $response = eval {
382 32         521 $self->$handler({
383             status => $status,
384             success => $success,
385             content => $data,
386             })
387             } if $success;
388            
389 32 50       5021 my $error = 'ARRAY' eq ref($@) ? $@->[0] : $@;
390            
391 32 50       105 return $transaction->finish(undef, !1, $error) if $error;
392            
393             # We're only interested in the data, unless it was a poll.
394 32 100       153 my $result = 'poll' eq $type ? $response
    100          
395             : 'HASH' eq ref($response) ? $response->{result}
396             : $response
397             ;
398            
399 32         135 return $transaction->finish($result, $success);
400 32         342 };
401             }
402              
403             package
404             RPC::ExtDirect::Client::Async::Transaction;
405              
406 12     12   8687 use base 'RPC::ExtDirect::Client::Transaction';
  12         20  
  12         7447  
407              
408             my @fields = qw/ cb cv actual_arg fields /;
409              
410             sub new {
411 64     64   419 my ($class, %params) = @_;
412            
413 64         109 my $cb = $params{cb};
414            
415 64 100 66     407 die ["Callback subroutine is required"]
      100        
416             if 'CODE' ne ref $cb && !($cb && $cb->isa('AnyEvent::CondVar'));
417            
418 62         127 my %self_params = map { $_ => delete $params{$_} } @fields;
  248         635  
419            
420 62         589 my $self = $class->SUPER::new(%params);
421            
422 62         1927 @$self{ keys %self_params } = values %self_params;
423            
424 62         187 return $self;
425             }
426              
427             sub start {
428 62     62   78 my ($self) = @_;
429            
430 62         2041 my $cv = $self->cv;
431            
432 62 100       583 $cv->begin if $cv;
433             }
434              
435             sub finish {
436 62     62   138 my ($self, $result, $success, $error) = @_;
437            
438 62         2217 my $cb = $self->cb;
439 62         2029 my $cv = $self->cv;
440            
441 62 50       843 $cb->($result, $success, $error) if $cb;
442 62 100       88761 $cv->end if $cv;
443            
444 62         3696 return $success;
445             }
446              
447             RPC::ExtDirect::Util::Accessor->mk_accessors(
448             simple => [qw/ cb cv guard /],
449             );
450              
451             1;