File Coverage

blib/lib/Rethinkdb/IO.pm
Criterion Covered Total %
statement 27 228 11.8
branch 0 76 0.0
condition 0 16 0.0
subroutine 9 24 37.5
pod 6 6 100.0
total 42 350 12.0


line stmt bran cond sub pod time code
1             package Rethinkdb::IO;
2 15     15   51 use Rethinkdb::Base -base;
  15         15  
  15         77  
3              
4 15     15   59 no warnings 'recursion';
  15         20  
  15         450  
5              
6 15     15   45 use Carp 'croak';
  15         16  
  15         655  
7 15     15   7168 use IO::Socket::INET;
  15         181557  
  15         80  
8 15     15   17347 use JSON::PP;
  15         181850  
  15         1155  
9              
10 15     15   6107 use Rethinkdb::Protocol;
  15         52  
  15         148  
11 15     15   7279 use Rethinkdb::Response;
  15         27  
  15         96  
12              
13             has host => 'localhost';
14             has port => 28_015;
15             has default_db => 'test';
16             has auth_key => q{};
17             has timeout => 20;
18             has [ '_rdb', '_handle', '_callbacks', '_responder' ];
19             has '_protocol' => sub { Rethinkdb::Protocol->new; };
20              
21             sub connect {
22 0     0 1   my $self = shift;
23              
24 0 0         $self->{_handle} = IO::Socket::INET->new(
25             PeerHost => $self->host,
26             PeerPort => $self->port,
27             Reuse => 1,
28             Timeout => $self->timeout,
29             )
30             or croak 'ERROR: Could not connect to ' . $self->host . q{:} . $self->port;
31              
32 0           $self->_handle->send( pack 'L<',
33             $self->_protocol->versionDummy->version->v0_3 );
34 0           $self->_handle->send(
35             ( pack 'L<', length $self->auth_key ) . $self->auth_key );
36              
37 0           $self->_handle->send( pack 'L<',
38             $self->_protocol->versionDummy->protocol->json );
39              
40 0           my $response;
41 0           my $char = q{};
42 0           do {
43 0           $self->_handle->recv( $char, 1 );
44 0           $response .= $char;
45             } while ( $char ne "\0" );
46              
47             # trim string
48 0           $response =~ s/^\s//;
49 0           $response =~ s/\s$//;
50              
51 0 0         if ( $response =~ /^ERROR/ ) {
52 0           croak $response;
53             }
54              
55 0           $self->_callbacks( {} );
56              
57 0           return $self;
58             }
59              
60             sub close {
61 0     0 1   my $self = shift;
62 0 0         my $args = ref $_[0] ? $_[0] : {@_};
63              
64 0 0         if ( $self->_handle ) {
65 0 0 0       if ( !defined $args->{noreply_wait} || !$args->{noreply_wait} ) {
66 0           $self->noreply_wait;
67             }
68              
69 0           $self->_handle->close;
70 0           $self->_handle(undef);
71             }
72              
73 0           $self->_callbacks( {} );
74              
75 0           return $self;
76             }
77              
78             sub reconnect {
79 0     0 1   my $self = shift;
80 0 0         my $args = ref $_[0] ? $_[0] : {@_};
81              
82 0           return $self->close($args)->connect;
83             }
84              
85             # put the handle into main package
86             sub repl {
87 0     0 1   my $self = shift;
88 0           my $package = caller;
89              
90 0           $package::_rdb_io = $self;
91 0           return $self;
92             }
93              
94             sub use {
95 0     0 1   my $self = shift;
96 0           my $db = shift;
97              
98 0           $self->default_db($db);
99 0           return $self;
100             }
101              
102             sub noreply_wait {
103 0     0 1   my $self = shift;
104              
105 0           return $self->_send(
106             {
107             type => $self->_protocol->query->queryType->noreply_wait,
108             token => Rethinkdb::Util::_token(),
109             }
110             );
111             }
112              
113             sub _start {
114 0     0     my $self = shift;
115 0           my ( $query, $args, $callback ) = @_;
116              
117 0           my $q = {
118             type => $self->_protocol->query->queryType->start,
119             token => Rethinkdb::Util::_token(),
120             query => $query->_build
121             };
122              
123 0 0         if ( ref $callback eq 'CODE' ) {
124 0           $self->_callbacks->{ $q->{token} } = $callback;
125             }
126              
127             # add our database
128 0 0         if(!$args->{db}) {
129 0           $args->{db} = $self->default_db;
130             }
131              
132 0           return $self->_send( $q, $args );
133             }
134              
135             sub _encode {
136 0     0     my $self = shift;
137 0           my $data = shift;
138 0   0       my $args = shift || {};
139              
140             # only QUERY->START needs these:
141 0 0         if ( $data->{type} == 1 ) {
142 0           $data = $self->_encode_recurse($data);
143 0           push @{$data}, _simple_encode_hash($args);
  0            
144             }
145             else {
146 0           $data = [ $data->{type} ];
147             }
148              
149 0           return encode_json $data;
150             }
151              
152             # temporarily: clean up global optional arguments
153             sub _simple_encode_hash {
154 0     0     my $data = shift;
155 0           my $json = {};
156              
157 0           foreach ( keys %{$data} ) {
  0            
158 0           $json->{$_} = _simple_encode( $data->{$_} );
159             }
160              
161 0 0         if ( $json->{db} ) {
162             $json->{db} = Rethinkdb::IO->_encode_recurse(Rethinkdb::Query::Database->new(
163             name => $json->{db},
164             args => $json->{db},
165 0           )->_build);
166             }
167              
168 0           return $json;
169             }
170              
171             sub _simple_encode {
172 0     0     my $data = shift;
173              
174 0 0         if ( ref $data eq 'Rethinkdb::_True' ) {
    0          
175 0           return JSON::PP::true;
176             }
177             elsif ( ref $data eq 'Rethinkdb::_False' ) {
178 0           return JSON::PP::false;
179             }
180              
181 0           return $data;
182             }
183              
184             sub _encode_recurse {
185 0     0     my $self = shift;
186 0           my $data = shift;
187 0           my $json = [];
188              
189 0 0         if ( $data->{datum} ) {
190 0           my $val = q{};
191 0 0 0       if ( defined $data->{datum}->{r_bool} ) {
    0          
192 0 0         if ( $data->{datum}->{r_bool} ) {
193 0           return JSON::PP::true;
194             }
195             else {
196 0           return JSON::PP::false;
197             }
198             }
199             elsif ( defined $data->{datum}->{type}
200             && $data->{datum}->{type} == $self->_protocol->datum->datumType->r_null )
201             {
202 0           return JSON::PP::null;
203             }
204             else {
205 0           foreach ( keys %{ $data->{datum} } ) {
  0            
206 0 0         if ( $_ ne 'type' ) {
207 0           return $data->{datum}->{$_};
208             }
209             }
210             }
211             }
212              
213 0 0         if ( $data->{type} ) {
214 0           push @{$json}, $data->{type};
  0            
215             }
216              
217 0 0         if ( $data->{query} ) {
218 0           push @{$json}, $self->_encode_recurse( $data->{query} );
  0            
219             }
220              
221 0 0         if ( $data->{args} ) {
222 0           my $args = [];
223 0           foreach ( @{ $data->{args} } ) {
  0            
224 0           push @{$args}, $self->_encode_recurse($_);
  0            
225             }
226              
227 0           push @{$json}, $args;
  0            
228             }
229              
230 0 0 0       if ( $data->{optargs} && ref $data->{optargs} eq 'HASH' ) {
    0          
231 0           push @{$json}, $self->_encode_recurse( $data->{optargs} );
  0            
232             }
233             elsif ( $data->{optargs} ) {
234 0           my $args = {};
235 0           foreach ( @{ $data->{optargs} } ) {
  0            
236 0           $args->{ $_->{key} } = $self->_encode_recurse( $_->{val} );
237             }
238              
239 0 0         if ( $data->{type} == $self->_protocol->term->termType->make_obj ) {
240 0           return $args;
241             }
242              
243 0           push @{$json}, $args;
  0            
244             }
245              
246 0           return $json;
247             }
248              
249             sub _decode {
250 0     0     my $self = shift;
251 0           my $data = shift;
252 0           my $decode = decode_json $data;
253              
254 0           $decode->{r} = $self->_clean( $decode->{r} );
255 0           return $decode;
256             }
257              
258             # converts JSON::PP::Boolean in an array to our Booleans
259             sub _clean {
260 0     0     my $self = shift;
261 0           my $data = shift;
262 0           my $clean = [];
263              
264 0 0         if ( ref $data eq 'ARRAY' ) {
    0          
265 0           foreach ( @{$data} ) {
  0            
266 0           push @{$clean}, $self->_real_cleaner($_);
  0            
267             }
268              
269 0           return $clean;
270             }
271             elsif ( ref $data eq 'HASH' ) {
272 0           foreach ( keys %{$data} ) {
  0            
273 0           $data->{$_} = $self->_real_cleaner( $data->{$_} );
274             }
275              
276 0           return $data;
277             }
278              
279 0           return $data;
280             }
281              
282             sub _real_cleaner {
283 0     0     my $self = shift;
284 0           my $data = shift;
285 0           my $retval;
286              
287 0 0         if ( ref $data eq 'JSON::PP::Boolean' ) {
    0          
    0          
288 0 0         if ($data) {
289 0           $retval = $self->_rdb->true;
290             }
291             else {
292 0           $retval = $self->_rdb->false;
293             }
294             }
295             elsif ( ref $data eq 'ARRAY' ) {
296 0           $retval = $self->_clean($data);
297             }
298             elsif ( ref $data eq 'HASH' ) {
299 0           $retval = $self->_clean($data);
300             }
301             else {
302 0           $retval = $data;
303             }
304              
305 0           return $retval;
306             }
307              
308             sub _send {
309 0     0     my $self = shift;
310 0           my $query = shift;
311 0   0       my $args = shift || {};
312              
313 0 0         if ( $ENV{RDB_DEBUG} ) {
314 15     15   89 use feature ':5.10';
  15         21  
  15         689  
315 15     15   9688 use Data::Dumper;
  15         67907  
  15         8470  
316 0           $Data::Dumper::Indent = 1;
317 0           say {*STDERR} 'SENDING:';
  0            
318 0           say {*STDERR} Dumper $query;
  0            
319             }
320              
321 0           my $token;
322             my $length;
323              
324 0           my $serial = $self->_encode( $query, $args );
325 0           my $header = pack 'QL<', $query->{token}, length $serial;
326              
327 0 0         if ( $ENV{RDB_DEBUG} ) {
328 0           say 'SENDING:';
329 0           say {*STDERR} Dumper $serial;
  0            
330             }
331              
332             # send message
333 0           $self->_handle->send( $header . $serial );
334              
335             # noreply should just return
336 0 0         if ( $args->{noreply} ) {
337 0           return;
338             }
339              
340             # receive message
341 0           my $data = q{};
342              
343 0           $self->_handle->recv( $token, 8 );
344 0           $token = unpack 'Q<', $token;
345              
346 0           $self->_handle->recv( $length, 4 );
347 0           $length = unpack 'L<', $length;
348              
349 0           my $_data;
350 0           do {
351 0           $self->_handle->recv( $_data, 4096 );
352 0           $data = $data . $_data;
353             } until ( length($data) eq $length );
354              
355             # decode RQL data
356 0           my $res_data = $self->_decode($data);
357 0           $res_data->{token} = $token;
358              
359             # handle partial and feed responses
360 0 0 0       if ( $res_data->{t} == 3 or $res_data->{t} == 5 ) {
361 0 0         if ( $self->_callbacks->{$token} ) {
362 0           my $res = Rethinkdb::Response->_init( $res_data, $args );
363              
364 0 0         if ( $ENV{RDB_DEBUG} ) {
365 0           say {*STDERR} 'RECEIVED:';
  0            
366 0           say {*STDERR} Dumper $res;
  0            
367             }
368              
369             # send what we have
370 0           $self->_callbacks->{$token}->($res);
371              
372             # fetch more
373 0           return $self->_send(
374             {
375             type => $self->_protocol->query->queryType->continue,
376             token => $token
377             }
378             );
379             }
380             else {
381 0 0         if ( $ENV{RDB_DEBUG} ) {
382 0           say {*STDERR} 'RECEIVED:';
  0            
383 0           say {*STDERR} Dumper $res_data;
  0            
384             }
385              
386             # fetch the rest of the data if stream/partial/feed
387 0           my $more = $self->_send(
388             {
389             type => $self->_protocol->query->queryType->continue,
390             token => $token
391             }
392             );
393              
394 0           push @{ $res_data->{r} }, @{ $more->response };
  0            
  0            
395 0           $res_data->{t} = $more->type;
396             }
397             }
398              
399             # put data in response
400 0           my $res = Rethinkdb::Response->_init( $res_data, $args );
401              
402 0 0         if ( $ENV{RDB_DEBUG} ) {
403 0           say {*STDERR} 'RECEIVED:';
  0            
404 0           say {*STDERR} Dumper $res_data;
  0            
405 0           say {*STDERR} Dumper $res;
  0            
406             }
407              
408             # if there is callback return data to that
409 0 0         if ( $self->_callbacks->{$token} ) {
410 0           my $cb = $self->_callbacks->{$token};
411 0           delete $self->_callbacks->{$token};
412 0           return $cb->($res);
413             }
414              
415 0           return $res;
416             }
417              
418             1;
419              
420             =encoding utf8
421              
422             =head1 NAME
423              
424             Rethinkdb::IO - RethinkDB IO
425              
426             =head1 SYNOPSIS
427              
428             package MyApp;
429             use Rethinkdb::IO;
430              
431             my $io = Rethinkdb::IO->new->connect;
432             $io->use('marvel');
433             $io->close;
434              
435             =head1 DESCRIPTION
436              
437             This module handles communicating with the RethinkDB Database.
438              
439             =head1 ATTRIBUTES
440              
441             L implements the following attributes.
442              
443             =head2 host
444              
445             my $io = Rethinkdb::IO->new->connect;
446             my $host = $io->host;
447             $io->host('r.example.com');
448              
449             The C attribute returns or sets the current host name that
450             L is currently set to use.
451              
452             =head2 port
453              
454             my $io = Rethinkdb::IO->new->connect;
455             my $port = $io->port;
456             $io->port(1212);
457              
458             The C attribute returns or sets the current port number that
459             L is currently set to use.
460              
461             =head2 default_db
462              
463             my $io = Rethinkdb::IO->new->connect;
464             my $port = $io->default_db;
465             $io->default_db('marvel');
466              
467             The C attribute returns or sets the current database name that
468             L is currently set to use.
469              
470             =head2 auth_key
471              
472             my $io = Rethinkdb::IO->new->connect;
473             my $port = $io->auth_key;
474             $io->auth_key('setec astronomy');
475              
476             The C attribute returns or sets the current authentication key that
477             L is currently set to use.
478              
479             =head2 timeout
480              
481             my $io = Rethinkdb::IO->new->connect;
482             my $timeout = $io->timeout;
483             $io->timeout(60);
484              
485             The C attribute returns or sets the timeout length that
486             L is currently set to use.
487              
488             =head1 METHODS
489              
490             L inherits all methods from L and implements
491             the following methods.
492              
493             =head2 connect
494              
495             my $io = Rethinkdb::IO->new;
496             $io->host('rdb.example.com');
497             $io->connect->repl;
498              
499             The C method initiates the connection to the RethinkDB database.
500              
501             =head2 close
502              
503             my $io = Rethinkdb::IO->new;
504             $io->host('rdb.example.com');
505             $io->connect;
506             $io->close;
507              
508             The C method closes the current connection to the RethinkDB database.
509              
510             =head2 reconnect
511              
512             my $io = Rethinkdb::IO->new;
513             $io->host('rdb.example.com');
514             $io->connect;
515             $io->reconnect;
516              
517             The C method closes and reopens a connection to the RethinkDB
518             database.
519              
520             =head2 repl
521              
522             my $io = Rethinkdb::IO->new;
523             $io->host('rdb.example.com');
524             $io->connect->repl;
525              
526             The C method caches the current connection in to the main program so that
527             it is available to for all L queries without specifically specifying
528             one.
529              
530             =head2 use
531              
532             my $io = Rethinkdb::IO->new;
533             $io->use('marven');
534             $io->connect;
535              
536             The C method sets the default database name to use for all queries that
537             use this connection.
538              
539             =head2 noreply_wait
540              
541             my $io = Rethinkdb::IO->new;
542             $io->noreply_wait;
543              
544             The C method will tell the database to wait until all "no reply"
545             have executed before responding.
546              
547             =head1 SEE ALSO
548              
549             L, L
550              
551             =cut