File Coverage

blib/lib/RPC/ExtDirect/Client/Async.pm
Criterion Covered Total %
statement 152 158 96.2
branch 44 60 73.3
condition 9 12 75.0
subroutine 31 35 88.5
pod 7 9 77.7
total 243 274 88.6


line stmt bran cond sub pod time code
1             package RPC::ExtDirect::Client::Async;
2              
3 12     12   697001 use strict;
  12         22  
  12         365  
4 12     12   52 use warnings;
  12         17  
  12         361  
5              
6 12     12   48 use Carp;
  12         23  
  12         677  
7 12     12   70 use File::Spec;
  12         15  
  12         271  
8 12     12   1611 use AnyEvent::HTTP;
  12         57351  
  12         647  
9              
10 12     12   4188 use RPC::ExtDirect::Util::Accessor;
  12         4717  
  12         304  
11 12     12   3817 use RPC::ExtDirect::Config;
  12         167636  
  12         373  
12 12     12   5749 use RPC::ExtDirect::API;
  12         38906  
  12         83  
13 12     12   4552 use RPC::ExtDirect;
  12         49150  
  12         73  
14 12     12   7817 use RPC::ExtDirect::Client;
  12         72933  
  12         363  
15              
16 12     12   80 use base 'RPC::ExtDirect::Client';
  12         18  
  12         6842  
17              
18             #
19             # This module is not compatible with RPC::ExtDirect < 3.0
20             #
21              
22             croak __PACKAGE__." requires RPC::ExtDirect 3.0+"
23             if $RPC::ExtDirect::VERSION lt '3.0';
24              
25             ### PACKAGE GLOBAL VARIABLE ###
26             #
27             # Module version
28             #
29              
30             our $VERSION = '1.25';
31              
32             ### PUBLIC INSTANCE METHOD ###
33             #
34             # Call specified Action's Method asynchronously
35             #
36              
37 48     48 1 32394 sub call_async { shift->async_request('call', @_) }
38              
39             ### PUBLIC INSTANCE METHOD ###
40             #
41             # Submit a form to specified Action's Method asynchronously
42             #
43              
44 8     8 1 5915 sub submit_async { shift->async_request('form', @_) }
45              
46             ### PUBLIC INSTANCE METHOD ###
47             #
48             # Upload a file using POST form. Same as submit()
49             #
50              
51             *upload_async = *submit_async;
52              
53             ### PUBLIC INSTANCE METHOD ###
54             #
55             # Poll server for events asynchronously
56             #
57              
58 8     8 1 2168 sub poll_async { shift->async_request('poll', @_) }
59              
60             #
61             # This is to prevent mistakes leading to hard to find bugs
62             #
63              
64 0     0 1 0 sub call { croak "Use call_async instead" }
65 0     0 1 0 sub submit { croak "Use submit_async instead" }
66 0     0 1 0 sub upload { croak "Use upload_async instead" }
67 0     0 1 0 sub poll { croak "Use poll_async instead" }
68              
69             ### PUBLIC INSTANCE METHOD ###
70             #
71             # Run a specified request type asynchronously
72             #
73              
74             sub async_request {
75 64     64 0 92 my $self = shift;
76 64         88 my $type = shift;
77            
78 64         243 my $tr_class = $self->transaction_class;
79            
80             # We try to avoid action-at-a-distance here, so we will
81             # call all the stuff that could die() up front, to pass
82             # the exception on to the caller immediately rather than
83             # blowing up later on.
84             # The only case when that may happen realistically is
85             # when the caller forgot to specify a callback coderef;
86             # anything else is passed as an $error to the callback
87             # (which is hard to do when it's missing).
88 64         71 eval {
89 64         334 my $transaction = $tr_class->new(@_);
90 62         137 $self->_async_request($type, $transaction);
91             };
92            
93 64 50       132 if ($@) { croak 'ARRAY' eq ref($@) ? $@->[0] : $@ };
  2 100       394  
94            
95             # Stay positive
96 62         117 return 1;
97             }
98              
99             ### PUBLIC INSTANCE METHOD ###
100             #
101             # Return the name of the Transaction class
102             #
103              
104 64     64 0 119 sub transaction_class { 'RPC::ExtDirect::Client::Async::Transaction' }
105              
106             ### PUBLIC INSTANCE METHOD ###
107             #
108             # Read-write accessor
109             #
110              
111             RPC::ExtDirect::Util::Accessor->mk_accessor(
112             simple => [qw/ api_ready exception request_queue /],
113             );
114              
115             ############## PRIVATE METHODS BELOW ##############
116              
117             ### PRIVATE INSTANCE METHOD ###
118             #
119             # Initialize API declaration
120             #
121              
122             sub _init_api {
123 12     12   501133 my ($self, $api) = @_;
124            
125             # If we're passed a local API instance, init immediately
126             # and don't bother with request queue - we won't need it anyway.
127 12 100       72 if ($api) {
128 1         20 my $cv = $self->cv;
129 1         24 my $api_cb = $self->api_cb;
130            
131 1 50       52 $cv->begin if $cv;
132            
133 1         17 $self->_assign_api($api);
134 1         445 $self->api_ready(1);
135            
136 1 50       23 $api_cb->($self, 1) if $api_cb;
137            
138 1 50       17 $cv->end if $cv;
139             }
140             else {
141            
142             # We want to be truly asynchronous, so instead of blocking
143             # on API retrieval, we create a request queue and return
144             # immediately. If any call/form/poll requests happen before
145             # we've got the API result back, we push them in the queue
146             # and wait for the API to arrive, then re-run the requests.
147             # After the API declaration has been retrieved, all subsequent
148             # requests run without queuing.
149 11         328 $self->request_queue([]);
150              
151             $self->_get_api(sub {
152 11     11   168 my ($success, $api_js, $error) = @_;
153            
154 11 100       85 if ( $success ) {
155 10         196 $self->_import_api($api_js);
156 10         47205 $self->api_ready(1);
157             }
158             else {
159 1         24 $self->exception($error);
160             }
161            
162 11 100       374 $self->api_cb->($self, $success, $error) if $self->api_cb;
163            
164 11         2803 my $queue = $self->request_queue;
165 11         85 delete $self->{request_queue}; # A bit quirky
166            
167 11         276 $_->($success, $error) for @$queue;
168 11         260 });
169             }
170            
171 12         100 return 1;
172             }
173              
174             ### PRIVATE INSTANCE METHOD ###
175             #
176             # Receive API declaration from the specified server,
177             # parse it and return a Client::API object
178             #
179              
180             sub _get_api {
181 11     11   25 my ($self, $cb) = @_;
182              
183             # Run additional checks before firing the curried callback
184             my $api_cb = sub {
185 11     11   403773 my ($content, $headers) = @_;
186              
187 11         44 my $status = $headers->{Status};
188 12     12   7963 my $content_length = do { use bytes; length $content; };
  12         94  
  12         60  
  11         34  
  11         31  
189 11   66     130 my $success = $status eq '200' && $content_length > 0;
190 11         20 my $error;
191            
192 11 100       47 if ( !$success ) {
193 1 50       3 if ( $status ne '200' ) {
    0          
194 1         4 $error = "Can't download API declaration: $status";
195             }
196             elsif ( !$content_length ) {
197 0         0 $error = "Empty API declaration received";
198             }
199             }
200            
201 11         492 my $cv = $self->cv;
202 11 50       268 $cv->end if $cv;
203            
204 11         233 $self->{api_guard} = undef;
205            
206 11         59 $cb->($success, $content, $error);
207 11         87 };
208            
209 11         367 my $cv = $self->cv;
210 11         311 my $uri = $self->_get_uri('api');
211 11         6967 my $params = $self->{http_params};
212            
213 11 50       576 $cv->begin if $cv;
214            
215             #
216             # Note that we're passing a falsy value to the `persistent` option
217             # here; that's because without it, GET requests will generate some
218             # weird 596 error code responses for every request after the very
219             # first one, if a condvar is used.
220             #
221             # I can surmise that it has something to do with AnyEvent::HTTP
222             # having procedural interface without any clear way to separate
223             # requests. Probably something within the (very tangled) bowels
224             # of AnyEvent::HTTP::http_request is erroneously confusing condvars;
225             # in any case, turning off persistent connections seems to cure that.
226             #
227             # You can override that by passing `persistent => 1` to the Client
228             # constructor, but don't try to do that if you are not ready to
229             # spend HOURS untangling the callback hell inside http_request.
230             # I was not, hence the "fix".
231             #
232             # Also store the "cancellation guard" to prevent it being destroyed,
233             # which would end the request prematurely.
234             #
235 11         226 $self->{api_guard} = AnyEvent::HTTP::http_request(
236             GET => $uri,
237             persistent => !1,
238             %$params,
239             $api_cb,
240             );
241            
242 11         129704 return 1;
243             }
244              
245             ### PRIVATE INSTANCE METHOD ###
246             #
247             # Queue asynchronous request(s)
248             #
249              
250             sub _queue_request {
251 38     38   48 my $self = shift;
252            
253 38         46 my $queue = $self->{request_queue};
254            
255 38         61 push @$queue, @_;
256             }
257              
258             ### PRIVATE INSTANCE METHOD ###
259             #
260             # Make an HTTP request in asynchronous fashion
261             #
262              
263             sub _async_request {
264 62     62   78 my ($self, $type, $transaction) = @_;
265            
266             # Transaction should be primed *before* the request has been
267             # dispatched. This way we ensure that requests don't get stuck
268             # in the queue if something goes wrong (API retrieval fails, etc).
269             # Also if we're passed a cv this will prime it enough times so
270             # that any blocking later on won't end prematurely before *all*
271             # queued requests have had a chance to run.
272 62         127 $transaction->start;
273            
274             # The parameters to this sub ($api_success, $api_error) mean
275             # success of the API retrieval operation, and an error that caused
276             # the failure, if any. This should NOT be confused with success
277             # of the HTTP request below.
278             my $request_closure = sub {
279 62     62   99 my ($api_success, $api_error) = @_;
280            
281             # If request was queued and API retrieval failed,
282             # transaction still has to finish.
283 62 100       143 return $transaction->finish(undef, $api_success, $api_error)
284             unless $api_success;
285            
286 61         405 my $prepare = "_prepare_${type}_request";
287 61 100       169 my $method = $type eq 'poll' ? 'GET' : 'POST';
288              
289             # We can't allow an exception to be thrown - there is no
290             # enveloping code to handle it. So we catch it here instead,
291             # and pass it to the transaction to be treated as an error.
292             # Note that the transaction itself has already been started
293             # before the request closure was executed.
294             my ($uri, $request_content, $http_params, $request_options)
295 61         73 = eval { $self->$prepare($transaction) };
  61         395  
296            
297 61 100       9057 if ( my $xcpt = $@ ) {
298 29 50       71 my $err = 'ARRAY' eq ref($xcpt) ? $xcpt->[0] : $xcpt;
299            
300 29         55 return $transaction->finish(undef, !1, $err);
301             }
302            
303 32         51 my $request_headers = $request_options->{headers};
304              
305             # TODO Handle errors
306 32         126 my $guard = AnyEvent::HTTP::http_request(
307             $method, $uri,
308             headers => $request_headers,
309             body => $request_content,
310             persistent => !1,
311             %$http_params,
312             $self->_curry_response_cb($type, $transaction),
313             );
314            
315 32         33059 $transaction->guard($guard);
316            
317 32         938 return 1;
318 62         385 };
319            
320             # If a fatal exception has occurred before this point in time
321             # (API retrieval failed) run the request closure immediately
322             # with an error. This will fall through and finish the
323             # transaction, passing the error to the callback subroutine.
324 62 50       1177 if ( my $fatal_exception = $self->exception ) {
    100          
325 0         0 $request_closure->(!1, $fatal_exception);
326             }
327            
328             # If API is ready, run the request closure immediately with the
329             # success flag set to true.
330             elsif ( $self->api_ready ) {
331 24         636 $request_closure->(1);
332             }
333            
334             # If API is not ready, queue the request closure to be run
335             # at a later time, when the result of API retrieval operation
336             # will be known.
337             else {
338 38         961 $self->_queue_request($request_closure);
339             }
340            
341 62         303 return 1;
342             }
343              
344             ### PRIVATE INSTANCE METHOD ###
345             #
346             # Parse cookies if provided, creating Cookie header
347             #
348              
349             sub _parse_cookies {
350 34     34   26798 my ($self, $to, $from) = @_;
351            
352 34         167 $self->SUPER::_parse_cookies($to, $from);
353            
354             # This results in Cookie header being a hashref,
355             # but we need a string for AnyEvent::HTTP::http_request
356 34 100 66     532 if ( $to->{headers} && (my $cookies = $to->{headers}->{Cookie}) ) {
357 8         31 $to->{headers}->{Cookie} = join '; ', @$cookies;
358             }
359             }
360              
361             ### PRIVATE INSTANCE METHOD ###
362             #
363             # Generate result handling callback
364             #
365              
366             sub _curry_response_cb {
367 32     32   43 my ($self, $type, $transaction) = @_;
368            
369             return sub {
370 32     32   283566 my ($data, $headers) = @_;
371            
372 32         97 my $status = $headers->{Status};
373 32         75 my $success = $status eq '200';
374            
375             # No sense in trying to decode the response if request failed
376             return $transaction->finish(undef, !1, $headers->{Reason})
377 32 50       137 unless $success;
378            
379 32         62 local $@;
380 32         115 my $handler = "_handle_${type}_response";
381 32 50       108 my $response = eval {
382 32         551 $self->$handler({
383             status => $status,
384             success => $success,
385             content => $data,
386             })
387             } if $success;
388            
389 32 50       5781 my $error = 'ARRAY' eq ref($@) ? $@->[0] : $@;
390            
391 32 50       123 return $transaction->finish(undef, !1, $error) if $error;
392            
393             # We're only interested in the data, unless it was a poll.
394             my $result = 'poll' eq $type ? $response
395             : 'HASH' eq ref($response) ? $response->{result}
396 32 100       168 : $response
    100          
397             ;
398            
399 32         158 return $transaction->finish($result, $success);
400 32         318 };
401             }
402              
403             package
404             RPC::ExtDirect::Client::Async::Transaction;
405              
406 12     12   8097 use base 'RPC::ExtDirect::Client::Transaction';
  12         16  
  12         6488  
407              
408             my @fields = qw/ cb cv actual_arg fields /;
409              
410             sub new {
411 64     64   323 my ($class, %params) = @_;
412            
413 64         89 my $cb = $params{cb};
414            
415 64 100 66     342 die ["Callback subroutine is required"]
      100        
416             if 'CODE' ne ref $cb && !($cb && $cb->isa('AnyEvent::CondVar'));
417            
418 62         117 my %self_params = map { $_ => delete $params{$_} } @fields;
  248         581  
419            
420 62         465 my $self = $class->SUPER::new(%params);
421            
422 62         1357 @$self{ keys %self_params } = values %self_params;
423            
424 62         136 return $self;
425             }
426              
427             sub start {
428 62     62   78 my ($self) = @_;
429            
430 62         1511 my $cv = $self->cv;
431            
432 62 100       448 $cv->begin if $cv;
433             }
434              
435             sub finish {
436 62     62   115 my ($self, $result, $success, $error) = @_;
437            
438 62         1790 my $cb = $self->cb;
439 62         1614 my $cv = $self->cv;
440            
441 62 50       577 $cb->($result, $success, $error) if $cb;
442 62 100       84279 $cv->end if $cv;
443            
444 62         3210 return $success;
445             }
446              
447             RPC::ExtDirect::Util::Accessor->mk_accessors(
448             simple => [qw/ cb cv guard /],
449             );
450              
451             1;