File Coverage

blib/lib/Rethinkdb/IO.pm
Criterion Covered Total %
statement 27 231 11.6
branch 0 76 0.0
condition 0 15 0.0
subroutine 9 25 36.0
pod 7 7 100.0
total 43 354 12.1


line stmt bran cond sub pod time code
1             package Rethinkdb::IO;
2 15     15   52 use Rethinkdb::Base -base;
  15         16  
  15         74  
3              
4 15     15   58 no warnings 'recursion';
  15         18  
  15         424  
5              
6 15     15   48 use Carp 'croak';
  15         12  
  15         608  
7 15     15   6654 use IO::Socket::INET;
  15         168418  
  15         70  
8 15     15   16245 use JSON::PP;
  15         172631  
  15         985  
9              
10 15     15   5540 use Rethinkdb::Protocol;
  15         35  
  15         130  
11 15     15   6529 use Rethinkdb::Response;
  15         27  
  15         99  
12              
13             has host => 'localhost';
14             has port => 28_015;
15             has default_db => 'test';
16             has auth_key => q{};
17             has timeout => 20;
18             has [ '_rdb', '_handle', '_callbacks', '_responder' ];
19             has '_protocol' => sub { Rethinkdb::Protocol->new; };
20              
21             sub connect {
22 0     0 1   my $self = shift;
23              
24 0 0         $self->{_handle} = IO::Socket::INET->new(
25             PeerHost => $self->host,
26             PeerPort => $self->port,
27             Reuse => 1,
28             Timeout => $self->timeout,
29             )
30             or croak 'ERROR: Could not connect to ' . $self->host . q{:} . $self->port;
31              
32 0           $self->_handle->send( pack 'L<',
33             $self->_protocol->versionDummy->version->v0_3 );
34 0           $self->_handle->send(
35             ( pack 'L<', length $self->auth_key ) . $self->auth_key );
36              
37 0           $self->_handle->send( pack 'L<',
38             $self->_protocol->versionDummy->protocol->json );
39              
40 0           my $response;
41 0           my $char = q{};
42 0           do {
43 0           $self->_handle->recv( $char, 1 );
44 0           $response .= $char;
45             } while ( $char ne "\0" );
46              
47             # trim string
48 0           $response =~ s/^\s//;
49 0           $response =~ s/\s$//;
50              
51 0 0         if ( $response =~ /^ERROR/ ) {
52 0           croak $response;
53             }
54              
55 0           $self->_callbacks( {} );
56              
57 0           return $self;
58             }
59              
60             sub close {
61 0     0 1   my $self = shift;
62 0 0         my $args = ref $_[0] ? $_[0] : {@_};
63              
64 0 0         if ( $self->_handle ) {
65 0 0 0       if ( !defined $args->{noreply_wait} || !$args->{noreply_wait} ) {
66 0           $self->noreply_wait;
67             }
68              
69 0           $self->_handle->close;
70 0           $self->_handle(undef);
71             }
72              
73 0           $self->_callbacks( {} );
74              
75 0           return $self;
76             }
77              
78             sub reconnect {
79 0     0 1   my $self = shift;
80 0 0         my $args = ref $_[0] ? $_[0] : {@_};
81              
82 0           return $self->close($args)->connect;
83             }
84              
85             # put the handle into main package
86             sub repl {
87 0     0 1   my $self = shift;
88 0           my $package = caller;
89              
90 0           $package::_rdb_io = $self;
91 0           return $self;
92             }
93              
94             sub use {
95 0     0 1   my $self = shift;
96 0           my $db = shift;
97              
98 0           $self->default_db($db);
99 0           return $self;
100             }
101              
102             sub noreply_wait {
103 0     0 1   my $self = shift;
104              
105 0           return $self->_send(
106             {
107             type => $self->_protocol->query->queryType->noreply_wait,
108             token => Rethinkdb::Util::_token(),
109             }
110             );
111             }
112              
113             sub server {
114 0     0 1   my $self = shift;
115              
116 0           return $self->_send(
117             {
118             type => $self->_protocol->query->queryType->server_info,
119             token => Rethinkdb::Util::_token(),
120             }
121             );
122             }
123              
124             sub _start {
125 0     0     my $self = shift;
126 0           my ( $query, $args, $callback ) = @_;
127              
128 0           my $q = {
129             type => $self->_protocol->query->queryType->start,
130             token => Rethinkdb::Util::_token(),
131             query => $query->_build
132             };
133              
134 0 0         if ( ref $callback eq 'CODE' ) {
135 0           $self->_callbacks->{ $q->{token} } = $callback;
136             }
137              
138             # add our database
139 0 0         if ( !$args->{db} ) {
140 0           $args->{db} = $self->default_db;
141             }
142              
143 0           return $self->_send( $q, $args );
144             }
145              
146             sub _encode {
147 0     0     my $self = shift;
148 0           my $data = shift;
149 0   0       my $args = shift || {};
150              
151             # only QUERY->START needs these:
152 0 0         if ( $data->{type} == 1 ) {
153 0           $data = $self->_encode_recurse($data);
154 0           push @{$data}, _simple_encode_hash($args);
  0            
155             }
156             else {
157 0           $data = [ $data->{type} ];
158             }
159              
160 0           return encode_json $data;
161             }
162              
163             # temporarily: clean up global optional arguments
164             sub _simple_encode_hash {
165 0     0     my $data = shift;
166 0           my $json = {};
167              
168 0           foreach ( keys %{$data} ) {
  0            
169 0           $json->{$_} = _simple_encode( $data->{$_} );
170             }
171              
172 0 0         if ( $json->{db} ) {
173             $json->{db} = Rethinkdb::IO->_encode_recurse(
174             Rethinkdb::Query::Database->new(
175             name => $json->{db},
176             args => $json->{db},
177 0           )->_build
178             );
179             }
180              
181 0           return $json;
182             }
183              
184             sub _simple_encode {
185 0     0     my $data = shift;
186              
187 0 0         if ( ref $data eq 'Rethinkdb::_True' ) {
    0          
188 0           return JSON::PP::true;
189             }
190             elsif ( ref $data eq 'Rethinkdb::_False' ) {
191 0           return JSON::PP::false;
192             }
193              
194 0           return $data;
195             }
196              
197             sub _encode_recurse {
198 0     0     my $self = shift;
199 0           my $data = shift;
200 0           my $json = [];
201              
202 0 0         if ( $data->{datum} ) {
203 0           my $val = q{};
204 0 0 0       if ( defined $data->{datum}->{r_bool} ) {
    0          
205 0 0         if ( $data->{datum}->{r_bool} ) {
206 0           return JSON::PP::true;
207             }
208             else {
209 0           return JSON::PP::false;
210             }
211             }
212             elsif ( defined $data->{datum}->{type}
213             && $data->{datum}->{type} == $self->_protocol->datum->datumType->r_null )
214             {
215 0           return JSON::PP::null;
216             }
217             else {
218 0           foreach ( keys %{ $data->{datum} } ) {
  0            
219 0 0         if ( $_ ne 'type' ) {
220 0           return $data->{datum}->{$_};
221             }
222             }
223             }
224             }
225              
226 0 0         if ( $data->{type} ) {
227 0           push @{$json}, $data->{type};
  0            
228             }
229              
230 0 0         if ( $data->{query} ) {
231 0           push @{$json}, $self->_encode_recurse( $data->{query} );
  0            
232             }
233              
234 0 0         if ( $data->{args} ) {
235 0           my $args = [];
236 0           foreach ( @{ $data->{args} } ) {
  0            
237 0           push @{$args}, $self->_encode_recurse($_);
  0            
238             }
239              
240 0           push @{$json}, $args;
  0            
241             }
242              
243 0 0 0       if ( $data->{optargs} && ref $data->{optargs} eq 'HASH' ) {
    0          
244 0           push @{$json}, $self->_encode_recurse( $data->{optargs} );
  0            
245             }
246             elsif ( $data->{optargs} ) {
247 0           my $args = {};
248 0           foreach ( @{ $data->{optargs} } ) {
  0            
249 0           $args->{ $_->{key} } = $self->_encode_recurse( $_->{val} );
250             }
251              
252 0 0         if ( $data->{type} == $self->_protocol->term->termType->make_obj ) {
253 0           return $args;
254             }
255              
256 0           push @{$json}, $args;
  0            
257             }
258              
259 0           return $json;
260             }
261              
262             sub _decode {
263 0     0     my $self = shift;
264 0           my $data = shift;
265 0           my $decode = decode_json $data;
266              
267 0           $decode->{r} = $self->_clean( $decode->{r} );
268 0           return $decode;
269             }
270              
271             # converts JSON::PP::Boolean in an array to our Booleans
272             sub _clean {
273 0     0     my $self = shift;
274 0           my $data = shift;
275 0           my $clean = [];
276              
277 0 0         if ( ref $data eq 'ARRAY' ) {
    0          
278 0           foreach ( @{$data} ) {
  0            
279 0           push @{$clean}, $self->_real_cleaner($_);
  0            
280             }
281              
282 0           return $clean;
283             }
284             elsif ( ref $data eq 'HASH' ) {
285 0           foreach ( keys %{$data} ) {
  0            
286 0           $data->{$_} = $self->_real_cleaner( $data->{$_} );
287             }
288              
289 0           return $data;
290             }
291              
292 0           return $data;
293             }
294              
295             sub _real_cleaner {
296 0     0     my $self = shift;
297 0           my $data = shift;
298 0           my $retval;
299              
300 0 0         if ( ref $data eq 'JSON::PP::Boolean' ) {
    0          
    0          
301 0 0         if ($data) {
302 0           $retval = $self->_rdb->true;
303             }
304             else {
305 0           $retval = $self->_rdb->false;
306             }
307             }
308             elsif ( ref $data eq 'ARRAY' ) {
309 0           $retval = $self->_clean($data);
310             }
311             elsif ( ref $data eq 'HASH' ) {
312 0           $retval = $self->_clean($data);
313             }
314             else {
315 0           $retval = $data;
316             }
317              
318 0           return $retval;
319             }
320              
321             sub _send {
322 0     0     my $self = shift;
323 0           my $query = shift;
324 0   0       my $args = shift || {};
325              
326 0 0         if ( $ENV{RDB_DEBUG} ) {
327 15     15   80 use feature ':5.10';
  15         15  
  15         570  
328 15     15   8538 use Data::Dumper;
  15         64897  
  15         8637  
329 0           $Data::Dumper::Indent = 1;
330 0           say {*STDERR} 'SENDING:';
  0            
331 0           say {*STDERR} Dumper $query;
  0            
332             }
333              
334 0           my $token;
335             my $length;
336              
337 0           my $serial = $self->_encode( $query, $args );
338 0           my $header = pack 'QL<', $query->{token}, length $serial;
339              
340 0 0         if ( $ENV{RDB_DEBUG} ) {
341 0           say 'SENDING:';
342 0           say {*STDERR} Dumper $serial;
  0            
343             }
344              
345             # send message
346 0           $self->_handle->send( $header . $serial );
347              
348             # noreply should just return
349 0 0         if ( $args->{noreply} ) {
350 0           return;
351             }
352              
353             # receive message
354 0           my $data = q{};
355              
356 0           $self->_handle->recv( $token, 8 );
357 0           $token = unpack 'Q<', $token;
358              
359 0           $self->_handle->recv( $length, 4 );
360 0           $length = unpack 'L<', $length;
361              
362             # if we couldn't unpack a length, say it is zero
363 0   0       $length ||= 0;
364              
365 0           my $_data;
366 0           do {
367 0           $self->_handle->recv( $_data, 4096 );
368 0           $data = $data . $_data;
369             } until ( length($data) eq $length );
370              
371             # decode RQL data
372 0           my $res_data = $self->_decode($data);
373 0           $res_data->{token} = $token;
374              
375             # handle partial response
376 0 0         if ( $res_data->{t} == 3 ) {
377 0 0         if ( $self->_callbacks->{$token} ) {
378 0           my $res = Rethinkdb::Response->_init( $res_data, $args );
379              
380 0 0         if ( $ENV{RDB_DEBUG} ) {
381 0           say {*STDERR} 'RECEIVED:';
  0            
382 0           say {*STDERR} Dumper $res;
  0            
383             }
384              
385             # send what we have
386 0           $self->_callbacks->{$token}->($res);
387              
388             # fetch more
389 0           return $self->_send(
390             {
391             type => $self->_protocol->query->queryType->continue,
392             token => $token
393             }
394             );
395             }
396             else {
397 0 0         if ( $ENV{RDB_DEBUG} ) {
398 0           say {*STDERR} 'RECEIVED:';
  0            
399 0           say {*STDERR} Dumper $res_data;
  0            
400             }
401              
402             # fetch the rest of the data if partial
403 0           my $more = $self->_send(
404             {
405             type => $self->_protocol->query->queryType->continue,
406             token => $token
407             }
408             );
409              
410 0           push @{ $res_data->{r} }, @{ $more->response };
  0            
  0            
411 0           $res_data->{t} = $more->type;
412             }
413             }
414              
415             # put data in response
416 0           my $res = Rethinkdb::Response->_init( $res_data, $args );
417              
418 0 0         if ( $ENV{RDB_DEBUG} ) {
419 0           say {*STDERR} 'RECEIVED:';
  0            
420 0           say {*STDERR} Dumper $res_data;
  0            
421 0           say {*STDERR} Dumper $res;
  0            
422             }
423              
424             # if there is callback return data to that
425 0 0         if ( $self->_callbacks->{$token} ) {
426 0           my $cb = $self->_callbacks->{$token};
427 0           delete $self->_callbacks->{$token};
428 0           return $cb->($res);
429             }
430              
431 0           return $res;
432             }
433              
434             1;
435              
436             =encoding utf8
437              
438             =head1 NAME
439              
440             Rethinkdb::IO - RethinkDB IO
441              
442             =head1 SYNOPSIS
443              
444             package MyApp;
445             use Rethinkdb::IO;
446              
447             my $io = Rethinkdb::IO->new->connect;
448             $io->use('marvel');
449             $io->close;
450              
451             =head1 DESCRIPTION
452              
453             This module handles communicating with the RethinkDB Database.
454              
455             =head1 ATTRIBUTES
456              
457             L implements the following attributes.
458              
459             =head2 host
460              
461             my $io = Rethinkdb::IO->new->connect;
462             my $host = $io->host;
463             $io->host('r.example.com');
464              
465             The C attribute returns or sets the current host name that
466             L is currently set to use.
467              
468             =head2 port
469              
470             my $io = Rethinkdb::IO->new->connect;
471             my $port = $io->port;
472             $io->port(1212);
473              
474             The C attribute returns or sets the current port number that
475             L is currently set to use.
476              
477             =head2 default_db
478              
479             my $io = Rethinkdb::IO->new->connect;
480             my $port = $io->default_db;
481             $io->default_db('marvel');
482              
483             The C attribute returns or sets the current database name that
484             L is currently set to use.
485              
486             =head2 auth_key
487              
488             my $io = Rethinkdb::IO->new->connect;
489             my $port = $io->auth_key;
490             $io->auth_key('setec astronomy');
491              
492             The C attribute returns or sets the current authentication key that
493             L is currently set to use.
494              
495             =head2 timeout
496              
497             my $io = Rethinkdb::IO->new->connect;
498             my $timeout = $io->timeout;
499             $io->timeout(60);
500              
501             The C attribute returns or sets the timeout length that
502             L is currently set to use.
503              
504             =head1 METHODS
505              
506             L inherits all methods from L and implements
507             the following methods.
508              
509             =head2 connect
510              
511             my $io = Rethinkdb::IO->new;
512             $io->host('rdb.example.com');
513             $io->connect->repl;
514              
515             The C method initiates the connection to the RethinkDB database.
516              
517             =head2 close
518              
519             my $io = Rethinkdb::IO->new;
520             $io->host('rdb.example.com');
521             $io->connect;
522             $io->close;
523              
524             The C method closes the current connection to the RethinkDB database.
525              
526             =head2 reconnect
527              
528             my $io = Rethinkdb::IO->new;
529             $io->host('rdb.example.com');
530             $io->connect;
531             $io->reconnect;
532              
533             The C method closes and reopens a connection to the RethinkDB
534             database.
535              
536             =head2 repl
537              
538             my $io = Rethinkdb::IO->new;
539             $io->host('rdb.example.com');
540             $io->connect->repl;
541              
542             The C method caches the current connection in to the main program so that
543             it is available to for all L queries without specifically specifying
544             one.
545              
546             =head2 use
547              
548             my $io = Rethinkdb::IO->new;
549             $io->use('marven');
550             $io->connect;
551              
552             The C method sets the default database name to use for all queries that
553             use this connection.
554              
555             =head2 noreply_wait
556              
557             my $io = Rethinkdb::IO->new;
558             $io->noreply_wait;
559              
560             The C method will tell the database to wait until all "no reply"
561             have executed before responding.
562              
563             =head2 server
564              
565             my $conn = r->connect;
566             $conn->server;
567              
568             Return information about the server being used by this connection.
569              
570             The server command returns either two or three fields:
571              
572             =over
573              
574             =item C: the UUID of the server the client is connected to.
575              
576             =item C: a boolean indicating whether the server is a L.
577              
578             =item C: the server name. If proxy is Ctrue>, this field will not be
579             returned.
580              
581             =back
582              
583             =head1 SEE ALSO
584              
585             L, L
586              
587             =cut