File Coverage

blib/lib/Net/Async/CassandraCQL/Connection.pm
Criterion Covered Total %
statement 201 221 90.9
branch 57 80 71.2
condition 11 26 42.3
subroutine 42 47 89.3
pod 13 15 86.6
total 324 389 83.2


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::CassandraCQL::Connection;
7              
8 13     13   820141 use strict;
  13         32  
  13         820  
9 13     13   72 use warnings;
  13         29  
  13         351  
10 13     13   332 use 5.010;
  13         48  
  13         778  
11              
12             our $VERSION = '0.11';
13              
14 13     13   69 use base qw( IO::Async::Stream );
  13         24  
  13         9464  
15             IO::Async::Stream->VERSION( '0.59' );
16              
17 13     13   660265 use Carp;
  13         30  
  13         1052  
18              
19 13     13   69 use Future 0.13;
  13         316  
  13         442  
20              
21 13     13   64 use constant HAVE_SNAPPY => eval { require Compress::Snappy };
  13         20  
  13         24  
  13         10891  
22 13     13   9679 use constant HAVE_LZ4 => eval { require Compress::LZ4 };
  13         27  
  13         29  
  13         9980  
23              
24 13         4562 use Protocol::CassandraCQL qw(
25             :opcodes :results :consistencies FLAG_COMPRESS
26             build_frame parse_frame
27 13     13   14995 );
  13         28649  
28 13     13   11032 use Protocol::CassandraCQL::Frame;
  13         30115  
  13         487  
29 13     13   12426 use Protocol::CassandraCQL::Frames 0.10 qw( :all );
  13         249635  
  13         2888  
30              
31 13     13   9608 use Net::Async::CassandraCQL::Query;
  13         40  
  13         39696  
32              
33             # Ensure that IO::Async definitely uses this for Iv6 connections as we need
34             # the ->peerhost method
35             require IO::Socket::IP;
36              
37             =head1 NAME
38              
39             C - connect to a single Cassandra database node
40              
41             =head1 DESCRIPTION
42              
43             TODO
44              
45             =cut
46              
47             =head1 EVENTS
48              
49             =head2 on_event $name, @args
50              
51             A registered event occurred. C<@args> will depend on the event name. Each
52             is also available as its own event, with the name in lowercase. If the event
53             is not one of the types recognised below, C<@args> will contain the actual
54             L object.
55              
56             =head2 on_topology_change $type, $node
57              
58             The cluster topology has changed. C<$node> is a packed socket address.
59              
60             =head2 on_status_change $status, $node
61              
62             The node's status has changed. C<$node> is a packed socket address.
63              
64             =head2 on_schema_change $type, $keyspace, $table
65              
66             A keyspace or table schema has changed.
67              
68             =cut
69              
70             =head1 PARAMETERS
71              
72             The following named parameters may be passed to C or C:
73              
74             =over 8
75              
76             =item username => STRING
77              
78             =item password => STRING
79              
80             Optional. Authentication details to use for C.
81              
82             =item cql_version => INT
83              
84             Optional. Version of the CQL wire protocol to negotiate during connection.
85             Defaults to 1.
86              
87             =back
88              
89             =cut
90              
91             sub _init
92             {
93 19     19   30031 my $self = shift;
94 19         155 $self->SUPER::_init( @_ );
95              
96 19         458 $self->{streams} = []; # map [1 .. 127] to Future
97 19         59 $self->{pending} = []; # queue of [$opcode, $frame, $f]
98 19         58 $self->{cql_version} = 1;
99             }
100              
101             sub configure
102             {
103 33     33 1 5011 my $self = shift;
104 33         97 my %params = @_;
105              
106 33         95 foreach (qw( username password cql_version
107             on_event on_topology_change on_status_change on_schema_change )) {
108 231 100       562 $self->{$_} = delete $params{$_} if exists $params{$_};
109             }
110              
111 33         203 $self->SUPER::configure( %params );
112             }
113              
114             =head1 METHODS
115              
116             =cut
117              
118             =head2 $id = $conn->nodeid
119              
120             Returns the connection's node ID (the string form of its IP address), which is
121             used as its ID in the C table.
122              
123             =cut
124              
125             sub nodeid
126             {
127 81     81 1 508 my $self = shift;
128 81         557 $self->{nodeid};
129             }
130              
131             sub _version
132             {
133 4109     4109   5095 my $self = shift;
134 4109         12666 return $self->{cql_version};
135             }
136              
137             # function
138             sub _decode_result
139             {
140 1010     1010   1867 my ( $version, $response ) = @_;
141              
142 1010         2614 my ( $type, $result ) = parse_result_frame( $version, $response );
143              
144 1010 100       13635 if( $type == RESULT_VOID ) {
145 1005         2665 return Future->new->done();
146             }
147             else {
148 5 100       21 if ( $type == RESULT_ROWS ) { $type = "rows" }
  3 100       7  
    50          
149 1         3 elsif( $type == RESULT_SET_KEYSPACE ) { $type = "keyspace" }
150 1         2 elsif( $type == RESULT_SCHEMA_CHANGE ) { $type = "schema_change" }
151 5         27 return Future->new->done( $type => $result );
152             }
153             }
154              
155             =head2 $conn->connect( %args ) ==> $conn
156              
157             Connects to the Cassandra node an send the C message. The
158             returned Future will yield the connection itself on success.
159              
160             Takes the following named arguments:
161              
162             =over 8
163              
164             =item host => STRING
165              
166             =item service => STRING
167              
168             =back
169              
170             =cut
171              
172             sub connect
173             {
174 0     0 1 0 my $self = shift;
175 0         0 my %args = @_;
176              
177 0   0     0 $args{socktype} ||= "stream";
178              
179             return ( $self->{connect_f} ||=
180 0     0   0 $self->SUPER::connect( %args )->on_fail( sub { undef $self->{connect_f} } ) )
181             ->then( sub {
182 0     0   0 $self->{nodeid} = $self->read_handle->peerhost;
183 0         0 $self->startup
184 0   0 0   0 })->then( sub { Future->new->done( $self ) });
  0         0  
185             }
186              
187             sub _has_pending
188             {
189 4     4   9 my $self = shift;
190 4   100     5 defined and return 1 for @{ $self->{streams} };
  4         26  
191 3         23 return 0;
192             }
193              
194             sub on_read
195             {
196 1029     1029 1 188162 my $self = shift;
197 1029         1477 my ( $buffref, $eof ) = @_;
198              
199 1029 50       2685 my ( $version, $flags, $streamid, $opcode, $body ) = parse_frame( $$buffref ) or return 0;
200              
201 1029 50       14136 $version & 0x80 or
202             $self->fail_all_and_close( "Expected response to have RESPONSE bit set" ), return;
203 1029         1204 $version &= 0x7f;
204              
205             # Test version <= for now in case of "unsupported protocol version" error messages, and
206             # test it exactly later
207 1029 50       1969 $version <= $self->_version or
208             $self->fail_all_and_close( sprintf "Unexpected message version %#02x\n", $version ), return;
209              
210 1029 100 66     2556 if( $flags & FLAG_COMPRESS and my $decompress = $self->{decompress} ) {
211 2         4 $flags &= ~FLAG_COMPRESS;
212 2         8 $body = $decompress->( $body );
213             }
214              
215 1029 50       1880 $flags == 0 or
216             $self->fail_all_and_close( sprintf "Unexpected message flags %#02x\n", $flags ), return;
217              
218 1029         3488 my $frame = Protocol::CassandraCQL::Frame->new( $body );
219              
220 1029 100 33     11800 if( my $f = $self->{streams}[$streamid] ) {
    50 33        
    50          
221 1028         1805 undef $self->{streams}[$streamid];
222              
223 1028 100       1796 if( $opcode == OPCODE_ERROR ) {
224 1         8 my ( $err, $message ) = parse_error_frame( $version, $frame );
225 1         88 $f->fail( "OPCODE_ERROR: $message\n", $err, $frame );
226             }
227             else {
228 1027 50       1930 $version == $self->_version or
229             $self->fail_all_and_close( sprintf "Unexpected message version %#02x\n", $version ), return;
230              
231 1027         2991 $f->done( $opcode, $frame, $version );
232             }
233              
234 1028 100 66     120480 if( my $next = shift @{ $self->{pending} } ) {
  1028 100       3572  
235 873         1607 my ( $opcode, $frame, $f ) = @$next;
236 873         1865 $self->_send( $opcode, $streamid, $frame, $f );
237             }
238             elsif( my $close_f = $self->{cassandra_close_future} and !$self->_has_pending ) {
239 1         4 $close_f->done( $self );
240             }
241             }
242             elsif( $streamid == 0 and $opcode == OPCODE_ERROR ) {
243 0         0 my ( $err, $message ) = parse_error_frame( $version, $frame );
244 0         0 $self->fail_all_and_close( "OPCODE_ERROR: $message\n", $err, $frame );
245             }
246             elsif( $streamid == 0xff and $opcode == OPCODE_EVENT ) {
247 1         5 $self->_event( $frame );
248             }
249             else {
250 0         0 print STDERR "Received a message opcode=$opcode for unknown stream $streamid\n";
251             }
252              
253 1029         6052 return 1;
254             }
255              
256             sub _event
257             {
258 1     1   1 my $self = shift;
259 1         2 my ( $frame ) = @_;
260              
261 1         3 my ( $name, @args ) = parse_event_frame( $self->_version, $frame );
262              
263 1 50       314 $self->maybe_invoke_event( "on_".lc($name), @args )
264             or $self->maybe_invoke_event( on_event => $name, @args );
265             }
266              
267             sub on_closed
268             {
269 2     2 1 156 my $self = shift;
270 2         16 $self->fail_all( "Connection closed" );
271             }
272              
273             sub fail_all
274             {
275 2     2 0 3 my $self = shift;
276 2         5 my ( $failure ) = @_;
277              
278 2         4 foreach ( @{ $self->{streams} } ) {
  2         6  
279 0 0       0 $_->fail( $failure ) if $_;
280             }
281 2         5 @{ $self->{streams} } = ();
  2         5  
282              
283 2         4 foreach ( @{ $self->{pending} } ) {
  2         7  
284 0         0 $_->[2]->fail( $failure );
285             }
286 2         5 @{ $self->{pending} } = ();
  2         10  
287             }
288              
289             sub fail_all_and_close
290             {
291 0     0 0 0 my $self = shift;
292 0         0 my ( $failure ) = @_;
293              
294 0         0 $self->fail_all( $failure );
295              
296 0         0 $self->close;
297              
298 0         0 return Future->new->fail( $failure );
299             }
300              
301             =head2 $conn->send_message( $opcode, $frame ) ==> ( $reply_opcode, $reply_frame, $reply_version )
302              
303             Sends a message with the given opcode and L for
304             the message body. The returned Future will yield the response opcode, frame
305             and version number (with the RESPONSE bit masked off).
306              
307             This is a low-level method; applications should instead use one of the wrapper
308             methods below.
309              
310             =cut
311              
312             sub send_message
313             {
314 1029     1029 1 52909 my $self = shift;
315 1029         1444 my ( $opcode, $frame ) = @_;
316              
317 1029 50       2663 croak "Cannot ->send_message when in close-pending state" if $self->{cassandra_close_future};
318              
319 1029         2748 my $f = $self->loop->new_future;
320              
321 1028   50     31380 my $streams = $self->{streams} ||= [];
322 1028         1204 my $id;
323 1028         2399 foreach ( 1 .. $#$streams ) {
324 118894 100 50     220533 $id = $_ and last if !defined $streams->[$_];
325             }
326              
327 1028 100       2804 if( !defined $id ) {
328 1007 100       2518 if( $#$streams == 127 ) {
329 873         1035 push @{ $self->{pending} }, [ $opcode, $frame, $f ];
  873         3095  
330 873         7676 return $f;
331             }
332 134         163 $id = @$streams;
333 134 100       234 $id = 1 if !$id; # can't use 0
334             }
335              
336 155         349 $self->_send( $opcode, $id, $frame, $f );
337              
338 155         1344 return $f;
339             }
340              
341             sub _send
342             {
343 1028     1028   1247 my $self = shift;
344 1028         1472 my ( $opcode, $id, $frame, $f ) = @_;
345              
346 1028         1201 my $flags = 0;
347 1028         2878 my $body = $frame->bytes;
348              
349 1028 100       6002 if( my $compress = $self->{compress} ) {
350 12         181 my $body_compressed = $compress->( $body );
351 12 100       47 if( length $body_compressed < length $body ) {
352 2         4 $flags |= FLAG_COMPRESS;
353 2         6 $body = $body_compressed;
354             }
355             }
356              
357 1028         1978 $self->write( build_frame( $self->_version, $flags, $id, $opcode, $body ) );
358              
359 1028         48689 $self->{streams}[$id] = $f;
360             }
361              
362             =head2 $conn->startup ==> ()
363              
364             Sends the initial connection setup message. On success, the returned Future
365             yields nothing.
366              
367             Normally this is not required as the C method performs it implicitly.
368              
369             =cut
370              
371             sub startup
372             {
373 4     4 1 1332 my $self = shift;
374              
375             # CQLv1 doesn't support LZ4
376 4         7 my $compression;
377 4 100       15 if( HAVE_LZ4 and $self->_version > 1 ) {
378             # Cassandra prepends 32bit BE integer of original size
379             $compression = [ lz4 =>
380 1     1   3 sub { my ( $data ) = @_; pack "N a*", length $data, Compress::LZ4::lz4_compress( $data ) },
  1         367  
381 1     1   5 sub { my ( $bodylen, $lz4data ) = unpack "N a*", $_[0]; Compress::LZ4::lz4_decompress( $lz4data, $bodylen ) },
  1         5  
382 1         8 ];
383             }
384             elsif( HAVE_SNAPPY ) {
385 3         16 $compression = [ snappy =>
386             \&Compress::Snappy::compress,
387             \&Compress::Snappy::decompress,
388             ];
389             }
390             # else undef
391              
392             my $f = $self->send_message( OPCODE_STARTUP, build_startup_frame( $self->_version,
393             options => {
394             CQL_VERSION => "3.0.5",
395             ( $compression ? ( COMPRESSION => $compression->[0] ) : () ),
396             } )
397             )->then( sub {
398 4     4   241 my ( $op, $response, $version ) = @_;
399              
400 4 100       16 if( $op == OPCODE_READY ) {
    50          
401 3         20 return Future->new->done;
402             }
403             elsif( $op == OPCODE_AUTHENTICATE ) {
404 1         9 return $self->_authenticate( parse_authenticate_frame( $version, $response ) );
405             }
406             else {
407 0         0 return $self->fail_all_and_close( "Expected OPCODE_READY or OPCODE_AUTHENTICATE" );
408             }
409 4 50       14 });
410              
411 4         371 $self->{compress} = $compression->[1];
412 4         10 $self->{decompress} = $compression->[2];
413              
414 4         16 return $f;
415             }
416              
417             sub _authenticate
418             {
419 1     1   99 my $self = shift;
420 1         2 my ( $authenticator ) = @_;
421              
422 1 50       13 if( $authenticator eq "org.apache.cassandra.auth.PasswordAuthenticator" ) {
423 1         3 foreach (qw( username password )) {
424 2 50       9 defined $self->{$_} or croak "Cannot authenticate by password without $_";
425             }
426              
427             $self->send_message( OPCODE_CREDENTIALS, build_credentials_frame( $self->_version,
428             credentials => {
429             username => $self->{username},
430             password => $self->{password},
431             } )
432             )->then( sub {
433 1     1   61 my ( $op, $response, $version ) = @_;
434 1 50       5 $op == OPCODE_READY or return $self->fail_all_and_close( "Expected OPCODE_READY" );
435              
436 1         5 return Future->new->done;
437 1         5 });
438             }
439             else {
440 0         0 return $self->fail_all_and_close( "Unrecognised authenticator $authenticator" );
441             }
442             }
443              
444             =head2 $conn->options ==> $options
445              
446             Requests the list of supported options from the server node. On success, the
447             returned Future yields a HASH reference mapping option names to ARRAY
448             references containing valid values.
449              
450             =cut
451              
452             sub options
453             {
454 1     1 1 617 my $self = shift;
455              
456             $self->send_message( OPCODE_OPTIONS,
457             Protocol::CassandraCQL::Frame->new
458             )->then( sub {
459 1     1   43 my ( $op, $response, $version ) = @_;
460 1 50       5 $op == OPCODE_SUPPORTED or return Future->new->fail( "Expected OPCODE_SUPPORTED" );
461              
462 1         8 my ( $opts ) = parse_supported_frame( $version, $response );
463 1         129 return Future->new->done( $opts );
464 1         5 });
465             }
466              
467             =head2 $conn->query( $cql, $consistency, %other_args ) ==> ( $type, $result )
468              
469             Performs a CQL query. On success, the values returned from the Future will
470             depend on the type of query.
471              
472             For C queries, the type is C and C<$result> is a string giving
473             the name of the new keyspace.
474              
475             For C, C and C queries, the type is C and
476             C<$result> is a 3-element ARRAY reference containing the type of change, the
477             keyspace and the table name.
478              
479             For C
480             L containing the returned row data.
481              
482             For other queries, such as C, C and C, the future
483             returns nothing.
484              
485             Any other arguments will be passed on to the underlying C
486             function of L.
487              
488             =cut
489              
490             sub query
491             {
492 1005     1005 1 74627 my $self = shift;
493 1005         1852 my ( $cql, $consistency, %other_args ) = @_;
494              
495             $self->send_message( OPCODE_QUERY, build_query_frame( $self->_version,
496             cql => $cql,
497             consistency => $consistency,
498             %other_args,
499             )
500             )->then( sub {
501 1005     1005   41806 my ( $op, $response, $version ) = @_;
502 1005 50       2023 $op == OPCODE_RESULT or return Future->new->fail( "Expected OPCODE_RESULT" );
503 1005         1876 return _decode_result( $version, $response );
504 1005         2101 });
505             }
506              
507             =head2 $conn->prepare( $cql ) ==> $query
508              
509             Prepares a CQL query for later execution. On success, the returned Future
510             yields an instance of L.
511              
512             =cut
513              
514             sub prepare
515             {
516 3     3 1 1007 my $self = shift;
517 3         7 my ( $cql, $cassandra ) = @_;
518              
519             $self->send_message( OPCODE_PREPARE, build_prepare_frame( $self->_version,
520             cql => $cql,
521             )
522             )->then( sub {
523 3     3   171 my ( $op, $response, $version ) = @_;
524 3 50       10 $op == OPCODE_RESULT or return Future->new->fail( "Expected OPCODE_RESULT" );
525              
526 3         18 my ( $type, $result ) = parse_result_frame( $version, $response );
527 3 50       1048 $type == RESULT_PREPARED or return Future->new->fail( "Expected RESULT_PREPARED" );
528              
529 3         8 my ( $id, $params_meta, $result_meta ) = @$result;
530              
531 3         31 my $query = Net::Async::CassandraCQL::Query->new(
532             cassandra => $cassandra,
533             cql => $cql,
534             id => $id,
535             params_meta => $params_meta,
536             result_meta => $result_meta, # v2+ only
537             );
538 3         16 return Future->new->done( $query );
539 3         13 });
540             }
541              
542             =head2 $conn->execute( $id, $data, $consistency, %other_args ) ==> ( $type, $result )
543              
544             Executes a previously-prepared statement, given its ID and the binding data.
545             On success, the returned Future will yield results of the same form as the
546             C method. C<$data> should contain a list of encoded byte-string values.
547             Any other arguments will be passed on to the underlying C
548             function of L.
549              
550             Normally this method is not directly required - instead, use the C
551             method on the query object itself, as this will encode the parameters
552             correctly.
553              
554             =cut
555              
556             sub execute
557             {
558 5     5 1 8 my $self = shift;
559 5         18 my ( $id, $data, $consistency, %other_args ) = @_;
560              
561             $self->send_message( OPCODE_EXECUTE, build_execute_frame( $self->_version,
562             id => $id,
563             values => $data,
564             consistency => $consistency,
565             %other_args,
566             )
567             )->then( sub {
568 5     5   251 my ( $op, $response, $version ) = @_;
569 5 50       20 $op == OPCODE_RESULT or return Future->new->fail( "Expected OPCODE_RESULT" );
570 5         16 return _decode_result( $version, $response );
571 5         17 });
572             }
573              
574             =head2 $conn->register( $events ) ==> ()
575              
576             Registers the connection's interest in receiving events of the types given in
577             the ARRAY reference. Event names may be C, C
578             or C. On success, the returned Future yields nothing.
579              
580             =cut
581              
582             sub register
583             {
584 2     2 1 446 my $self = shift;
585 2         4 my ( $events ) = @_;
586              
587             $self->send_message( OPCODE_REGISTER, build_register_frame( $self->_version,
588             events => $events,
589             )
590             )->then( sub {
591 1     1   64 my ( $op, $response, $version ) = @_;
592 1 50       3 $op == OPCODE_READY or Future->new->fail( "Expected OPCODE_READY" );
593              
594 1         7 return Future->new->done;
595 2         13 });
596             }
597              
598             =head2 $conn->close_when_idle ==> $conn
599              
600             If the connection is idle (has no outstanding queries), then it is closed
601             immediately. If not, it is put into close-pending mode, where it will accept
602             no more queries, and will close when the last pending one is complete.
603              
604             Returns a future which will eventually yield the (closed) connection when it
605             becomes closed.
606              
607             =cut
608              
609             sub close_when_idle
610             {
611 3     3 1 650 my $self = shift;
612              
613 3   33     24 $self->{cassandra_close_future} ||= do {
614 3         12 my $f = $self->loop->new_future;
615 3     3   2690 $f->on_done( sub { $_[0]->close } );
  3         155  
616              
617 3 100       40 $f->done( $self ) if !$self->_has_pending;
618              
619 3         304 $f
620             };
621             }
622              
623             =head1 SPONSORS
624              
625             This code was paid for by
626              
627             =over 2
628              
629             =item *
630              
631             Perceptyx L
632              
633             =item *
634              
635             Shadowcat Systems L
636              
637             =back
638              
639             =head1 AUTHOR
640              
641             Paul Evans
642              
643             =cut
644              
645             0x55AA;