File Coverage

blib/lib/Riak/Light.pm
Criterion Covered Total %
statement 239 269 88.8
branch 86 106 81.1
condition 27 35 77.1
subroutine 49 55 89.0
pod 21 22 95.4
total 422 487 86.6


line stmt bran cond sub pod time code
1             #
2             # This file is part of Riak-Light
3             #
4             # This software is copyright (c) 2013 by Weborama.
5             #
6             # This is free software; you can redistribute it and/or modify it under
7             # the same terms as the Perl 5 programming language system itself.
8             #
9             ## no critic (RequireUseStrict, RequireUseWarnings)
10             package Riak::Light;
11             {
12             $Riak::Light::VERSION = '0.12';
13             }
14             ## use critic
15              
16 14     14   434247 use 5.010;
  14         60  
17 14     14   9272 use Riak::Light::PBC;
  14         52  
  14         607  
18 14     14   9826 use Riak::Light::Driver;
  14         53  
  14         495  
19 14     14   12210 use MIME::Base64 qw(encode_base64);
  14         9802  
  14         1128  
20 14     14   10391 use Type::Params qw(compile);
  14         176028  
  14         133  
21 14     14   3188 use Types::Standard -types;
  14         29  
  14         67  
22 14     14   49107 use English qw(-no_match_vars );
  14         28  
  14         130  
23 14     14   6801 use Scalar::Util qw(blessed);
  14         25  
  14         640  
24 14     14   12497 use IO::Socket;
  14         315494  
  14         67  
25 14     14   8626 use Socket qw(TCP_NODELAY IPPROTO_TCP);
  14         28  
  14         3027  
26 14     14   10182 use Const::Fast;
  14         13784  
  14         88  
27 14     14   13702 use JSON;
  14         182484  
  14         88  
28 14     14   2167 use Carp;
  14         33  
  14         1042  
29 14     14   75 use Module::Runtime qw(use_module);
  14         58  
  14         191  
30 14     14   747 use Moo;
  14         31  
  14         138  
31              
32             # ABSTRACT: Fast and lightweight Perl client for Riak
33              
34             has pid => ( is => 'lazy', isa => Int, clearer => 1, predicate => 1 );
35             has port => ( is => 'ro', isa => Int, required => 1 );
36             has host => ( is => 'ro', isa => Str, required => 1 );
37             has r => ( is => 'ro', isa => Int, default => sub {2} );
38             has w => ( is => 'ro', isa => Int, default => sub {2} );
39             has dw => ( is => 'ro', isa => Int, default => sub {2} );
40              
41             has pr => ( is => 'ro', isa => Int, predicate => 1 );
42             has pw => ( is => 'ro', isa => Int, predicate => 1 );
43             has rw => ( is => 'ro', isa => Int, predicate => 1 );
44              
45             has autodie => ( is => 'ro', isa => Bool, default => sub {1}, trigger => 1 );
46             has timeout => ( is => 'ro', isa => Num, default => sub {0.5} );
47             has tcp_nodelay => ( is => 'ro', isa => Bool, default => sub {1} );
48             has in_timeout => ( is => 'lazy', trigger => 1 );
49             has out_timeout => ( is => 'lazy', trigger => 1 );
50             has client_id => ( is => 'lazy', isa => Str );
51              
52             sub _build_pid {
53 59     59   8950 $$;
54             }
55              
56             sub _build_client_id {
57 0     0   0 "perl_riak_light" . encode_base64( int( rand(10737411824) ), '' );
58             }
59              
60             sub _trigger_autodie {
61 45     45   199346 my ( $self, $value ) = @_;
62 45 100       964 carp "autodie will be disable in the next version" unless $value;
63             }
64              
65             sub _trigger_in_timeout {
66 1     1   56 carp
67             "this feature will be disabled in the next version, you should use just timeout instead";
68             }
69              
70             sub _trigger_out_timeout {
71 6     6   1246 carp
72             "this feature will be disabled in the next version, you should use just timeout instead";
73             }
74              
75             sub _build_in_timeout {
76 10     10   2684 $_[0]->timeout;
77             }
78              
79             sub _build_out_timeout {
80 5     5   592 $_[0]->timeout;
81             }
82              
83             has timeout_provider => (
84             is => 'ro',
85             isa => Maybe [Str],
86             default => sub {'Riak::Light::Timeout::Select'}
87             );
88              
89             has driver => ( is => 'lazy', clearer => 1 );
90              
91             sub _build_driver {
92 17     17   4268 Riak::Light::Driver->new( socket => $_[0]->_build_socket() );
93             }
94              
95             sub _build_socket {
96 17     17   108 my ($self) = @_;
97              
98 17         462 $self->pid; # force associate the pid with the current socket
99              
100 17         1629 my $host = $self->host;
101 17         132 my $port = $self->port;
102              
103 17         431 my $socket = IO::Socket::INET->new(
104             PeerHost => $host,
105             PeerPort => $port,
106             Timeout => $self->timeout,
107             );
108              
109 17 100       31761 croak "Error ($!), can't connect to $host:$port"
110             unless defined $socket;
111              
112 16 100       128 if ( $self->tcp_nodelay ) {
113 15 50       364 $socket->setsockopt( IPPROTO_TCP, TCP_NODELAY, 1 )
114             or croak "Cannot set tcp nodelay $! ($^E)";
115             }
116              
117 16 100       637 return $socket unless defined $self->timeout_provider;
118              
119 14     14   31741 use Module::Load qw(load);
  14         14859  
  14         97  
120 10         130 load $self->timeout_provider;
121              
122             # TODO: add a easy way to inject this proxy
123 10         1630 $self->timeout_provider->new(
124             socket => $socket,
125             in_timeout => $self->in_timeout,
126             out_timeout => $self->out_timeout,
127             );
128             }
129              
130             sub BUILD {
131 62     62 0 3243 $_[0]->driver;
132             }
133              
134             const my $PING => 'ping';
135             const my $GET => 'get';
136             const my $PUT => 'put';
137             const my $DEL => 'del';
138             const my $GET_KEYS => 'get_keys';
139             const my $QUERY_INDEX => 'query_index';
140             const my $MAP_REDUCE => 'map_reduce';
141             const my $SET_CLIENT_ID => 'set_client_id';
142             const my $GET_CLIENT_ID => 'get_client_id';
143              
144             const my $ERROR_RESPONSE_CODE => 0;
145             const my $GET_RESPONSE_CODE => 10;
146             const my $GET_KEYS_RESPONSE_CODE => 18;
147             const my $MAP_REDUCE_RESPONSE_CODE => 24;
148             const my $QUERY_INDEX_RESPONSE_CODE => 26;
149             const my $GET_CLIENT_ID_RESPONSE_CODE => 4;
150              
151             const my $CODES => {
152             $PING => { request_code => 1, response_code => 2 },
153             $GET => { request_code => 9, response_code => 10 },
154             $PUT => { request_code => 11, response_code => 12 },
155             $DEL => { request_code => 13, response_code => 14 },
156             $GET_KEYS => { request_code => 17, response_code => 18 },
157             $MAP_REDUCE => { request_code => 23, response_code => 24 },
158             $QUERY_INDEX => { request_code => 25, response_code => 26 },
159             $GET_CLIENT_ID => { request_code => 3, response_code => 4 },
160             $SET_CLIENT_ID => { request_code => 5, response_code => 6 },
161             };
162              
163             const my $DEFAULT_MAX_RESULTS => 100;
164              
165             sub ping {
166 30     30 1 28759 $_[0]->_parse_response(
167             operation => $PING,
168             body => q(),
169             );
170             }
171              
172             sub is_alive {
173 2     2 1 1220 eval { $_[0]->ping };
  2         5  
174             }
175              
176             sub get_keys {
177 3     3 1 176 state $check = compile( Any, Str, Optional [CodeRef] );
178 3         2498 my ( $self, $bucket, $callback ) = $check->(@_);
179              
180 3         144 my $body = RpbListKeysReq->encode( { bucket => $bucket } );
181 3         149 $self->_parse_response(
182             key => "*",
183             bucket => $bucket,
184             operation => $GET_KEYS,
185             body => $body,
186             callback => $callback,
187             );
188             }
189              
190             sub get_raw {
191 3     3 1 7 state $check = compile( Any, Str, Str, Optional [Bool] );
192 3         1935 my ( $self, $bucket, $key, $return_all ) = $check->(@_);
193 3         95 my $response = $self->_fetch( $bucket, $key, 0 );
194              
195 3         100 my $result;
196 3 50       9 if ( defined $response ) {
197 3 100       10 $result = ($return_all) ? $response : $response->{value};
198             }
199 3         27 $result;
200             }
201              
202             sub get_full_raw {
203 1     1 1 35 state $check = compile( Any, Str, Str );
204 1         1104 my ( $self, $bucket, $key ) = $check->(@_);
205              
206 1         17 $self->get_raw( $bucket, $key, 1 );
207             }
208              
209             sub get {
210 8     8 1 689 state $check = compile( Any, Str, Str, Optional [Bool] );
211 8         4745 my ( $self, $bucket, $key, $return_all ) = $check->(@_);
212 8         240 my $response = $self->_fetch( $bucket, $key, 1 );
213 6         214 my $result;
214 6 100       15 if ( defined $response ) {
215 5 100       16 $result = ($return_all) ? $response : $response->{value};
216             }
217 6         34 $result;
218             }
219              
220             sub get_full {
221 1     1 1 41 state $check = compile( Any, Str, Str );
222 1         1082 my ( $self, $bucket, $key ) = $check->(@_);
223              
224 1         16 $self->get( $bucket, $key, 1 );
225             }
226              
227             sub get_all_indexes {
228 2     2 1 82 state $check = compile( Any, Str, Str );
229 2         1084 my ( $self, $bucket, $key ) = $check->(@_);
230 2         27 my $response = $self->_fetch( $bucket, $key, 0, 1 );
231              
232             return ( !defined $response )
233             ? []
234 3         45 : [ map { +{ value => $_->value, key => $_->key } }
235 2 50 100     75 @{ $response->{indexes} // [] } ];
  2         15  
236             }
237              
238             sub get_index_value {
239 0     0 1 0 state $check = compile( Any, Str, Str, Str );
240 0         0 my ( $self, $bucket, $key, $index_name ) = $check->(@_);
241              
242 0         0 $self->get_all_index_values( $bucket, $key )->{$index_name};
243             }
244              
245             sub get_all_index_values {
246 0     0 1 0 state $check = compile( Any, Str, Str );
247 0         0 my ( $self, $bucket, $key ) = $check->(@_);
248              
249 0         0 my %values;
250 0         0 foreach my $index ( @{ $self->get_all_indexes( $bucket, $key ) } ) {
  0         0  
251 0         0 my $key = $index->{key};
252 0   0     0 $values{$key} //= [];
253 0         0 push @{ $values{$key} }, $index->{value};
  0         0  
254             }
255              
256 0         0 \%values;
257             }
258              
259             sub get_vclock {
260 1     1 1 35 state $check = compile( Any, Str, Str );
261 1         1408 my ( $self, $bucket, $key ) = $check->(@_);
262 1         15 my $response = $self->_fetch( $bucket, $key, 0, 1 );
263              
264 1 50       86 defined $response and $response->{vclock};
265             }
266              
267             sub exists {
268 4     4 1 804 state $check = compile( Any, Str, Str );
269 4         2586 my ( $self, $bucket, $key ) = $check->(@_);
270 4         54 defined $self->_fetch( $bucket, $key, 0, 1 );
271             }
272              
273             sub _fetch {
274 18     18   35 my ( $self, $bucket, $key, $decode, $head ) = @_;
275              
276 18         30 my %extra_parameters;
277 18 100       83 $extra_parameters{pr} = $self->pr if $self->has_pr;
278              
279 18         174 my $body = RpbGetReq->encode(
280             { r => $self->r,
281             key => $key,
282             bucket => $bucket,
283             head => $head,
284             %extra_parameters
285             }
286             );
287              
288 18         2466 $self->_parse_response(
289             key => $key,
290             bucket => $bucket,
291             operation => $GET,
292             body => $body,
293             decode => $decode,
294             );
295             }
296              
297             sub put_raw {
298 4     4 1 75 state $check =
299             compile( Any, Str, Str, Any, Optional [Str],
300             Optional [ HashRef [ Str | ArrayRef [Str] ] ], Optional [Str] );
301 4         30998 my ( $self, $bucket, $key, $value, $content_type, $indexes, $vclock ) =
302             $check->(@_);
303 4   100     512 $content_type ||= 'plain/text';
304 4         17 $self->_store( $bucket, $key, $value, $content_type, $indexes, $vclock );
305             }
306              
307             sub put {
308 3     3 1 178 state $check =
309             compile( Any, Str, Str, Any, Optional [Str],
310             Optional [ HashRef [ Str | ArrayRef [Str] ] ], Optional [Str] );
311 3         31059 my ( $self, $bucket, $key, $value, $content_type, $indexes, $vclock ) =
312             $check->(@_);
313              
314 3 100 100     302 ( $content_type ||= 'application/json' ) eq 'application/json'
315             and $value = encode_json($value);
316              
317 3         14 $self->_store( $bucket, $key, $value, $content_type, $indexes, $vclock );
318             }
319              
320             sub _store {
321 7     7   18 my ( $self, $bucket, $key, $encoded_value, $content_type, $indexes,
322             $vclock ) = @_;
323              
324 7         17 my %extra_parameters = ();
325              
326 7 50       21 $extra_parameters{vclock} = $vclock if $vclock;
327 7         35 $extra_parameters{dw} = $self->dw;
328 7 100       36 $extra_parameters{pw} = $self->pw if $self->has_pw;
329              
330             my $body = RpbPutReq->encode(
331             { key => $key,
332             bucket => $bucket,
333             content => {
334             value => $encoded_value,
335             content_type => $content_type,
336             ( $indexes
337             ? ( indexes => [
338             map {
339 7 50       125 my $k = $_;
  0         0  
340 0         0 my $v = $indexes->{$_};
341             ref $v eq 'ARRAY'
342 0 0       0 ? map { { key => $k, value => $_ }; } @$v
  0         0  
343             : { key => $k, value => $v };
344             } keys %$indexes
345             ]
346             )
347             : ()
348             ),
349             },
350             w => $self->w,
351             %extra_parameters,
352             }
353             );
354              
355 7         2161 $self->_parse_response(
356             key => $key,
357             bucket => $bucket,
358             operation => $PUT,
359             body => $body,
360             );
361             }
362              
363             sub del {
364 4     4 1 183 state $check = compile( Any, Str, Str );
365 4         2743 my ( $self, $bucket, $key ) = $check->(@_);
366              
367 4         54 my %extra_parameters;
368              
369 4 100       29 $extra_parameters{rw} = $self->rw if $self->has_rw;
370 4 100       21 $extra_parameters{pr} = $self->pr if $self->has_pr;
371 4 100       22 $extra_parameters{pw} = $self->pw if $self->has_pw;
372              
373 4         83 my $body = RpbDelReq->encode(
374             { key => $key,
375             bucket => $bucket,
376             r => $self->r,
377             w => $self->w,
378             dw => $self->dw,
379             %extra_parameters
380             }
381             );
382              
383 4         746 $self->_parse_response(
384             key => $key,
385             bucket => $bucket,
386             operation => $DEL,
387             body => $body,
388             );
389             }
390              
391             sub query_index_loop {
392 1     1 1 36 state $check =
393             compile( Any, Str, Str, Str | ArrayRef, Optional [HashRef] );
394 1         4943 my ( $self, $bucket, $index, $value_to_match, $extra_parameters ) =
395             $check->(@_);
396              
397 1   50     143 $extra_parameters //= {};
398 1   33     9 $extra_parameters->{max_results} //= $DEFAULT_MAX_RESULTS;
399              
400 1         2 my @keys;
401             do {
402              
403 2         8 my ( $temp_keys, $continuation, undef ) =
404             $self->query_index( $bucket, $index, $value_to_match,
405             $extra_parameters );
406              
407 2         51 $extra_parameters->{continuation} = $continuation;
408              
409 2         3 push @keys, @{$temp_keys};
  2         12  
410              
411 1         2 } while ( defined $extra_parameters->{continuation} );
412              
413 1         4 return \@keys;
414             }
415              
416             sub query_index {
417 6     6 1 258 state $check =
418             compile( Any, Str, Str, Str | ArrayRef, Optional [HashRef] );
419 6         6733 my ( $self, $bucket, $index, $value_to_match, $extra_parameters ) =
420             $check->(@_);
421              
422 6         233 my $query_type = 0; # eq
423 6 50       16 ref $value_to_match
424             and $query_type = 1; # range
425              
426             croak "query index in stream mode not supported"
427 6 50 66     31 if defined $extra_parameters && $extra_parameters->{stream};
428              
429             my $body = RpbIndexReq->encode(
430             { index => $index,
431             bucket => $bucket,
432             qtype => $query_type,
433             $query_type
434             ? ( range_min => $value_to_match->[0],
435             range_max => $value_to_match->[1]
436             )
437             : ( key => $value_to_match ),
438 6 50 100     20 %{ $extra_parameters // {} },
  6         94  
439             }
440             );
441              
442             $self->_parse_response(
443             $query_type
444             ? ( key => "2i query on index='$index' => "
445             . $value_to_match->[0] . '...'
446             . $value_to_match->[1] )
447             : ( key => "2i query on index='$index' => " . $value_to_match ),
448             bucket => $bucket,
449             operation => $QUERY_INDEX,
450             body => $body,
451             paginate => defined $extra_parameters
452             && exists $extra_parameters->{max_results},
453 6 50 66     1494 );
454             }
455              
456             sub map_reduce {
457 3     3 1 210 state $check = compile( Any, Any, Optional [CodeRef] );
458 3         2216 my ( $self, $request, $callback ) = $check->(@_);
459              
460 3         106 my @args;
461              
462 3 50       15 push @args, ref($request) ? encode_json($request) : $request;
463 3         5 push @args, 'application/json';
464 3 100       12 push @args, $callback if $callback;
465              
466 3         10 $self->map_reduce_raw(@args);
467             }
468              
469             sub map_reduce_raw {
470 3     3 1 8 state $check = compile( Any, Str, Str, Optional [CodeRef] );
471 3         2342 my ( $self, $request, $content_type, $callback ) = $check->(@_);
472              
473 3         150 my $body = RpbMapRedReq->encode(
474             { request => $request,
475             content_type => $content_type,
476             }
477             );
478              
479 3         239 $self->_parse_response(
480             key => 'no-key',
481             bucket => 'no-bucket',
482             operation => $MAP_REDUCE,
483             body => $body,
484             callback => $callback,
485             decode => ( $content_type eq 'application/json' ),
486             );
487             }
488              
489             sub get_client_id {
490 0     0 1 0 my $self = shift;
491              
492 0         0 $self->_parse_response(
493             operation => $GET_CLIENT_ID,
494             body => q(),
495             );
496             }
497              
498             sub set_client_id {
499 0     0 1 0 state $check = compile( Any, Str );
500 0         0 my ( $self, $client_id ) = $check->(@_);
501              
502 0         0 my $body = RpbSetClientIdReq->encode( { client_id => $client_id } );
503              
504 0         0 $self->_parse_response(
505             operation => $SET_CLIENT_ID,
506             body => $body,
507             );
508             }
509              
510             sub _pid_change {
511 71     71   1797 $_[0]->pid != $$;
512             }
513              
514             sub _parse_response {
515 71     71   651 my ( $self, %args ) = @_;
516              
517 71         174 my $operation = $args{operation};
518              
519 71         238 my $request_code = $CODES->{$operation}->{request_code};
520 71         514 my $expected_code = $CODES->{$operation}->{response_code};
521              
522 71         161 my $request_body = $args{body};
523 71         135 my $decode = $args{decode};
524 71         142 my $bucket = $args{bucket};
525 71         147 my $key = $args{key};
526 71         117 my $callback = $args{callback};
527 71         193 my $paginate = $args{paginate};
528              
529 71 100       352 $self->autodie
530             or undef $@; ## no critic (RequireLocalizedPunctuationVars)
531              
532 71 100       264 if ( $self->_pid_change ) {
533 1         29 $self->clear_pid;
534 1         379 $self->clear_driver;
535             }
536              
537             $self->driver->perform_request(
538 71 100       3160 code => $request_code,
539             body => $request_body
540             )
541             or return $self->_process_generic_error(
542             $ERRNO, $operation, $bucket,
543             $key
544             );
545              
546             # my $done = 0;
547             #$expected_code != $GET_KEYS_RESPONSE_CODE;
548              
549 65         3877 my $response;
550             my @results;
551 65         149 while (1) {
552              
553             # get and check response
554 68   100     1424 $response = $self->driver->read_response()
555             // { code => -1, body => undef, error => $ERRNO };
556              
557             my ( $response_code, $response_body, $response_error ) =
558 68         3204 @{$response}{qw(code body error)};
  68         223  
559              
560             # in case of internal error message
561 68 100       264 defined $response_error
562             and return $self->_process_generic_error(
563             $response_error, $operation, $bucket,
564             $key
565             );
566              
567             # in case of error msg
568 49 100       188 $response_code == $ERROR_RESPONSE_CODE
569             and return $self->_process_riak_error(
570             $response_body, $operation, $bucket,
571             $key
572             );
573              
574             # in case of default message
575 48 100       177 $response_code != $expected_code
576             and return $self->_process_generic_error(
577             "Unexpected Response Code in (got: $response_code, expected: $expected_code)",
578             $operation, $bucket, $key
579             );
580              
581 47 50       184 $response_code == $GET_CLIENT_ID_RESPONSE_CODE
582             and return $self->_process_get_client_id_response($response_body);
583              
584             # we have a 'get' response
585 47 100       303 $response_code == $GET_RESPONSE_CODE
586             and
587             return $self->_process_get_response( $response_body, $bucket, $key,
588             $decode );
589              
590             # we have a 'get_keys' response
591             # TODO: support for 1.4 (which provides 'stream', 'return_terms', and 'stream')
592 31 100       275 if ( $response_code == $GET_KEYS_RESPONSE_CODE ) {
    100          
    100          
593 3         19 my $obj = RpbListKeysResp->decode($response_body);
594 3   100     279 my @keys = @{ $obj->keys // [] };
  3         13  
595 3 50       59 if ($callback) {
596 3         12 $callback->($_) foreach @keys;
597 3 100       15 $obj->done
598             and return;
599             }
600             else {
601 0         0 push @results, @keys;
602 0 0       0 $obj->done
603             and return \@results;
604             }
605 1         13 next;
606             } # in case of a 'query_index' response
607             elsif ( $response_code == $QUERY_INDEX_RESPONSE_CODE ) {
608 5         30 my $obj = RpbIndexResp->decode($response_body);
609              
610 5   100     775 my $keys = $obj->keys // [];
611              
612 5 100 66     101 if ( $paginate and wantarray ) {
613 3         11 return ( $keys, $obj->continuation, $obj->done );
614             }
615             else {
616 2         20 return $keys;
617             }
618             }
619             elsif ( $response_code == $MAP_REDUCE_RESPONSE_CODE ) {
620 4         18 my $obj = RpbMapRedResp->decode($response_body);
621              
622 4         344 my $phase = $obj->phase;
623 4 50 100     61 my $response =
624             ($decode)
625             ? decode_json( $obj->response // '[]' )
626             : $obj->response;
627              
628 4 100       72 if ($callback) {
629 2 100       7 $obj->done
630             and return;
631 1         16 $callback->( $response, $phase );
632             }
633             else {
634 2 100       7 $obj->done
635             and return \@results;
636 1         19 push @results, { phase => $phase, response => $response };
637             }
638 2         14 next;
639             }
640              
641             # in case of no return value, signify success
642 19         240 return 1;
643             }
644              
645             }
646              
647             sub _process_get_client_id_response {
648 0     0   0 my ( $self, $encoded_message ) = @_;
649              
650 0 0       0 $self->_process_generic_error( "Undefined Message", 'get client id', '-',
651             '-' )
652             unless ( defined $encoded_message );
653              
654 0         0 my $decoded_message = RpbGetClientIdResp->decode($encoded_message);
655 0         0 $decoded_message->client_id;
656             }
657              
658             sub _process_get_response {
659 16     16   29 my ( $self, $encoded_message, $bucket, $key, $should_decode ) = @_;
660              
661 16 100       36 $self->_process_generic_error( "Undefined Message", 'get', $bucket, $key )
662             unless ( defined $encoded_message );
663              
664 15         78 my $decoded_message = RpbGetResp->decode($encoded_message);
665              
666 15         2788 my $contents = $decoded_message->content;
667 15 100       233 if ( ref($contents) eq 'ARRAY' ) {
668 13         24 my $content = $contents->[0];
669              
670 13   100     42 my $decode =
671             $should_decode && ( $content->content_type eq 'application/json' );
672             return {
673 13 100       110 value => ($decode)
674             ? decode_json( $content->value )
675             : $content->value,
676             indexes => $content->indexes,
677             vclock => $decoded_message->vclock,
678             };
679             }
680              
681 2         21 undef;
682             }
683              
684             sub _process_riak_error {
685 1     1   3 my ( $self, $encoded_message, $operation, $bucket, $key ) = @_;
686              
687 1         8 my $decoded_message = RpbErrorResp->decode($encoded_message);
688              
689 1         116 my $errmsg = $decoded_message->errmsg;
690 1         15 my $errcode = $decoded_message->errcode;
691              
692 1         14 $self->_process_generic_error(
693             "Riak Error (code: $errcode) '$errmsg'",
694             $operation, $bucket, $key
695             );
696             }
697              
698             sub _process_generic_error {
699 28     28   189 my ( $self, $error, $operation, $bucket, $key ) = @_;
700              
701 28         127 my $extra = '';
702              
703 28 100       149 if ( $operation eq $PING ) {
    100          
    100          
704 20         75 $extra = '';
705             }
706             elsif ( $operation eq $QUERY_INDEX ) {
707 1         5 $extra = "(bucket: $bucket, $key)";
708             }
709             elsif ( $operation eq $MAP_REDUCE ) {
710 1         2 $extra = ''; # maybe add the sha1 of the request?
711             }
712             else {
713 6         20 $extra = "(bucket: $bucket, key: $key)";
714             }
715              
716 28         104 my $error_message = "Error in '$operation' $extra: $error";
717              
718 28 100       643 croak $error_message if $self->autodie;
719              
720 4         5 $@ = $error_message; ## no critic (RequireLocalizedPunctuationVars)
721              
722 4         30 undef;
723             }
724              
725             1;
726              
727              
728             =pod
729              
730             =head1 NAME
731              
732             Riak::Light - Fast and lightweight Perl client for Riak
733              
734             =head1 VERSION
735              
736             version 0.12
737              
738             =head1 SYNOPSIS
739              
740             use Riak::Light;
741              
742             # create a new instance - using pbc only
743             my $client = Riak::Light->new(
744             host => '127.0.0.1',
745             port => 8087
746             );
747              
748             $client->is_alive() or die "ops, riak is not alive";
749              
750             # store hashref into bucket 'foo', key 'bar'
751             # will serializer as 'application/json'
752             $client->put( foo => bar => { baz => 1024 });
753              
754             # store text into bucket 'foo', key 'bar'
755             $client->put( foo => baz => "sometext", 'text/plain');
756             $client->put_raw( foo => baz => "sometext"); # does not encode !
757              
758             # fetch hashref from bucket 'foo', key 'bar'
759             my $hash = $client->get( foo => 'bar');
760             my $text = $client->get_raw( foo => 'baz'); # does not decode !
761              
762             # delete hashref from bucket 'foo', key 'bar'
763             $client->del(foo => 'bar');
764              
765             # check if exists (like get but using less bytes in the response)
766             $client->exists(foo => 'baz') or warn "ops, foo => bar does not exist";
767              
768             # list keys in stream (callback only)
769             $client->get_keys(foo => sub{
770             my $key = $_[0];
771              
772             # you should use another client inside this callback!
773             $another_client->del(foo => $key);
774             });
775            
776             # perform 2i queries
777             my $keys = $client->query_index( $bucket_name => 'index_test_field_bin', 'plop');
778            
779             # list all 2i indexes and values
780             my $indexes = $client->get_all_indexes( $bucket_name => $key );
781            
782             # perform map / reduce operations
783             my $response = $client->map_reduce('{
784             "inputs":"training",
785             "query":[{"map":{"language":"javascript",
786             "source":"function(riakObject) {
787             var val = riakObject.values[0].data.match(/pizza/g);
788             return [[riakObject.key, (val ? val.length : 0 )]];
789             }"}}]}');
790              
791             =head1 DESCRIPTION
792              
793             Riak::Light is a very light (and fast) Perl client for Riak using PBC
794             interface. Support operations like ping, get, exists, put, del, and secondary
795             indexes (so-called 2i) setting and querying.
796              
797             It is flexible to change the timeout backend for I/O operations and can
798             suppress 'die' in case of error (autodie) using the configuration. There is no
799             auto-reconnect option. It can be very easily wrapped up by modules like
800             L to manage flexible retry/reconnect strategies.
801              
802             =head2 ATTRIBUTES
803              
804             =head3 host
805              
806             Riak ip or hostname. There is no default.
807              
808             =head3 port
809              
810             Port of the PBC interface. There is no default.
811              
812             =head3 r
813              
814             R value setting for this client. Default 2.
815              
816             =head3 w
817              
818             W value setting for this client. Default 2.
819              
820             =head3 dw
821              
822             DW value setting for this client. Default 2.
823              
824             =head3 rw
825              
826             RW value setting for this client. Default not set ( and omit in the request)
827              
828             =head3 pr
829              
830             PR value setting for this client. Default not set ( and omit in the request)
831              
832             =head3 pw
833              
834             PW value setting for this client. Default not set ( and omit in the request)
835              
836             =head3 autodie
837              
838             Boolean, if false each operation will return undef in case of error (stored in $@). Default is true.
839              
840             =head3 timeout
841              
842             Timeout for connection, write and read operations. Default is 0.5 seconds.
843              
844             =head3 in_timeout
845              
846             Timeout for read operations. Default is timeout value.
847              
848             =head3 out_timeout
849              
850             Timeout for write operations. Default is timeout value.
851              
852             =head3 tcp_nodelay
853              
854             Boolean, enable or disable TCP_NODELAY. If True (default), disables Nagle's Algorithm.
855              
856             See more in: L.
857              
858             =head3 timeout_provider
859              
860             Can change the backend for timeout. The default value is IO::Socket::INET and
861             there is only support to connection timeout.
862              
863             B: in case of any timeout error, the socket between this client and the
864             Riak server will be closed. To support I/O timeout you can choose 5 options (or
865             you can set undef to avoid IO Timeout):
866              
867             =over
868              
869             =item * Riak::Light::Timeout::Alarm
870              
871             uses alarm and Time::HiRes to control the I/O timeout. Does not work on Win32.
872             (Not Safe)
873              
874             =item * Riak::Light::Timeout::Time::Out
875              
876             uses Time::Out and Time::HiRes to control the I/O timeout. Does not work on
877             Win32. (Not Safe)
878              
879             =item * Riak::Light::Timeout::Select
880              
881             uses IO::Select to control the I/O timeout
882              
883             =item * Riak::Light::Timeout::SelectOnWrite
884              
885             uses IO::Select to control only Output Operations. Can block in Write
886             Operations. Be Careful.
887              
888             =item * Riak::Light::Timeout::SetSockOpt
889              
890             uses setsockopt to set SO_RCVTIMEO and SO_SNDTIMEO socket properties. Does not
891             Work on NetBSD 6.0.
892              
893             =back
894              
895             =head3 driver
896              
897             This is a Riak::Light::Driver instance, to be able to connect and perform
898             requests to Riak over PBC interface.
899              
900             =head2 METHODS
901              
902             =head3 is_alive
903              
904             $client->is_alive() or warn "ops... something is wrong: $@";
905              
906             Perform a ping operation. Will return false in case of error (will store in $@).
907              
908             =head3 ping
909              
910             try { $client->ping() } catch { "oops... something is wrong: $_" };
911              
912             Perform a ping operation. Will die in case of error.
913              
914             =head3 set_client_id
915              
916             $client->set_client_id('foobar');
917              
918             Set the client id.
919              
920             =head3 get_client_id
921              
922             my $client_id = $client->get_client_id();
923              
924             Get the client id.
925              
926             =head3 get
927              
928             my $value_or_reference = $client->get(bucket => 'key');
929              
930             Perform a fetch operation. Expects bucket and key names. Decode the json into a
931             Perl structure, if the content_type is 'application/json'. If you need the raw
932             data you can use L.
933              
934             There is a third argument: return_all. Default is false. If true, we will return an hashref with 3 entries:
935             value (the data decoded), indexes and vclock.
936              
937             =head3 get_raw
938              
939             my $scalar_value = $client->get_raw(bucket => 'key');
940              
941             Perform a fetch operation. Expects bucket and key names. Return the raw data.
942             If you need decode the json, you should use L instead.
943              
944             There is a third argument: return_all. Default is false. If true, we will return an hashref with 3 entries:
945             value (the data decoded), indexes and vclock.
946              
947             =head3 get_full
948              
949             my $value_or_reference = $client->get_full(bucket => 'key');
950              
951             Perform a fetch operation. Expects bucket and key names. Will return an hashref with 3 entries:
952             value (the data decoded), indexes and vclock. It is the equivalent to call get(bucket, key, 1)
953              
954             =head3 get_full_raw
955              
956             my $scalar_value = $client->get_full_raw(bucket => 'key');
957              
958             Perform a fetch operation. Expects bucket and key names. Will return an hashref with 3 entries:
959             value (the raw data), indexes and vclock. It is the equivalent to call get_raw(bucket, key, 1)
960              
961             =head3 exists
962              
963             $client->exists(bucket => 'key') or warn "key not found";
964              
965             Perform a fetch operation but with head => 0, and the if there is something
966             stored in the bucket/key.
967              
968             =head3 get_all_indexes
969              
970             $client->get_all_indexes(bucket => 'key');
971              
972             Perform a fetch operation but instead return the content, return a hashref with a mapping between index name and an arrayref with all possible values (or empty arrayref if none). For example one possible return is:
973              
974             [
975             { key => 'index_test_field_bin', value => 'plop' },
976             { key => 'index_test_field2_bin', value => 'plop2' },
977             { key => 'index_test_field2_bin', value => 'plop3' },
978             ]
979              
980             IMPORT: this arrayref is unsortered.
981              
982             =head3 get_index_value
983              
984             Perform a fetch operation, will return an arrayref with all values of the index or undef (if does not exists). There is no order for the array.
985              
986             my $value = $client->get_index_value(bucket => key => 'index_test_field_bin');
987              
988             It is similar to do
989              
990             my $value = $client->get_all_index_values(bucket => 'key')->{index_test_field_bin};
991              
992             =head3 get_all_index_values
993              
994             Perform a fetch operation, will return an hashref with all 2i indexes names as keys, and arrayref of all values for values.
995              
996             =head3 get_vclock
997              
998             Perform a fetch operation, will return the value of the vclock
999              
1000             my $vclock = $client->get_vclock(bucket => 'key');
1001              
1002             =head3 put
1003              
1004             $client->put('bucket', 'key', { some_values => [1,2,3] });
1005             $client->put('bucket', 'key', { some_values => [1,2,3] }, 'application/json);
1006             $client->put('bucket', 'key', 'text', 'plain/text');
1007              
1008             # you can set secondary indexes (2i)
1009             $client->put( 'bucket', 'key', 'text', 'plain/text',
1010             { field1_bin => 'abc', field2_int => 42 }
1011             );
1012             $client->put( 'bucket', 'key', { some_values => [1,2,3] }, undef,
1013             { field1_bin => 'abc', field2_int => 42 }
1014             );
1015             # remember that a key can have more than one value in a given index. In this
1016             # case, use ArrayRef:
1017             $client->put( 'bucket', 'key', 'value', undef,
1018             { field1_bin => [ 'abc', 'def' ] } );
1019              
1020             Perform a store operation. Expects bucket and key names, the value, the content
1021             type (optional, default is 'application/json'), and the indexes to set for this
1022             value (optional, default is none).
1023              
1024             Will encode the structure in json string if necessary. If you need only store
1025             the raw data you can use L instead.
1026              
1027             B: all the index field names should end by either C<_int> or
1028             C<_bin>, depending if the index type is integer or binary.
1029              
1030             To query secondary indexes, see L.
1031              
1032             =head3 put_raw
1033              
1034             $client->put_raw('bucket', 'key', encode_json({ some_values => [1,2,3] }), 'application/json');
1035             $client->put_raw('bucket', 'key', 'text');
1036             $client->put_raw('bucket', 'key', 'text', undef, {field_bin => 'foo'});
1037              
1038             Perform a store operation. Expects bucket and key names, the value, the content
1039             type (optional, default is 'plain/text'), and the indexes to set for this value
1040             (optional, default is none).
1041              
1042             Will encode the raw data. If you need encode the structure you can use L
1043             instead.
1044              
1045             B: all the index field names should end by either C<_int> or
1046             C<_bin>, depending if the index type is integer or binary.
1047              
1048             To query secondary indexes, see L.
1049              
1050             =head3 del
1051              
1052             $client->del(bucket => key);
1053              
1054             Perform a delete operation. Expects bucket and key names.
1055              
1056             =head3 get_keys
1057              
1058             $client->get_keys(foo => sub{
1059             my $key = $_[0];
1060              
1061             # you should use another client inside this callback!
1062             $another_client->del(foo => $key);
1063             });
1064              
1065             Perform a list keys operation. Receive a callback and will call it for each
1066             key. You can't use this callback to perform other operations!
1067              
1068             The callback is optional, in which case an ArrayRef of all the keys are
1069             returned. But you should always provide a callback, to avoid your RAM usage to
1070             skyrocket...
1071              
1072             =head3 query_index
1073              
1074             Perform a secondary index query. Expects a bucket name, the index field name,
1075             and the index value you're searching on. Returns and ArrayRef of matching keys.
1076              
1077             The index value you're searching on can be of two types. If it's a scalar, an
1078             B query will be performed. if the value is an ArrayRef, then a
1079             B query will be performed, the first element in the array will be the
1080             range_min, the second element the range_max. other elements will be ignored.
1081              
1082             Based on the example in C, here is how to query it:
1083              
1084             # exact match
1085             my $matching_keys = $client->query_index( 'bucket', 'field2_int', 42 );
1086              
1087             # range match
1088             my $matching_keys = $client->query_index( 'bucket', 'field2_int', [ 40, 50] );
1089              
1090             # with pagination
1091             my ($matching_keys, $continuation, $done) = $client->query_index( 'bucket', 'field2_int', 42, { max_results => 100 });
1092              
1093             to fetch the next 100 keys
1094              
1095             my ($matching_keys, $continuation, $done) = $client->query_index( 'bucket', 'field2_int', 42, {
1096             max_results => 100,
1097             continuation => $continuation
1098             });
1099              
1100             to fetch only the first 100 keys you can do this
1101              
1102             my $matching_keys = $client->query_index( 'bucket', 'field2_int', [ 40, 50], { max_results => 100 });
1103              
1104             =head3 query_index_loop
1105              
1106             Instead using a normal loop around query_index to query 2i with pagination, like this:
1107              
1108             do {
1109             ($matching_keys, $continuation) = $client->query_index( 'bucket', 'field2_int', 42, {
1110             max_results => 100,
1111             continuation => $continuation
1112             });
1113             push @keys, @{$matching_keys};
1114             } while(defined $continuation);
1115              
1116             you can simply use query_index_loop helper method
1117              
1118             my $matching_keys = $client->query_index_loop( 'bucket', 'field2_int', [ 40, 50], { max_results => 1024 });
1119              
1120             if you omit the max_results, the default value is 100
1121              
1122             =head3 map_reduce
1123              
1124             This is an alias for map_reduce_raw with content-type 'application/json'
1125              
1126             =head3 map_reduce_raw
1127              
1128             Performa a map/reduce operation. You can use content-type 'application/json' or 'application/x-erlang-binary' Accept callback.
1129              
1130             Example:
1131              
1132             my $map_reduce_json = '{
1133             "inputs":"training",
1134             "query":[{"map":{"language":"javascript",
1135             "source":"function(riakObject) {
1136             var val = riakObject.values[0].data.match(/pizza/g);
1137             return [[riakObject.key, (val ? val.length : 0 )]];
1138             }"}}]}';
1139            
1140             my $response = $client->map_reduce_raw($map_reduce_json, 'application/json');
1141              
1142             will return something like
1143              
1144             [
1145             {'response' => [['foo',1]],'phase' => 0},
1146             {'response' => [['bam',3]],'phase' => 0},
1147             {'response' => [['bar',4]],'phase' => 0},
1148             {'response' => [['baz',0]],'phase' => 0}
1149             ]
1150              
1151             a hashref with response (decoded if json) and phase value. you can also pass a callback
1152              
1153             $client->map_reduce( $map_reduce_json , sub {
1154             my ($response, $phase) = @_;
1155            
1156             # process the response
1157             });
1158              
1159             this callback will be called 4 times, with this response (decoded from json)
1160              
1161             [['foo', 1]]
1162             [['bam', 3]]
1163             [['bar', 4]]
1164             [['baz', 0]]
1165              
1166             using map_reduce method, you can also use a hashref as a map reduce query:
1167              
1168             my $json_hash = {
1169             inputs => "training",
1170             query => [{
1171             map => {
1172             language =>"javascript",
1173             source =>"function(riakObject) {
1174             var val = riakObject.values[0].data.match(/pizza/g);
1175             return [[riakObject.key, (val ? val.length : 0 )]];
1176             }"
1177             }
1178             }]
1179             };
1180            
1181             $client->map_reduce($json_hash, sub { ... });
1182              
1183             map_reduce encode/decode to json format. If you need control with the format (like to use with erlang), you should use map_reduce_raw.
1184              
1185             you can use erlang functions but using the json format (see L).
1186              
1187             {"inputs":"messages","query":[{"map":{"language":"erlang","module":"mr_example","function":"get_keys"}}]}
1188              
1189             More information:
1190              
1191             L
1192              
1193             L
1194              
1195             L
1196              
1197             =head1 SEE ALSO
1198              
1199             L
1200              
1201             L
1202              
1203             L
1204              
1205             L
1206              
1207             =head1 AUTHORS
1208              
1209             =over 4
1210              
1211             =item *
1212              
1213             Tiago Peczenyj
1214              
1215             =item *
1216              
1217             Damien Krotkine
1218              
1219             =back
1220              
1221             =head1 COPYRIGHT AND LICENSE
1222              
1223             This software is copyright (c) 2013 by Weborama.
1224              
1225             This is free software; you can redistribute it and/or modify it under
1226             the same terms as the Perl 5 programming language system itself.
1227              
1228             =cut
1229              
1230              
1231             __END__