File Coverage

blib/lib/Riak/Light.pm
Criterion Covered Total %
statement 231 261 88.5
branch 76 96 79.1
condition 27 35 77.1
subroutine 49 55 89.0
pod 21 22 95.4
total 404 469 86.1


line stmt bran cond sub pod time code
1             #
2             # This file is part of Riak-Light
3             #
4             # This software is copyright (c) 2013 by Weborama.
5             #
6             # This is free software; you can redistribute it and/or modify it under
7             # the same terms as the Perl 5 programming language system itself.
8             #
9             ## no critic (RequireUseStrict, RequireUseWarnings)
10             package Riak::Light;
11             {
12             $Riak::Light::VERSION = '0.10';
13             }
14             ## use critic
15              
16 13     13   526523 use 5.010;
  13         52  
  13         514  
17 13     13   8272 use Riak::Light::PBC;
  13         43  
  13         614  
18 13     13   10477 use Riak::Light::Driver;
  13         50  
  13         515  
19 13     13   13418 use MIME::Base64 qw(encode_base64);
  13         11541  
  13         1326  
20 13     13   13101 use Type::Params qw(compile);
  13         220589  
  13         121  
21 13     13   3456 use Types::Standard -types;
  13         27  
  13         80  
22 13     13   53069 use English qw(-no_match_vars );
  13         33  
  13         128  
23 13     13   7890 use Scalar::Util qw(blessed);
  13         32  
  13         888  
24 13     13   14520 use IO::Socket;
  13         367451  
  13         71  
25 13     13   9103 use Socket qw(TCP_NODELAY IPPROTO_TCP);
  13         47  
  13         3348  
26 13     13   10644 use Const::Fast;
  13         14920  
  13         81  
27 13     13   19817 use JSON;
  13         204073  
  13         86  
28 13     13   2109 use Carp;
  13         35  
  13         1504  
29 13     13   83 use Module::Runtime qw(use_module);
  13         30  
  13         168  
30 13     13   725 use Moo;
  13         28  
  13         134  
31              
32             # ABSTRACT: Fast and lightweight Perl client for Riak
33              
34             has pid => ( is => 'lazy', isa => Int, clearer => 1 );
35             has port => ( is => 'ro', isa => Int, required => 1 );
36             has host => ( is => 'ro', isa => Str, required => 1 );
37             has r => ( is => 'ro', isa => Int, default => sub {2} );
38             has w => ( is => 'ro', isa => Int, default => sub {2} );
39             has dw => ( is => 'ro', isa => Int, default => sub {2} );
40             has autodie => ( is => 'ro', isa => Bool, default => sub {1}, trigger => 1 );
41             has timeout => ( is => 'ro', isa => Num, default => sub {0.5} );
42             has tcp_nodelay => ( is => 'ro', isa => Bool, default => sub {1} );
43             has in_timeout => ( is => 'lazy', trigger => 1 );
44             has out_timeout => ( is => 'lazy', trigger => 1 );
45             has client_id => ( is => 'lazy', isa => Str );
46              
47             sub _build_pid {
48 50     50   9338 $$;
49             }
50              
51             sub _build_client_id {
52 0     0   0 "perl_riak_light" . encode_base64( int( rand(10737411824) ), '' );
53             }
54              
55             sub _trigger_autodie {
56 39     39   149758 my ( $self, $value ) = @_;
57 39 100       913 carp "autodie will be disable in the next version" unless $value;
58             }
59              
60             sub _trigger_in_timeout {
61 1     1   55 carp
62             "this feature will be disabled in the next version, you should use just timeout instead";
63             }
64              
65             sub _trigger_out_timeout {
66 6     6   1244 carp
67             "this feature will be disabled in the next version, you should use just timeout instead";
68             }
69              
70             sub _build_in_timeout {
71 10     10   3853 $_[0]->timeout;
72             }
73              
74             sub _build_out_timeout {
75 5     5   703 $_[0]->timeout;
76             }
77              
78             has timeout_provider => (
79             is => 'ro',
80             isa => Maybe [Str],
81             default => sub {'Riak::Light::Timeout::Select'}
82             );
83              
84             has driver => ( is => 'lazy', clearer => 1 );
85              
86             sub _build_driver {
87 17     17   5963 Riak::Light::Driver->new( socket => $_[0]->_build_socket() );
88             }
89              
90             sub _build_socket {
91 17     17   262 my ($self) = @_;
92              
93 17         175 my $host = $self->host;
94 17         279 my $port = $self->port;
95              
96 17         462 my $socket = IO::Socket::INET->new(
97             PeerHost => $host,
98             PeerPort => $port,
99             Timeout => $self->timeout,
100             );
101              
102 17 100       41201 croak "Error ($!), can't connect to $host:$port"
103             unless defined $socket;
104              
105 16 100       275 if ( $self->tcp_nodelay ) {
106 15 50       282 $socket->setsockopt( IPPROTO_TCP, TCP_NODELAY, 1 )
107             or croak "Cannot set tcp nodelay $! ($^E)";
108             }
109              
110 16 100       579 return $socket unless defined $self->timeout_provider;
111              
112 13     13   39047 use Module::Load qw(load);
  13         14852  
  13         88  
113 10         111 load $self->timeout_provider;
114              
115             # TODO: add a easy way to inject this proxy
116 10         2532 $self->timeout_provider->new(
117             socket => $socket,
118             in_timeout => $self->in_timeout,
119             out_timeout => $self->out_timeout,
120             );
121             }
122              
123             sub BUILD {
124 56     56 0 3793 $_[0]->driver;
125             }
126              
127             const my $PING => 'ping';
128             const my $GET => 'get';
129             const my $PUT => 'put';
130             const my $DEL => 'del';
131             const my $GET_KEYS => 'get_keys';
132             const my $QUERY_INDEX => 'query_index';
133             const my $MAP_REDUCE => 'map_reduce';
134             const my $SET_CLIENT_ID => 'set_client_id';
135             const my $GET_CLIENT_ID => 'get_client_id';
136              
137             const my $ERROR_RESPONSE_CODE => 0;
138             const my $GET_RESPONSE_CODE => 10;
139             const my $GET_KEYS_RESPONSE_CODE => 18;
140             const my $MAP_REDUCE_RESPONSE_CODE => 24;
141             const my $QUERY_INDEX_RESPONSE_CODE => 26;
142             const my $GET_CLIENT_ID_RESPONSE_CODE => 4;
143              
144             const my $CODES => {
145             $PING => { request_code => 1, response_code => 2 },
146             $GET => { request_code => 9, response_code => 10 },
147             $PUT => { request_code => 11, response_code => 12 },
148             $DEL => { request_code => 13, response_code => 14 },
149             $GET_KEYS => { request_code => 17, response_code => 18 },
150             $MAP_REDUCE => { request_code => 23, response_code => 24 },
151             $QUERY_INDEX => { request_code => 25, response_code => 26 },
152             $GET_CLIENT_ID => { request_code => 3, response_code => 4 },
153             $SET_CLIENT_ID => { request_code => 5, response_code => 6 },
154             };
155              
156             const my $DEFAULT_MAX_RESULTS => 100;
157              
158             sub ping {
159 30     30 1 23436 $_[0]->_parse_response(
160             operation => $PING,
161             body => q(),
162             );
163             }
164              
165             sub is_alive {
166 2     2 1 956 eval { $_[0]->ping };
  2         6  
167             }
168              
169             sub get_keys {
170 3     3 1 158 state $check = compile( Any, Str, Optional [CodeRef] );
171 3         2417 my ( $self, $bucket, $callback ) = $check->(@_);
172              
173 3         124 my $body = RpbListKeysReq->encode( { bucket => $bucket } );
174 3         121 $self->_parse_response(
175             key => "*",
176             bucket => $bucket,
177             operation => $GET_KEYS,
178             body => $body,
179             callback => $callback,
180             );
181             }
182              
183             sub get_raw {
184 3     3 1 8 state $check = compile( Any, Str, Str, Optional [Bool] );
185 3         1789 my ( $self, $bucket, $key, $return_all ) = $check->(@_);
186 3         88 my $response = $self->_fetch( $bucket, $key, 0 );
187              
188 3         94 my $result;
189 3 50       9 if ( defined $response ) {
190 3 100       9 $result = ($return_all) ? $response : $response->{value};
191             }
192 3         32 $result;
193             }
194              
195             sub get_full_raw {
196 1     1 1 33 state $check = compile( Any, Str, Str );
197 1         1149 my ( $self, $bucket, $key ) = $check->(@_);
198              
199 1         18 $self->get_raw( $bucket, $key, 1 );
200             }
201              
202             sub get {
203 6     6 1 203 state $check = compile( Any, Str, Str, Optional [Bool] );
204 6         2033 my ( $self, $bucket, $key, $return_all ) = $check->(@_);
205 6         133 my $response = $self->_fetch( $bucket, $key, 1 );
206 4         107 my $result;
207 4 100       9 if ( defined $response ) {
208 3 100       10 $result = ($return_all) ? $response : $response->{value};
209             }
210 4         27 $result;
211             }
212              
213             sub get_full {
214 1     1 1 32 state $check = compile( Any, Str, Str );
215 1         1084 my ( $self, $bucket, $key ) = $check->(@_);
216              
217 1         13 $self->get( $bucket, $key, 1 );
218             }
219              
220             sub get_all_indexes {
221 2     2 1 58 state $check = compile( Any, Str, Str );
222 2         1152 my ( $self, $bucket, $key ) = $check->(@_);
223 2         24 my $response = $self->_fetch( $bucket, $key, 0, 1 );
224              
225 3         38 return ( !defined $response )
226             ? []
227 2   100     17 : [ map { +{ value => $_->value, key => $_->key } }
228 2 50       76 @{ $response->{indexes} // [] } ];
229             }
230              
231             sub get_index_value {
232 0     0 1 0 state $check = compile( Any, Str, Str, Str );
233 0         0 my ( $self, $bucket, $key, $index_name ) = $check->(@_);
234              
235 0         0 $self->get_all_index_values( $bucket, $key )->{$index_name};
236             }
237              
238             sub get_all_index_values {
239 0     0 1 0 state $check = compile( Any, Str, Str );
240 0         0 my ( $self, $bucket, $key ) = $check->(@_);
241              
242 0         0 my %values;
243 0         0 foreach my $index ( @{ $self->get_all_indexes( $bucket, $key ) } ) {
  0         0  
244 0         0 my $key = $index->{key};
245 0   0     0 $values{$key} //= [];
246 0         0 push @{ $values{$key} }, $index->{value};
  0         0  
247             }
248              
249 0         0 \%values;
250             }
251              
252             sub get_vclock {
253 1     1 1 33 state $check = compile( Any, Str, Str );
254 1         1474 my ( $self, $bucket, $key ) = $check->(@_);
255 1         16 my $response = $self->_fetch( $bucket, $key, 0, 1 );
256              
257 1 50       62 defined $response and $response->{vclock};
258             }
259              
260             sub exists {
261 4     4 1 930 state $check = compile( Any, Str, Str );
262 4         3110 my ( $self, $bucket, $key ) = $check->(@_);
263 4         65 defined $self->_fetch( $bucket, $key, 0, 1 );
264             }
265              
266             sub _fetch {
267 16     16   59 my ( $self, $bucket, $key, $decode, $head ) = @_;
268              
269 16         178 my $body = RpbGetReq->encode(
270             { r => $self->r,
271             key => $key,
272             bucket => $bucket,
273             head => $head
274             }
275             );
276              
277 16         1852 $self->_parse_response(
278             key => $key,
279             bucket => $bucket,
280             operation => $GET,
281             body => $body,
282             decode => $decode,
283             );
284             }
285              
286             sub put_raw {
287 2     2 1 8 state $check =
288             compile( Any, Str, Str, Any, Optional [Str],
289             Optional [ HashRef [ Str | ArrayRef [Str] ] ], Optional [Str] );
290 2         3776 my ( $self, $bucket, $key, $value, $content_type, $indexes, $vclock ) =
291             $check->(@_);
292 2   100     223 $content_type ||= 'plain/text';
293 2         8 $self->_store( $bucket, $key, $value, $content_type, $indexes, $vclock );
294             }
295              
296             sub put {
297 3     3 1 140 state $check =
298             compile( Any, Str, Str, Any, Optional [Str],
299             Optional [ HashRef [ Str | ArrayRef [Str] ] ], Optional [Str] );
300 3         32996 my ( $self, $bucket, $key, $value, $content_type, $indexes, $vclock ) =
301             $check->(@_);
302              
303 3 100 100     289 ( $content_type ||= 'application/json' ) eq 'application/json'
304             and $value = encode_json($value);
305              
306 3         16 $self->_store( $bucket, $key, $value, $content_type, $indexes, $vclock );
307             }
308              
309             sub _store {
310 5     5   12 my ( $self, $bucket, $key, $encoded_value, $content_type, $indexes,
311             $vclock ) = @_;
312              
313 5         11 my %extra_parameters = ();
314              
315 5 50       14 $extra_parameters{vclock} = $vclock if $vclock;
316              
317 0         0 my $body = RpbPutReq->encode(
318             { key => $key,
319             bucket => $bucket,
320             content => {
321             value => $encoded_value,
322             content_type => $content_type,
323             ( $indexes
324             ? ( indexes => [
325             map {
326 5 50       77 my $k = $_;
327 0         0 my $v = $indexes->{$_};
328 0         0 ref $v eq 'ARRAY'
329 0 0       0 ? map { { key => $k, value => $_ }; } @$v
330             : { key => $k, value => $v };
331             } keys %$indexes
332             ]
333             )
334             : ()
335             ),
336             },
337             %extra_parameters,
338             }
339             );
340              
341 5         1076 $self->_parse_response(
342             key => $key,
343             bucket => $bucket,
344             operation => $PUT,
345             body => $body,
346             );
347             }
348              
349             sub del {
350 2     2 1 123 state $check = compile( Any, Str, Str );
351 2         1557 my ( $self, $bucket, $key ) = $check->(@_);
352              
353 2         51 my $body = RpbDelReq->encode(
354             { key => $key,
355             bucket => $bucket,
356             rw => $self->dw
357             }
358             );
359              
360 2         222 $self->_parse_response(
361             key => $key,
362             bucket => $bucket,
363             operation => $DEL,
364             body => $body,
365             );
366             }
367              
368             sub query_index_loop {
369 1     1 1 42 state $check =
370             compile( Any, Str, Str, Str | ArrayRef, Optional [HashRef] );
371 1         4737 my ( $self, $bucket, $index, $value_to_match, $extra_parameters ) =
372             $check->(@_);
373              
374 1   50     110 $extra_parameters //= {};
375 1   33     7 $extra_parameters->{max_results} //= $DEFAULT_MAX_RESULTS;
376              
377 1         1 my @keys;
378 1         3 do {
379              
380 2         12 my ( $temp_keys, $continuation, undef ) =
381             $self->query_index( $bucket, $index, $value_to_match,
382             $extra_parameters );
383              
384 2         91 $extra_parameters->{continuation} = $continuation;
385              
386 2         3 push @keys, @{$temp_keys};
  2         10  
387              
388             } while ( defined $extra_parameters->{continuation} );
389              
390 1         5 return \@keys;
391             }
392              
393             sub query_index {
394 6     6 1 200 state $check =
395             compile( Any, Str, Str, Str | ArrayRef, Optional [HashRef] );
396 6         5730 my ( $self, $bucket, $index, $value_to_match, $extra_parameters ) =
397             $check->(@_);
398              
399 6         242 my $query_type = 0; # eq
400 6 50       22 ref $value_to_match
401             and $query_type = 1; # range
402              
403 6 50 66     31 croak "query index in stream mode not supported"
404             if defined $extra_parameters && $extra_parameters->{stream};
405              
406 6   100     87 my $body = RpbIndexReq->encode(
407             { index => $index,
408             bucket => $bucket,
409             qtype => $query_type,
410             $query_type
411             ? ( range_min => $value_to_match->[0],
412             range_max => $value_to_match->[1]
413             )
414             : ( key => $value_to_match ),
415 6 50       24 %{ $extra_parameters // {} },
416             }
417             );
418              
419 6 50 66     1095 $self->_parse_response(
420             $query_type
421             ? ( key => "2i query on index='$index' => "
422             . $value_to_match->[0] . '...'
423             . $value_to_match->[1] )
424             : ( key => "2i query on index='$index' => " . $value_to_match ),
425             bucket => $bucket,
426             operation => $QUERY_INDEX,
427             body => $body,
428             paginate => defined $extra_parameters
429             && exists $extra_parameters->{max_results},
430             );
431             }
432              
433             sub map_reduce {
434 3     3 1 204 state $check = compile( Any, Any, Optional [CodeRef] );
435 3         2332 my ( $self, $request, $callback ) = $check->(@_);
436              
437 3         101 my @args;
438              
439 3 50       14 push @args, ref($request) ? encode_json($request) : $request;
440 3         6 push @args, 'application/json';
441 3 100       9 push @args, $callback if $callback;
442              
443 3         10 $self->map_reduce_raw(@args);
444             }
445              
446             sub map_reduce_raw {
447 3     3 1 8 state $check = compile( Any, Str, Str, Optional [CodeRef] );
448 3         2005 my ( $self, $request, $content_type, $callback ) = $check->(@_);
449              
450 3         132 my $body = RpbMapRedReq->encode(
451             { request => $request,
452             content_type => $content_type,
453             }
454             );
455              
456 3         192 $self->_parse_response(
457             key => 'no-key',
458             bucket => 'no-bucket',
459             operation => $MAP_REDUCE,
460             body => $body,
461             callback => $callback,
462             decode => ( $content_type eq 'application/json' ),
463             );
464             }
465              
466             sub get_client_id {
467 0     0 1 0 my $self = shift;
468              
469 0         0 $self->_parse_response(
470             operation => $GET_CLIENT_ID,
471             body => q(),
472             );
473             }
474              
475             sub set_client_id {
476 0     0 1 0 state $check = compile( Any, Str );
477 0         0 my ( $self, $client_id ) = $check->(@_);
478              
479 0         0 my $body = RpbSetClientIdReq->encode( { client_id => $client_id } );
480              
481 0         0 $self->_parse_response(
482             operation => $SET_CLIENT_ID,
483             body => $body,
484             );
485             }
486              
487             sub _pid_change {
488 65     65   1703 $_[0]->pid != $$;
489             }
490              
491             sub _parse_response {
492 65     65   676 my ( $self, %args ) = @_;
493              
494 65         298 my $operation = $args{operation};
495              
496 65         334 my $request_code = $CODES->{$operation}->{request_code};
497 65         141 my $expected_code = $CODES->{$operation}->{response_code};
498              
499 65         138 my $request_body = $args{body};
500 65         116 my $decode = $args{decode};
501 65         118 my $bucket = $args{bucket};
502 65         114 my $key = $args{key};
503 65         136 my $callback = $args{callback};
504 65         97 my $paginate = $args{paginate};
505              
506 65 100       541 $self->autodie
507             or undef $@; ## no critic (RequireLocalizedPunctuationVars)
508              
509 65 100       251 if ( $self->_pid_change ) {
510 1         25 $self->clear_pid;
511 1         386 $self->clear_driver;
512             }
513              
514             $self->driver->perform_request(
515 65 100       3532 code => $request_code,
516             body => $request_body
517             )
518             or return $self->_process_generic_error(
519             $ERRNO, $operation, $bucket,
520             $key
521             );
522              
523             # my $done = 0;
524             #$expected_code != $GET_KEYS_RESPONSE_CODE;
525              
526 59         5102 my $response;
527             my @results;
528 59         110 while (1) {
529              
530             # get and check response
531 62   100     2006 $response = $self->driver->read_response()
532             // { code => -1, body => undef, error => $ERRNO };
533              
534 62         367 my ( $response_code, $response_body, $response_error ) =
535 62         3957 @{$response}{qw(code body error)};
536              
537             # in case of internal error message
538 62 100       263 defined $response_error
539             and return $self->_process_generic_error(
540             $response_error, $operation, $bucket,
541             $key
542             );
543              
544             # in case of error msg
545 43 100       187 $response_code == $ERROR_RESPONSE_CODE
546             and return $self->_process_riak_error(
547             $response_body, $operation, $bucket,
548             $key
549             );
550              
551             # in case of default message
552 42 100       148 $response_code != $expected_code
553             and return $self->_process_generic_error(
554             "Unexpected Response Code in (got: $response_code, expected: $expected_code)",
555             $operation, $bucket, $key
556             );
557              
558 41 50       143 $response_code == $GET_CLIENT_ID_RESPONSE_CODE
559             and return $self->_process_get_client_id_response($response_body);
560              
561             # we have a 'get' response
562 41 100       390 $response_code == $GET_RESPONSE_CODE
563             and
564             return $self->_process_get_response( $response_body, $bucket, $key,
565             $decode );
566              
567             # we have a 'get_keys' response
568             # TODO: support for 1.4 (which provides 'stream', 'return_terms', and 'stream')
569 27 100       471 if ( $response_code == $GET_KEYS_RESPONSE_CODE ) {
    100          
    100          
570 3         14 my $obj = RpbListKeysResp->decode($response_body);
571 3   100     251 my @keys = @{ $obj->keys // [] };
  3         14  
572 3 50       55 if ($callback) {
573 3         8 $callback->($_) foreach @keys;
574 3 100       15 $obj->done
575             and return;
576             }
577             else {
578 0         0 push @results, @keys;
579 0 0       0 $obj->done
580             and return \@results;
581             }
582 1         11 next;
583             } # in case of a 'query_index' response
584             elsif ( $response_code == $QUERY_INDEX_RESPONSE_CODE ) {
585 5         32 my $obj = RpbIndexResp->decode($response_body);
586              
587 5   100     722 my $keys = $obj->keys // [];
588              
589 5 100 66     104 if ( $paginate and wantarray ) {
590 3         13 return ( $keys, $obj->continuation, $obj->done );
591             }
592             else {
593 2         23 return $keys;
594             }
595             }
596             elsif ( $response_code == $MAP_REDUCE_RESPONSE_CODE ) {
597 4         48 my $obj = RpbMapRedResp->decode($response_body);
598              
599 4         299 my $phase = $obj->phase;
600 4 50 100     51 my $response =
601             ($decode)
602             ? decode_json( $obj->response // '[]' )
603             : $obj->response;
604              
605 4 100       66 if ($callback) {
606 2 100       6 $obj->done
607             and return;
608 1         15 $callback->( $response, $phase );
609             }
610             else {
611 2 100       8 $obj->done
612             and return \@results;
613 1         16 push @results, { phase => $phase, response => $response };
614             }
615 2         20 next;
616             }
617              
618             # in case of no return value, signify success
619 15         230 return 1;
620             }
621              
622             }
623              
624             sub _process_get_client_id_response {
625 0     0   0 my ( $self, $encoded_message ) = @_;
626              
627 0 0       0 $self->_process_generic_error( "Undefined Message", 'get client id', '-',
628             '-' )
629             unless ( defined $encoded_message );
630              
631 0         0 my $decoded_message = RpbGetClientIdResp->decode($encoded_message);
632 0         0 $decoded_message->client_id;
633             }
634              
635             sub _process_get_response {
636 14     14   28 my ( $self, $encoded_message, $bucket, $key, $should_decode ) = @_;
637              
638 14 100       35 $self->_process_generic_error( "Undefined Message", 'get', $bucket, $key )
639             unless ( defined $encoded_message );
640              
641 13         71 my $decoded_message = RpbGetResp->decode($encoded_message);
642              
643 13         2299 my $contents = $decoded_message->content;
644 13 100       171 if ( ref($contents) eq 'ARRAY' ) {
645 11         20 my $content = $contents->[0];
646              
647 11   100     34 my $decode =
648             $should_decode && ( $content->content_type eq 'application/json' );
649             return {
650 11 100       78 value => ($decode)
651             ? decode_json( $content->value )
652             : $content->value,
653             indexes => $content->indexes,
654             vclock => $decoded_message->vclock,
655             };
656             }
657              
658 2         22 undef;
659             }
660              
661             sub _process_riak_error {
662 1     1   3 my ( $self, $encoded_message, $operation, $bucket, $key ) = @_;
663              
664 1         709 my $decoded_message = RpbErrorResp->decode($encoded_message);
665              
666 1         1655 my $errmsg = $decoded_message->errmsg;
667 1         18 my $errcode = $decoded_message->errcode;
668              
669 1         17 $self->_process_generic_error(
670             "Riak Error (code: $errcode) '$errmsg'",
671             $operation, $bucket, $key
672             );
673             }
674              
675             sub _process_generic_error {
676 28     28   170 my ( $self, $error, $operation, $bucket, $key ) = @_;
677              
678 28         80 my $extra = '';
679              
680 28 100       137 if ( $operation eq $PING ) {
    100          
    100          
681 20         69 $extra = '';
682             }
683             elsif ( $operation eq $QUERY_INDEX ) {
684 1         5 $extra = "(bucket: $bucket, $key)";
685             }
686             elsif ( $operation eq $MAP_REDUCE ) {
687 1         4 $extra = ''; # maybe add the sha1 of the request?
688             }
689             else {
690 6         21 $extra = "(bucket: $bucket, key: $key)";
691             }
692              
693 28         121 my $error_message = "Error in '$operation' $extra: $error";
694              
695 28 100       646 croak $error_message if $self->autodie;
696              
697 4         8 $@ = $error_message; ## no critic (RequireLocalizedPunctuationVars)
698              
699 4         38 undef;
700             }
701              
702             1;
703              
704              
705             =pod
706              
707             =encoding UTF-8
708              
709             =head1 NAME
710              
711             Riak::Light - Fast and lightweight Perl client for Riak
712              
713             =head1 VERSION
714              
715             version 0.10
716              
717             =head1 SYNOPSIS
718              
719             use Riak::Light;
720              
721             # create a new instance - using pbc only
722             my $client = Riak::Light->new(
723             host => '127.0.0.1',
724             port => 8087
725             );
726              
727             $client->is_alive() or die "ops, riak is not alive";
728              
729             # store hashref into bucket 'foo', key 'bar'
730             # will be serialized as 'application/json'
731             $client->put( foo => bar => { baz => 1024 });
732              
733             # store text into bucket 'foo', key 'bar'
734             $client->put( foo => baz => "sometext", 'text/plain');
735             $client->put_raw( foo => baz => "sometext"); # does not encode !
736              
737             # fetch hashref from bucket 'foo', key 'bar'
738             my $hash = $client->get( foo => 'bar');
739             my $text = $client->get_raw( foo => 'baz'); # does not decode !
740              
741             # delete hashref from bucket 'foo', key 'bar'
742             $client->del(foo => 'bar');
743              
744             # check if exists (like get but using less bytes in the response)
745             $client->exists(foo => 'baz') or warn "ops, foo => bar does not exist";
746              
747             # list keys in stream (callback only)
748             $client->get_keys(foo => sub{
749             my $key = $_[0];
750              
751             # you should use another client inside this callback!
752             $another_client->del(foo => $key);
753             });
754            
755             # perform 2i queries
756             my $keys = $client->query_index( $bucket_name => 'index_test_field_bin', 'plop');
757            
758             # list all 2i indexes and values
759             my $indexes = $client->get_all_indexes( $bucket_name => $key );
760            
761             # perform map / reduce operations
762             my $response = $client->map_reduce('{
763             "inputs":"training",
764             "query":[{"map":{"language":"javascript",
765             "source":"function(riakObject) {
766             var val = riakObject.values[0].data.match(/pizza/g);
767             return [[riakObject.key, (val ? val.length : 0 )]];
768             }"}}]}');
769              
770             =head1 DESCRIPTION
771              
772             Riak::Light is a very light (and fast) Perl client for Riak using PBC
773             interface. It supports operations like ping, get, exists, put, del, and secondary
774             indexes (so-called 2i) setting and querying.
775              
776             It is flexible to change the timeout backend for I/O operations and can
777             suppress 'die' in case of error (autodie) using the configuration. There is no
778             auto-reconnect option. It can be very easily wrapped up by modules like
779             L<Action::Retry> to manage flexible retry/reconnect strategies.
780              
781             =head2 ATTRIBUTES
782              
783             =head3 host
784              
785             Riak ip or hostname. There is no default.
786              
787             =head3 port
788              
789             Port of the PBC interface. There is no default.
790              
791             =head3 r
792              
793             R value setting for this client. Default 2.
794              
795             =head3 w
796              
797             W value setting for this client. Default 2.
798              
799             =head3 dw
800              
801             DW value setting for this client. Default 2.
802              
803             =head3 autodie
804              
805             Boolean, if false each operation will return undef in case of error (stored in $@). Default is true.
806              
807             =head3 timeout
808              
809             Timeout for connection, write and read operations. Default is 0.5 seconds.
810              
811             =head3 in_timeout
812              
813             Timeout for read operations. Default is timeout value.
814              
815             =head3 out_timeout
816              
817             Timeout for write operations. Default is timeout value.
818              
819             =head3 tcp_nodelay
820              
821             Boolean, enable or disable TCP_NODELAY. If True (default), disables Nagle's Algorithm.
822              
823             See more in: L<https://en.wikipedia.org/wiki/Nagle%27s_algorithm>.
824              
825             =head3 timeout_provider
826              
827             Can change the backend for timeout. The default value is IO::Socket::INET, which
828             only supports connection timeout.
829              
830             B<IMPORTANT>: in case of any timeout error, the socket between this client and the
831             Riak server will be closed. To support I/O timeout you can choose 5 options (or
832             you can set undef to avoid IO Timeout):
833              
834             =over
835              
836             =item * Riak::Light::Timeout::Alarm
837              
838             uses alarm and Time::HiRes to control the I/O timeout. Does not work on Win32.
839             (Not Safe)
840              
841             =item * Riak::Light::Timeout::Time::Out
842              
843             uses Time::Out and Time::HiRes to control the I/O timeout. Does not work on
844             Win32. (Not Safe)
845              
846             =item * Riak::Light::Timeout::Select
847              
848             uses IO::Select to control the I/O timeout
849              
850             =item * Riak::Light::Timeout::SelectOnWrite
851              
852             uses IO::Select to control only Output Operations. Can block in Write
853             Operations. Be Careful.
854              
855             =item * Riak::Light::Timeout::SetSockOpt
856              
857             uses setsockopt to set SO_RCVTIMEO and SO_SNDTIMEO socket properties. Does not
858             Work on NetBSD 6.0.
859              
860             =back
861              
862             =head3 driver
863              
864             This is a Riak::Light::Driver instance, to be able to connect and perform
865             requests to Riak over PBC interface.
866              
867             =head2 METHODS
868              
869             =head3 is_alive
870              
871             $client->is_alive() or warn "oops... something is wrong: $@";
872              
873             Perform a ping operation. Will return false in case of error (will store in $@).
874              
875             =head3 ping
876              
877             try { $client->ping() } catch { "oops... something is wrong: $_" };
878              
879             Perform a ping operation. Will die in case of error.
880              
881             =head3 set_client_id
882              
883             $client->set_client_id('foobar');
884              
885             Set the client id.
886              
887             =head3 get_client_id
888              
889             my $client_id = $client->get_client_id();
890              
891             Get the client id.
892              
893             =head3 get
894              
895             my $value_or_reference = $client->get(bucket => 'key');
896              
897             Perform a fetch operation. Expects bucket and key names. Decode the json into a
898             Perl structure, if the content_type is 'application/json'. If you need the raw
899             data you can use L</get_raw>.
900              
901             There is a third argument: return_all. Default is false. If true, we will return a hashref with 3 entries:
902             value (the data decoded), indexes and vclock.
903              
904             =head3 get_raw
905              
906             my $scalar_value = $client->get_raw(bucket => 'key');
907              
908             Perform a fetch operation. Expects bucket and key names. Return the raw data.
909             If you need to decode the json, you should use L</get> instead.
910              
911             There is a third argument: return_all. Default is false. If true, we will return a hashref with 3 entries:
912             value (the raw data), indexes and vclock.
913              
914             =head3 get_full
915              
916             my $value_or_reference = $client->get_full(bucket => 'key');
917              
918             Perform a fetch operation. Expects bucket and key names. Will return a hashref with 3 entries:
919             value (the data decoded), indexes and vclock. It is equivalent to calling get(bucket, key, 1).
920              
921             =head3 get_full_raw
922              
923             my $scalar_value = $client->get_full_raw(bucket => 'key');
924              
925             Perform a fetch operation. Expects bucket and key names. Will return a hashref with 3 entries:
926             value (the raw data), indexes and vclock. It is equivalent to calling get_raw(bucket, key, 1).
927              
928             =head3 exists
929              
930             $client->exists(bucket => 'key') or warn "key not found";
931              
932             Perform a fetch operation but with head => 0, and then check if there is something
933             stored in the bucket/key.
934              
935             =head3 get_all_indexes
936              
937             $client->get_all_indexes(bucket => 'key');
938              
939             Perform a fetch operation but, instead of returning the content, return an arrayref of hashrefs, each holding one index name (key) and one of its values (value), or an empty arrayref if there are none. For example one possible return is:
940              
941             [
942             { key => 'index_test_field_bin', value => 'plop' },
943             { key => 'index_test_field2_bin', value => 'plop2' },
944             { key => 'index_test_field2_bin', value => 'plop3' },
945             ]
946              
947             IMPORTANT: this arrayref is unsorted.
948              
949             =head3 get_index_value
950              
951             Perform a fetch operation, will return an arrayref with all values of the index or undef (if it does not exist). There is no order for the array.
952              
953             my $value = $client->get_index_value(bucket => key => 'index_test_field_bin');
954              
955             It is similar to do
956              
957             my $value = $client->get_all_index_values(bucket => 'key')->{index_test_field_bin};
958              
959             =head3 get_all_index_values
960              
961             Perform a fetch operation, will return a hashref with all 2i index names as keys, and an arrayref of all values of each index as values.
962              
963             =head3 get_vclock
964              
965             Perform a fetch operation, will return the value of the vclock
966              
967             my $vclock = $client->get_vclock(bucket => 'key');
968              
969             =head3 put
970              
971             $client->put('bucket', 'key', { some_values => [1,2,3] });
972             $client->put('bucket', 'key', { some_values => [1,2,3] }, 'application/json');
973             $client->put('bucket', 'key', 'text', 'plain/text');
974              
975             # you can set secondary indexes (2i)
976             $client->put( 'bucket', 'key', 'text', 'plain/text',
977             { field1_bin => 'abc', field2_int => 42 }
978             );
979             $client->put( 'bucket', 'key', { some_values => [1,2,3] }, undef,
980             { field1_bin => 'abc', field2_int => 42 }
981             );
982             # remember that a key can have more than one value in a given index. In this
983             # case, use ArrayRef:
984             $client->put( 'bucket', 'key', 'value', undef,
985             { field1_bin => [ 'abc', 'def' ] } );
986              
987             Perform a store operation. Expects bucket and key names, the value, the content
988             type (optional, default is 'application/json'), and the indexes to set for this
989             value (optional, default is none).
990              
991             Will encode the structure in json string if necessary. If you need only store
992             the raw data you can use L</put_raw> instead.
993              
994             B<IMPORTANT>: all the index field names should end with either C<_int> or
995             C<_bin>, depending if the index type is integer or binary.
996              
997             To query secondary indexes, see L</query_index>.
998              
999             =head3 put_raw
1000              
1001             $client->put_raw('bucket', 'key', encode_json({ some_values => [1,2,3] }), 'application/json');
1002             $client->put_raw('bucket', 'key', 'text');
1003             $client->put_raw('bucket', 'key', 'text', undef, {field_bin => 'foo'});
1004              
1005             Perform a store operation. Expects bucket and key names, the value, the content
1006             type (optional, default is 'plain/text'), and the indexes to set for this value
1007             (optional, default is none).
1008              
1009             Will encode the raw data. If you need to encode the structure you can use L</put>
1010             instead.
1011              
1012             B<IMPORTANT>: all the index field names should end with either C<_int> or
1013             C<_bin>, depending if the index type is integer or binary.
1014              
1015             To query secondary indexes, see L</query_index>.
1016              
1017             =head3 del
1018              
1019             $client->del(bucket => 'key');
1020              
1021             Perform a delete operation. Expects bucket and key names.
1022              
1023             =head3 get_keys
1024              
1025             $client->get_keys(foo => sub{
1026             my $key = $_[0];
1027              
1028             # you should use another client inside this callback!
1029             $another_client->del(foo => $key);
1030             });
1031              
1032             Perform a list keys operation. Receives a callback and will call it for each
1033             key. You can't use the same client inside this callback to perform other operations!
1034              
1035             The callback is optional, in which case an ArrayRef of all the keys is
1036             returned. But you should always provide a callback, to keep your RAM usage from
1037             skyrocketing...
1038              
1039             =head3 query_index
1040              
1041             Perform a secondary index query. Expects a bucket name, the index field name,
1042             and the index value you're searching on. Returns an ArrayRef of matching keys.
1043              
1044             The index value you're searching on can be of two types. If it's a scalar, an
1045             B<exact match> query will be performed. If the value is an ArrayRef, then a
1046             B<range> query will be performed: the first element in the array will be the
1047             range_min, the second element the range_max. Other elements will be ignored.
1048              
1049             Based on the example in C<put>, here is how to query it:
1050              
1051             # exact match
1052             my $matching_keys = $client->query_index( 'bucket', 'field2_int', 42 );
1053              
1054             # range match
1055             my $matching_keys = $client->query_index( 'bucket', 'field2_int', [ 40, 50] );
1056              
1057             # with pagination
1058             my ($matching_keys, $continuation, $done) = $client->query_index( 'bucket', 'field2_int', 42, { max_results => 100 });
1059              
1060             to fetch the next 100 keys
1061              
1062             my ($matching_keys, $continuation, $done) = $client->query_index( 'bucket', 'field2_int', 42, {
1063             max_results => 100,
1064             continuation => $continuation
1065             });
1066              
1067             to fetch only the first 100 keys you can do this
1068              
1069             my $matching_keys = $client->query_index( 'bucket', 'field2_int', [ 40, 50], { max_results => 100 });
1070              
1071             =head3 query_index_loop
1072              
1073             Instead of using a normal loop around query_index to query 2i with pagination, like this:
1074              
1075             do {
1076             ($matching_keys, $continuation) = $client->query_index( 'bucket', 'field2_int', 42, {
1077             max_results => 100,
1078             continuation => $continuation
1079             });
1080             push @keys, @{$matching_keys};
1081             } while(defined $continuation);
1082              
1083             you can simply use query_index_loop helper method
1084              
1085             my $matching_keys = $client->query_index_loop( 'bucket', 'field2_int', [ 40, 50], { max_results => 1024 });
1086              
1087             if you omit the max_results, the default value is 100
1088              
1089             =head3 map_reduce
1090              
1091             This is an alias for map_reduce_raw with content-type 'application/json'
1092              
1093             =head3 map_reduce_raw
1094              
1095             Perform a map/reduce operation. You can use content-type 'application/json' or 'application/x-erlang-binary'. Accepts an optional callback.
1096              
1097             Example:
1098              
1099             my $map_reduce_json = '{
1100             "inputs":"training",
1101             "query":[{"map":{"language":"javascript",
1102             "source":"function(riakObject) {
1103             var val = riakObject.values[0].data.match(/pizza/g);
1104             return [[riakObject.key, (val ? val.length : 0 )]];
1105             }"}}]}';
1106            
1107             my $response = $client->map_reduce_raw($map_reduce_json, 'application/json');
1108              
1109             will return something like
1110              
1111             [
1112             {'response' => [['foo',1]],'phase' => 0},
1113             {'response' => [['bam',3]],'phase' => 0},
1114             {'response' => [['bar',4]],'phase' => 0},
1115             {'response' => [['baz',0]],'phase' => 0}
1116             ]
1117              
1118             an arrayref of hashrefs, each with the response (decoded if json) and the phase value. You can also pass a callback
1119              
1120             $client->map_reduce( $map_reduce_json , sub {
1121             my ($response, $phase) = @_;
1122            
1123             # process the response
1124             });
1125              
1126             this callback will be called 4 times, with this response (decoded from json)
1127              
1128             [['foo', 1]]
1129             [['bam', 3]]
1130             [['bar', 4]]
1131             [['baz', 0]]
1132              
1133             using map_reduce method, you can also use a hashref as a map reduce query:
1134              
1135             my $json_hash = {
1136             inputs => "training",
1137             query => [{
1138             map => {
1139             language =>"javascript",
1140             source =>"function(riakObject) {
1141             var val = riakObject.values[0].data.match(/pizza/g);
1142             return [[riakObject.key, (val ? val.length : 0 )]];
1143             }"
1144             }
1145             }]
1146             };
1147            
1148             $client->map_reduce($json_hash, sub { ... });
1149              
1150             map_reduce encodes/decodes to json format. If you need control over the format (like to use with erlang), you should use map_reduce_raw.
1151              
1152             you can use erlang functions but using the json format (see L).
1153              
1154             {"inputs":"messages","query":[{"map":{"language":"erlang","module":"mr_example","function":"get_keys"}}]}
1155              
1156             More information:
1157              
1158             L
1159              
1160             L
1161              
1162             L
1163              
1164             =head1 SEE ALSO
1165              
1166             L
1167              
1168             L
1169              
1170             L
1171              
1172             L
1173              
1174             =head1 AUTHORS
1175              
1176             =over 4
1177              
1178             =item *
1179              
1180             Tiago Peczenyj
1181              
1182             =item *
1183              
1184             Damien Krotkine
1185              
1186             =back
1187              
1188             =head1 COPYRIGHT AND LICENSE
1189              
1190             This software is copyright (c) 2013 by Weborama.
1191              
1192             This is free software; you can redistribute it and/or modify it under
1193             the same terms as the Perl 5 programming language system itself.
1194              
1195             =cut
1196              
1197              
1198             __END__