File Coverage

blib/lib/Riak/Client.pm
Criterion Covered Total %
statement 50 319 15.6
branch 1 130 0.7
condition 0 39 0.0
subroutine 17 54 31.4
pod 16 17 94.1
total 84 559 15.0


line stmt bran cond sub pod time code
1             #
2             # This file is part of Riak-Client
3             #
4             # This software is copyright (c) 2014 by Damien Krotkine.
5             #
6             # This is free software; you can redistribute it and/or modify it under
7             # the same terms as the Perl 5 programming language system itself.
8             #
9             ## no critic (RequireUseStrict, RequireUseWarnings)
10             package Riak::Client;
11             {
12             $Riak::Client::VERSION = '1.94';
13             }
14             ## use critic
15              
16 1     1   116751 use 5.010;
  1         6  
  1         98  
17 1     1   2179 use Riak::Client::PBC;
  1         3  
  1         85  
18 1     1   1253 use Type::Params qw(compile);
  1         199278  
  1         9  
19 1     1   238 use Types::Standard -types;
  1         3  
  1         6  
20 1     1   5580 use Errno qw(EINTR);
  1         1447  
  1         131  
21 1     1   7 use Scalar::Util qw(blessed);
  1         2  
  1         44  
22 1     1   1251 use JSON::XS;
  1         20467  
  1         90  
23 1     1   12 use Carp;
  1         3  
  1         83  
24             $Carp::Internal{ (__PACKAGE__) }++;
25 1     1   1061 use Module::Runtime qw(use_module);
  1         4615  
  1         9  
26             require bytes;
27 1     1   1606 use Moo;
  1         19354  
  1         7  
28              
29 1     1   5817 use AnyEvent::Handle;
  1         79219  
  1         57  
30              
31 1     1   11787 use IO::Socket::INET;
  1         104311  
  1         12  
32 1     1   2021 use IO::Socket::Timeout;
  1         14880  
  1         15  
33              
34 1     1   49 use Scalar::Util qw(weaken);
  1         2  
  1         107  
35              
36             use constant {
37             # error
38 1         1588 ERROR_RESPONSE_CODE => 0,
39             # ping
40             PING_REQUEST_CODE => 1,
41             PING_RESPONSE_CODE => 2,
42             # get, get_raw
43             GET_REQUEST_CODE => 9,
44             GET_RESPONSE_CODE => 10,
45             # put, put_raw
46             PUT_REQUEST_CODE => 11,
47             PUT_RESPONSE_CODE => 12,
48             # del
49             DEL_REQUEST_CODE => 13,
50             DEL_RESPONSE_CODE => 14,
51             # get_buckets
52             GET_BUCKETS_REQUEST_CODE => 15,
53             GET_BUCKETS_RESPONSE_CODE => 16,
54             # get_keys
55             GET_KEYS_REQUEST_CODE => 17,
56             GET_KEYS_RESPONSE_CODE => 18,
57             # get_bucket_props
58             GET_BUCKET_PROPS_REQUEST_CODE => 19,
59             GET_BUCKET_PROPS_RESPONSE_CODE => 20,
60             # set_bucket_props
61             SET_BUCKET_PROPS_REQUEST_CODE => 21,
62             SET_BUCKET_PROPS_RESPONSE_CODE => 22,
63             # map_reducd
64             MAP_REDUCE_REQUEST_CODE => 23,
65             MAP_REDUCE_RESPONSE_CODE => 24,
66             # query_index
67             QUERY_INDEX_REQUEST_CODE => 25,
68             QUERY_INDEX_RESPONSE_CODE => 26,
69 1     1   6 };
  1         1  
70              
71              
72             # ABSTRACT: Fast and lightweight Perl client for Riak
73              
74              
75             has host => ( is => 'ro', isa => Str, required => 1 );
76             has port => ( is => 'ro', isa => Int, required => 1 );
77             has r => ( is => 'ro', isa => Int, default => sub {2} );
78             has w => ( is => 'ro', isa => Int, default => sub {2} );
79             has dw => ( is => 'ro', isa => Int, default => sub {1} );
80             has connection_timeout => ( is => 'ro', isa => Num, default => sub {5} );
81             has read_timeout => ( is => 'ro', predicate => 1, isa => Num, default => sub {5} );
82             has write_timeout => ( is => 'ro', predicate => 1, isa => Num, default => sub {5} );
83             has no_delay => ( is => 'ro', isa => Bool, default => sub {0} );
84              
85              
86             has no_auto_connect => ( is => 'ro', isa => Bool, default => sub {0} );
87              
88              
89             has anyevent_mode => ( is => 'ro', reader => 'ae', isa => Bool, default => sub {0} );
90              
91             has _cv_connected => ( is => 'ro', lazy => 1, default => sub { AE::cv });
92             has _requests_lock => ( is => 'rw', default => sub { undef });
93              
94             has _handle => ( is => 'ro', lazy => 1, builder => 1 );
95             sub _build__handle {
96 0     0   0 my ($self) = @_;
97 0         0 my ($host, $port) = ($self->host, $self->port);
98              
99 0         0 weaken $self;
100              
101             # TODO = timeouts
102             AnyEvent::Handle->new (
103             connect => [$host, $port],
104             no_delay => $self->no_delay(),
105             on_error => sub {
106 0     0   0 $_[0]->destroy; # explicitly destroy handle
107              
108 0   0     0 _die_generic_error("on host $host:$port: $_[2]", $self->_current_request_ae_args->[0] // {});
109             },
110             # rtimeout => $self->read_timeout,
111             # wtimeout => $self->write_timeout,
112             # on_prepare => sub { $self->connection_timeout },
113 0     0   0 on_connect => sub { $self->_cv_connected->send },
114             # on_timeout => sub { print STDERR " ---- PLOP \n";},
115 0         0 );
116              
117             }
118              
119              
120             # Why are we doing that ? It's because we want to avoid creating these closure
121             # everytime we send or recerive data from the socket. So we build them here
122             # once and for all. However the tricky part is that these callbacks need to
123             # access $self and $args. So we make sure they can.
124             has _current_request_ae_args => ( is => 'rw', default => sub { [] } );
125             has _handle_reader_callback => ( is => 'ro', lazy => 1, builder => 1 );
126             sub _build__handle_reader_callback {
127 0     0   0 my ($self) = @_;
128 0         0 weaken $self;
129              
130 0         0 my $handle_reader_callback_weak;
131              
132             my $inner_handle_reader_callback = sub {
133 0     0   0 my ( $response_code, $response_body ) = unpack( 'c a*', $_[1] );
134            
135 0 0       0 my $args = $self->_current_request_ae_args->[0]
136             or _die_generic_error( "Unexpected Response (got: $response_code, expected: nothing)", {} );
137              
138              
139             # in case of error msg
140 0 0       0 if ($response_code == ERROR_RESPONSE_CODE) {
141 0         0 my $decoded_message = RpbErrorResp->decode($response_body);
142 0         0 my $errmsg = $decoded_message->errmsg;
143 0         0 my $errcode = $decoded_message->errcode;
144            
145 0         0 _die_generic_error( "Riak Error (code: $errcode) '$errmsg'", $args );
146             }
147            
148             # check if we have what we want
149 0 0       0 $response_code != $args->{expected_code}
150             and _die_generic_error(
151             "Unexpected Response Code in (got: $response_code, expected: $args->{expected_code})",
152             $args );
153            
154             # default value if we don't need to handle the response.
155 0         0 my ($ret, $more_to_come) = ( \1, undef);
156             # remember, $handle_response may or may not use $args->{cb}
157 0 0       0 if (my $handle_response = $args->{handle_response}) {
158 0         0 ($ret, $more_to_come) = $handle_response->( $self, $response_body, $args );
159             }
160             # if we expect more to come, re-prepend the handler
161 0 0       0 $more_to_come and $_[0]->unshift_read( chunk => 4, $handle_reader_callback_weak),
162             return;
163            
164             # ok, single or multiple response are over, remove the current request
165             # args, and remove the lock. This is done before last callback
166             # execution, so that user can re-enqueue a request right away.
167 0         0 shift @{$self->_current_request_ae_args};
  0         0  
168 0         0 my $lock = $self->_requests_lock;
169 0 0       0 $lock and $lock->send();
170            
171             # if no user callback provided, use the $cv and return.
172 0 0       0 !$args->{cb}
173             and $args->{cv}->send($ret),
174             return;
175            
176             # If $ret is undef, means everything has been processed and
177             # callback called in $handle_response, nothing left to do.
178             # Otherwise, we have a result, call the callback on it
179 0 0       0 $ret and $args->{cb}->($$ret);
180            
181 0         0 };
182              
183             my $handle_reader_callback = sub {
184             # length arrived, decode
185 0     0   0 my $len = unpack "N", $_[1];
186             # now read the payload
187 0         0 $_[0]->unshift_read( chunk => $len, $inner_handle_reader_callback);
188 0         0 };
189              
190 0         0 $handle_reader_callback_weak = $handle_reader_callback;
191 0         0 weaken $handle_reader_callback_weak;
192 0         0 $handle_reader_callback;
193             }
194              
195             has _socket => ( is => 'ro', lazy => 1, builder => 1 );
196             sub _build__socket {
197 0     0   0 my ($self) = @_;
198              
199 0         0 my $host = $self->host;
200 0         0 my $port = $self->port;
201              
202 0         0 my $socket = IO::Socket::INET->new(
203             PeerHost => $host,
204             PeerPort => $port,
205             Timeout => $self->connection_timeout,
206             );
207              
208 0 0       0 croak "Error ($!), can't connect to $host:$port"
209             unless defined $socket;
210              
211 0 0 0     0 $self->has_read_timeout || $self->has_write_timeout
212             or return $socket;
213              
214             # enable read and write timeouts on the socket
215 0         0 IO::Socket::Timeout->enable_timeouts_on($socket);
216             # setup the timeouts
217 0 0       0 $self->has_read_timeout
218             and $socket->read_timeout($self->read_timeout);
219 0 0       0 $self->has_write_timeout
220             and $socket->write_timeout($self->write_timeout);
221              
222 1     1   7 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  1         2  
  1         11138  
223 0 0       0 $self->no_delay
224             and $socket->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
225              
226 0         0 return $socket;
227             }
228              
229             sub BUILD {
230 3     3 0 74 my ($self) = @_;
231 3 50       79 $self->no_auto_connect
232             and return;
233              
234 0           $self->connect();
235             }
236              
237              
238             sub connect {
239 0     0 1   state $check = compile(Any, Optional[CodeRef]);
240 0           my ( $self, $cb ) = $check->(@_);
241              
242 0 0         if ( ! $self->ae ) {
243 0           $self->_socket();
244 0 0         if ($cb) {
245 0           $cb->();
246             } else {
247 0           return 1;
248             }
249             } else {
250              
251 0           $self->_handle();
252 0 0         if (my $cb = ref $_[-1] eq 'CODE' ? $_[-1] : undef) {
    0          
253 0           $self->_cv_connected->cb($cb);
254 0           return;
255             }
256              
257 0           $self->_cv_connected->recv;
258 0           return 1;
259             }
260              
261             }
262              
263             has _getkeys_accumulator => (is => 'rw', init_arg => undef);
264             has _mapreduce_accumulator => (is => 'rw', init_arg => undef);
265              
266              
267             sub ping {
268 0     0 1   state $check = compile(Any, Optional[CodeRef]);
269 0           my ( $self, $cb ) = $check->(@_);
270 0           $_[0]->_parse_response( {
271             request_code => PING_REQUEST_CODE,
272             expected_code => PING_RESPONSE_CODE,
273             operation_name => 'ping',
274             body_ref => \'',
275             cb => $cb,
276             } );
277             }
278              
279              
280             sub is_alive {
281 0     0 1   state $check = compile(Any, Optional[CodeRef]);
282 0           my ( $self, $cb ) = $check->(@_);
283 0           my $res = eval { $self->ping; 1 };
  0            
  0            
284 0 0         $cb and return $cb->($res);
285 0           return $res;
286             }
287              
288              
289             sub get {
290 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
291 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
292 0           $self->_fetch( $bucket, $key, 1, 0, $cb );
293             }
294              
295              
296             sub get_raw {
297 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
298 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
299 0           $self->_fetch( $bucket, $key, 0, 0, $cb );
300             }
301              
302              
303             #my $LinksStructure = declare as ArrayRef[Dict[bucket => Str, key => Str, tag => Str]];
304             #coerce $LinksStructure, from HashRef[] Num, q{ int($_) };
305              
306             sub put {
307 0 0   0 1   my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
308 0           state $check = compile(Any, Str, Str, Any, Optional[Str],
309             Optional[HashRef[Str]], # indexes
310             Optional[ArrayRef[Dict[bucket => Str, key => Str, tag => Str]]], # links
311             );
312 0           my ( $self, $bucket, $key, $value, $content_type, $indexes, $links ) = $check->(@_);
313              
314 0 0 0       ($content_type //= 'application/json')
315             eq 'application/json'
316             and $value = encode_json($value);
317              
318 0           $self->_store( $bucket, $key, $value, $content_type, $indexes, $links, $cb);
319             }
320              
321              
322              
323             sub put_raw {
324 0 0   0 1   my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
325 0           state $check = compile(Any, Str, Str, Any, Optional[Str],
326             Optional[HashRef[Str]], # indexes
327             Optional[ArrayRef[Dict[bucket => Str, key => Str, tag => Str]]], # links
328             );
329 0           my ( $self, $bucket, $key, $value, $content_type, $indexes, $links ) = $check->(@_);
330 0   0       $content_type ||= 'plain/text';
331 0           $self->_store( $bucket, $key, $value, $content_type, $indexes, $links, $cb);
332             }
333              
334              
335             sub del {
336 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
337 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
338              
339 0           my $body = RpbDelReq->encode(
340             { key => $key,
341             bucket => $bucket,
342             rw => $self->dw
343             }
344             );
345              
346 0           $self->_parse_response( {
347             request_code => DEL_REQUEST_CODE,
348             expected_code => DEL_RESPONSE_CODE,
349             operation_name => 'del',
350             key => $key,
351             bucket => $bucket,
352             body_ref => \$body,
353             cb => $cb,
354             } );
355             }
356              
357              
358             sub get_keys {
359 0     0 1   state $check = compile(Any, Str, Optional[CodeRef]);
360 0           my ( $self, $bucket, $cb ) = $check->(@_);
361              
362             # reset accumulator
363 0           $self->_getkeys_accumulator([]);
364 0           my $body = RpbListKeysReq->encode( { bucket => $bucket } );
365 0           $self->_parse_response( {
366             request_code => GET_KEYS_REQUEST_CODE,
367             expected_code => GET_KEYS_RESPONSE_CODE,
368             operation_name => 'get_keys',
369             key => "*",
370             bucket => $bucket,
371             body_ref => \$body,
372             cb => $cb,
373             handle_response => \&_handle_get_keys_response,
374             lock_requests => 1,
375             } );
376             }
377              
378             sub _handle_get_keys_response {
379 0     0     my ( $self, $encoded_message, $args ) = @_;
380              
381             # TODO: support for 1.4 (which provides 'stream', 'return_terms', and 'stream')
382 0           my $obj = RpbListKeysResp->decode( $encoded_message );
383 0   0       my @keys = @{$obj->keys // []};
  0            
384              
385             # case 1 : no user callback
386 0           my $cb = $args->{cb};
387 0 0         if (! $cb ) {
388             # accumulate results
389 0           push @{$self->_getkeys_accumulator}, @keys;
  0            
390              
391             # if more to come, return by saying so
392 0 0         $obj->done
393             or return (undef, 1);
394              
395             # all results are there, return the whole
396 0           my $keys = $self->_getkeys_accumulator;
397 0           $self->_getkeys_accumulator([]);
398 0           return \$keys;
399             }
400              
401             # case 2 : we have a user callback
402 0           my $last_key;
403 0 0         my $obj_done = $obj->done
404             and $last_key = pop @keys;
405              
406             # no second arg = more to come
407 0           $cb->($_) foreach @keys;
408              
409             # if more to come, return by saying so
410 0 0         $obj->done
411             or return (undef, 1);
412              
413             # process last keys if any
414 0 0         defined $last_key and $cb->($last_key, 1);
415              
416             # means: nothing left to do, all results processed through callback
417 0           return;
418             }
419              
420              
421             sub exists {
422 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
423 0           my ( $self, $bucket, $key, $cb ) = $check->(@_);
424 0           $self->_fetch( $bucket, $key, 0, 1, $cb );
425             }
426              
427             sub _fetch {
428 0     0     my ( $self, $bucket, $key, $decode, $test_exist, $cb ) = @_;
429              
430 0           my $body = RpbGetReq->encode(
431             { r => $self->r,
432             key => $key,
433             bucket => $bucket,
434             head => $test_exist
435             }
436             );
437              
438 0           $self->_parse_response( {
439             request_code => GET_REQUEST_CODE,
440             expected_code => GET_RESPONSE_CODE,
441             operation_name => 'get',
442             key => $key,
443             bucket => $bucket,
444             body_ref => \$body,
445             decode => $decode,
446             handle_response => \&_handle_get_response,
447             test_exist => $test_exist,
448             cb => $cb,
449             } );
450             }
451              
452             sub _handle_get_response {
453 0     0     my ( $self, $encoded_message, $args ) = @_;
454              
455 0 0         defined $encoded_message
456             or _die_generic_error( "Undefined Message", 'get', $args );
457              
458 0           my $decoded_message = RpbGetResp->decode($encoded_message);
459 0           my $content = $decoded_message->content;
460              
461             # empty content
462 0 0         ref $content eq 'ARRAY'
463             or return \undef;
464              
465             # if we just need to test existence
466 0 0         $args->{test_exist}
467             and return \1;
468              
469             # TODO: handle metadata
470 0           my $value = $content->[0]->value;
471 0           my $content_type = $content->[0]->content_type;
472              
473             # if we need to decode
474 0 0 0       $args->{decode} && ($content_type // '') eq 'application/json'
      0        
475             and return \decode_json($value);
476              
477             # simply return the value
478 0           return \$value;
479             }
480              
481             sub _store {
482 0     0     my ( $self, $bucket, $key, $encoded_value, $content_type, $indexes, $links, $cb ) = @_;
483              
484 0           my $body = RpbPutReq->encode(
485             { key => $key,
486             bucket => $bucket,
487             content => {
488             value => $encoded_value,
489             content_type => $content_type,
490             ( $indexes ?
491             ( indexes => [
492             map {
493 0 0         { key => $_ , value => $indexes->{$_} }
    0          
494             } keys %$indexes
495             ])
496             : ()
497             ),
498             ( $links ? ( links => $links) : () ),
499             },
500             }
501             );
502              
503 0           $self->_parse_response( {
504             request_code => PUT_REQUEST_CODE,
505             expected_code => PUT_RESPONSE_CODE,
506             operation_name => 'put',
507             key => $key,
508             bucket => $bucket,
509             body_ref => \$body,
510             cb => $cb,
511             } );
512             }
513              
514              
515             sub query_index {
516 0     0 1   state $check = compile(Any, Str, Str, Str|ArrayRef, Optional[CodeRef]);
517 0           my ( $self, $bucket, $index, $value_to_match, $cb ) = $check->(@_);
518              
519 0           my $query_type_is_eq = 0; # eq
520 0 0         ref $value_to_match
521             and $query_type_is_eq = 1; # range
522 0 0         my $body = RpbIndexReq->encode(
523             { index => $index,
524             bucket => $bucket,
525             qtype => $query_type_is_eq,
526             $query_type_is_eq ?
527             ( range_min => $value_to_match->[0],
528             range_max => $value_to_match->[1] )
529             : (key => $value_to_match ),
530             }
531             );
532            
533 0 0         $self->_parse_response( {
534             request_code => QUERY_INDEX_REQUEST_CODE,
535             expected_code => QUERY_INDEX_RESPONSE_CODE,
536             operation_name => 'query_index',
537             $query_type_is_eq ?
538             (key => '2i query on ' . join('...', @$value_to_match) )
539             : (key => $value_to_match ),
540             bucket => $bucket,
541             body_ref => \$body,
542             handle_response => \&_handle_query_index_response,
543             cb => $cb,
544             lock_requests => 1,
545             } );
546             }
547              
548             sub _handle_query_index_response {
549 0     0     my ( $self, $encoded_message, $args ) = @_;
550            
551 0           my $obj = RpbIndexResp->decode( $encoded_message );
552 0   0       my @keys = @{$obj->keys // []};
  0            
553              
554             # case 1 : no user callback
555 0 0         my $cb = $args->{cb}
556             or return \\@keys;
557              
558             # case 2 : we have a user callback
559 0           $cb->($_) foreach @keys;
560              
561             # means: nothing left to do, all results processed through callback
562 0           return;
563              
564             }
565              
566              
567             sub get_buckets {
568 0     0 1   state $check = compile(Any, Optional[CodeRef]);
569 0           my ( $self, $cb ) = $check->(@_);
570              
571 0           $self->_parse_response( {
572             request_code => GET_BUCKETS_REQUEST_CODE,
573             expected_code => GET_BUCKETS_RESPONSE_CODE,
574             operation_name => 'get_buckets',
575             handle_response => \&_handle_get_buckets_response,
576             cb => $cb,
577             } );
578             }
579              
580             sub _handle_get_buckets_response {
581 0     0     my ( $self, $encoded_message, $args ) = @_;
582 0           my $obj = RpbListBucketsResp->decode( $encoded_message );
583 0   0       return \($obj->buckets // []);
584             }
585              
586              
587             sub get_bucket_props {
588 0     0 1   state $check = compile(Any, Str, Optional[CodeRef]);
589 0           my ( $self, $bucket, $cb ) = $check->(@_);
590              
591 0           my $body = RpbGetBucketReq->encode( { bucket => $bucket } );
592 0           $self->_parse_response( {
593             request_code => GET_BUCKET_PROPS_REQUEST_CODE,
594             expected_code => GET_BUCKET_PROPS_RESPONSE_CODE,
595             bucket => $bucket,
596             body_ref => \$body,
597             handle_response => \&_handle_get_bucket_props_response,
598             cb => $cb,
599             } );
600             }
601              
602             sub _handle_get_bucket_props_response {
603 0     0     my ( $self, $encoded_message, $args ) = @_;
604              
605 0           my $obj = RpbListBucketsResp->decode( $encoded_message );
606 0           my $props = RpbBucketProps->decode($obj->buckets->[0]);
607 0           return \{ %$props }; # unblessing variable
608             }
609              
610              
611             sub set_bucket_props {
612 0     0 1   state $check = compile( Any, Str,
613             Dict[ n_val => Optional[Int],
614             allow_mult => Optional[Bool] ],
615             Optional[CodeRef] );
616 0           my ( $self, $bucket, $props, $cb ) = $check->(@_);
617 0 0 0       $props->{n_val} && $props->{n_val} < 0 and croak 'n_val should be possitive integer';
618              
619 0           my $body = RpbSetBucketReq->encode({ bucket => $bucket, props => $props });
620 0           $self->_parse_response( {
621             request_code => SET_BUCKET_PROPS_REQUEST_CODE,
622             expected_code => SET_BUCKET_PROPS_RESPONSE_CODE,
623             bucket => $bucket,
624             body_ref => \$body,
625             } );
626             }
627              
628              
629             sub map_reduce {
630 0     0 1   state $check = compile(Any, Any, Optional[CodeRef]);
631 0           my ( $self, $request, $cb) = $check->(@_);
632              
633 0           my @args;
634            
635 0 0         push @args, ref($request) ? encode_json($request): $request;
636 0           push @args, 'application/json';
637 0 0         push @args, $cb if $cb;
638            
639 0           map_reduce_raw($self, @args);
640              
641             }
642              
643              
644             sub map_reduce_raw {
645 0     0 1   state $check = compile(Any, Str, Str, Optional[CodeRef]);
646 0           my ( $self, $request, $content_type, $cb) = $check->(@_);
647            
648 0           my $body = RpbMapRedReq->encode(
649             {
650             request => $request,
651             content_type => $content_type,
652             }
653             );
654              
655             # reset accumulator
656 0           $self->_mapreduce_accumulator([]);
657              
658 0           $self->_parse_response( {
659             request_code => MAP_REDUCE_REQUEST_CODE,
660             expected_code => MAP_REDUCE_RESPONSE_CODE,
661             operation => 'map_reduce',
662             body_ref => \$body,
663             cb => $cb,
664             decode => ($content_type eq 'application/json'),
665             handle_response => \&_handle_map_reduce_response,
666             lock_requests => 1,
667             } );
668             }
669              
670             sub _handle_map_reduce_response {
671 0     0     my ( $self, $encoded_message, $args ) = @_;
672 0           my $obj = RpbMapRedResp->decode( $encoded_message );
673              
674             # case 1 : no user callback
675 0           my $cb = $args->{cb};
676 0 0         if (! $cb ) {
677              
678             # all results were there, reset the accumulator and return the whole,
679 0 0         if ($obj->done) {
680 0           my $results = $self->_mapreduce_accumulator();
681 0           $self->_mapreduce_accumulator([]);
682 0           return \$results;
683             }
684              
685             # accumulate results
686 0 0 0       push @{$self->_mapreduce_accumulator},
  0            
687             { phase => $obj->phase, response => ($args->{decode}) ? decode_json($obj->response // '[]') : $obj->response };
688              
689             # more stuff to come, say so
690 0           return (undef, 1);
691              
692             }
693              
694             # case 2 : we have a user callback
695              
696             # means: nothing left to do, all results processed through callback
697             $obj->done
698 0 0         and return;
699              
700 0           $cb->($obj->response, $obj->phase, $obj->done);
701              
702             # more stuff to come, say so
703 0           return (undef, 1);
704              
705             }
706              
707             sub _parse_response {
708 0     0     my ( $self, $args ) = @_;
709              
710 0 0         $self->ae
711             and goto &_parse_response_ae;
712              
713 0           my $socket = $self->_socket;
714 0   0       _send_bytes($socket, $args->{request_code}, $args->{body_ref} // \'');
715              
716 0           while (1) {
717 0           my $response;
718             # get and check response
719 0 0 0       my $raw_response_ref = _read_response($socket)
720             or _die_generic_error( $! || "Socket Closed", $args);
721              
722 0           my ( $response_code, $response_body ) = unpack( 'c a*', $$raw_response_ref );
723              
724             # in case of error msg
725 0 0         if ($response_code == ERROR_RESPONSE_CODE) {
726 0           my $decoded_message = RpbErrorResp->decode($response_body);
727 0           my $errmsg = $decoded_message->errmsg;
728 0           my $errcode = $decoded_message->errcode;
729              
730 0           _die_generic_error( "Riak Error (code: $errcode) '$errmsg'", $args);
731             }
732              
733              
734             # check if we have what we want
735 0 0         $response_code != $args->{expected_code}
736             and _die_generic_error(
737             "Unexpected Response Code in (got: $response_code, expected: $args->{expected_code})",
738             $args );
739            
740             # default value if we don't need to handle the response.
741 0           my ($ret, $more_to_come) = ( \1, undef);
742              
743             # handle the response.
744 0 0         if (my $handle_response = $args->{handle_response}) {
745 0           ($ret, $more_to_come) = $handle_response->( $self, $response_body, $args);
746             }
747              
748             # it's a multiple response request, loop again
749             $more_to_come
750 0 0         and next;
751              
752             # there is a result, process or return it
753 0 0         if ($ret) {
754 0 0         $args->{cb} and return $args->{cb}->($$ret);
755 0           return $$ret;
756             }
757              
758             # ret was undef, means we have processed everything in the callback
759 0           return;
760              
761             }
762             }
763              
764             sub _parse_response_ae {
765 0     0     my ( $self, $args ) = @_;
766              
767              
768             # OK so Riak doesn't support pipelining. That means that you can't send a
769             # request before the previous one has returned. Especially true for
770             # multiple response requests, like get_keys. So we need a way to detect
771             # that a running request is occuring, and we can't push_read before the
772             # previous request is done.
773              
774             # if there is a lock on the requests
775 0 0         if ($self->_requests_lock) {
776             # wait to acquire lock
777 0           $self->_requests_lock->recv();
778             # delete the lock
779 0           $self->_requests_lock(undef);
780             }
781              
782             # if this request can have multiple responses, set the lock.
783 0 0         $args->{lock_requests}
784             and $self->_requests_lock(AE::cv);
785              
786 0   0       my $body_ref = $args->{body_ref} // \'';
787            
788 0           $self->_handle->push_write(pack('N',
789             bytes::length($$body_ref) + 1)
790             . pack('c', $args->{request_code}) . $$body_ref
791             );
792              
793             # maybe we'll use a cv to force synchronous call
794 0           my $cv;
795             # if we don't have a user callback, we need to be synchronous, create the cv.
796 0 0         $args->{cb}
797             or $cv = AE::cv;
798              
799             # Store the cv also in the args so that the callback can get access to it
800 0           $args->{cv} = $cv;
801              
802             # Finally, store the given args to be reacheble from $self, so that
803             # _handle_reader_callback can get it. We push it in our stack, because we
804             # can stack up multiple requests, and get the response only after that.
805             # However we have the guarantee from AnyEvent and Riak that we will get the
806             # answers in order (for a given Riak connection, that is, for a given
807             # $self).
808 0           push @{$self->_current_request_ae_args()}, $args;
  0            
809              
810             # OK, now try to read from the socket with the handler
811 0           $self->_handle->push_read( chunk => 4, $self->_handle_reader_callback);
812              
813             # we were given a user callback, don't be synchronous, immediately return.
814 0 0         $args->{cb} and return;
815              
816             # no user callback, let's be synchronous
817 0           my $res = $cv->recv();
818 0 0         $res and
819             return $$res;
820              
821             # $res was undef, that's an error here
822 0           _die_generic_error( "internal error: response handler returns , but not in callback mode",
823             $args );
824             }
825              
826             sub _die_generic_error {
827 0     0     my ( $error, $args ) = @_;
828              
829 0   0       my ($operation_name, $bucket, $key) =
830 0           map { $args->{$_} // "" } ( qw( operation_name bucket key) );
831              
832 0           my $extra = '';
833 0 0 0       defined $bucket && defined $key
834             and $extra = "(bucket: $bucket, key: $key) ";
835              
836 0           my $msg = "Error in '$operation_name' $extra: $error";
837 0 0         if (my $cv = $args->{cv}) {
838 0           $cv->croak($msg);
839             } else {
840 0           croak $msg;
841             }
842             }
843              
844             sub _read_response {
845 0     0     my ($socket) = @_;
846 0   0       _read_bytes($socket, unpack( 'N', ${ _read_bytes($socket, 4) // return } ));
  0            
847             }
848              
849             sub _read_bytes {
850 0     0     my ( $socket, $length ) = @_;
851              
852 0           my $buffer;
853 0           my $offset = 0;
854 0           my $read = 0;
855              
856 0           while ($length > 0) {
857 0           $read = $socket->sysread( $buffer, $length, $offset );
858 0 0         if (! defined $read) {
859 0 0         $! == EINTR
860             and next;
861 0           return;
862             }
863              
864 0 0         $read > 0
865             or return;
866              
867 0           $offset += $read;
868 0           $length -= $read;
869             }
870              
871 0           return \$buffer;
872             }
873              
874              
875             sub _send_bytes {
876 0     0     my ( $socket, $request_code, $body_ref ) = @_;
877              
878 0           my $bytes = pack('N', my $length = (bytes::length($$body_ref) + 1)) . pack('c', $request_code) . $$body_ref;
879              
880 0           $length += 4;
881 0           my $offset = 0;
882 0           my $sent = 0;
883              
884 0           while ($length > 0) {
885 0           $sent = $socket->syswrite( $bytes, $length, $offset );
886 0 0         if (! defined $sent) {
887 0 0         $! == EINTR
888             and next;
889 0           return;
890             }
891              
892 0 0         $sent > 0
893             or return;
894              
895 0           $offset += $sent;
896 0           $length -= $sent;
897             }
898              
899 0           return $offset;
900             }
901              
902              
903              
904             1;
905              
906             __END__