File Coverage

blib/lib/Net/Async/CassandraCQL/Connection.pm
Criterion Covered Total %
statement 207 227 91.1
branch 61 84 72.6
condition 14 29 48.2
subroutine 43 48 89.5
pod 13 15 86.6
total 338 403 83.8


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::CassandraCQL::Connection;
7              
8 13     13   592594 use strict;
  13         26  
  13         536  
9 13     13   64 use warnings;
  13         21  
  13         350  
10 13     13   358 use 5.010;
  13         39  
  13         840  
11              
12             our $VERSION = '0.12';
13              
14 13     13   115 use base qw( IO::Async::Stream );
  13         19  
  13         6191  
15             IO::Async::Stream->VERSION( '0.59' );
16              
17 13     13   690187 use Carp;
  13         23  
  13         1221  
18              
19 13     13   75 use Future 0.13;
  13         375  
  13         516  
20              
21 13     13   64 use constant HAVE_SNAPPY => eval { require Compress::Snappy };
  13         19  
  13         21  
  13         8096  
22 13     13   17416 use constant HAVE_LZ4 => eval { require Compress::LZ4 };
  13         28  
  13         23  
  13         8146  
23              
24             # Max streams 127, because of streamid packed in the native protocol as C.
25             # Version 3 of the protocol changes that, and this can be raised
26 13     13   8049 use constant MAX_STREAMS => 127;
  13         104  
  13         708  
27              
28 13         4556 use Protocol::CassandraCQL qw(
29             :opcodes :results :consistencies FLAG_COMPRESS
30             build_frame parse_frame
31 13     13   4385 );
  13         23832  
32 13     13   8233 use Protocol::CassandraCQL::Frame;
  13         24399  
  13         589  
33 13     13   8749 use Protocol::CassandraCQL::Frames 0.10 qw( :all );
  13         209621  
  13         3178  
34              
35 13     13   28158 use Net::Async::CassandraCQL::Query;
  13         36  
  13         53482  
36              
37             # Ensure that IO::Async definitely uses this for IPv6 connections as we need
38             # the ->peerhost method
39             require IO::Socket::IP;
40              
41             =head1 NAME
42              
43             C<Net::Async::CassandraCQL::Connection> - connect to a single Cassandra database node
44              
45             =head1 DESCRIPTION
46              
47             TODO
48              
49             =cut
50              
51             =head1 EVENTS
52              
53             =head2 on_event $name, @args
54              
55             A registered event occurred. C<@args> will depend on the event name. Each
56             is also available as its own event, with the name in lowercase. If the event
57             is not one of the types recognised below, C<@args> will contain the actual
58             L<Protocol::CassandraCQL::Frame> object.
59              
60             =head2 on_topology_change $type, $node
61              
62             The cluster topology has changed. C<$node> is a packed socket address.
63              
64             =head2 on_status_change $status, $node
65              
66             The node's status has changed. C<$node> is a packed socket address.
67              
68             =head2 on_schema_change $type, $keyspace, $table
69              
70             A keyspace or table schema has changed.
71              
72             =cut
73              
74             =head1 PARAMETERS
75              
76             The following named parameters may be passed to C<new> or C<configure>:
77              
78             =over 8
79              
80             =item username => STRING
81              
82             =item password => STRING
83              
84             Optional. Authentication details to use for C<PasswordAuthenticator>.
85              
86             =item cql_version => INT
87              
88             Optional. Version of the CQL wire protocol to negotiate during connection.
89             Defaults to 1.
90              
91             =back
92              
93             =cut
94              
95             sub _init
96             {
97 19     19   22659 my $self = shift;
98 19         156 $self->SUPER::_init( @_ );
99              
100 19         471 $self->{streams} = []; # map [1 .. 127] to Future
101 19         53 $self->{pending} = []; # queue of [$opcode, $frame, $f]
102 19         53 $self->{cql_version} = 1;
103             }
104              
105             sub configure
106             {
107 34     34 1 3783 my $self = shift;
108 34         103 my %params = @_;
109              
110 34         97 foreach (qw( username password cql_version
111             on_event on_topology_change on_status_change on_schema_change )) {
112 238 100       547 $self->{$_} = delete $params{$_} if exists $params{$_};
113             }
114              
115 34         296 $self->SUPER::configure( %params );
116             }
117              
118             =head1 METHODS
119              
120             =cut
121              
122             =head2 $id = $conn->nodeid
123              
124             Returns the connection's node ID (the string form of its IP address), which is
125             used as its ID in the C<system.peers> table.
126              
127             =cut
128              
129             sub nodeid
130             {
131 79     79 1 580 my $self = shift;
132 79         419 $self->{nodeid};
133             }
134              
135             sub _version
136             {
137 4108     4108   3217 my $self = shift;
138 4108         8934 return $self->{cql_version};
139             }
140              
141             # function
142             sub _decode_result
143             {
144 1010     1010   993 my ( $version, $response ) = @_;
145              
146 1010         1990 my ( $type, $result ) = parse_result_frame( $version, $response );
147              
148 1010 100       10112 if( $type == RESULT_VOID ) {
149 1005         1937 return Future->new->done();
150             }
151             else {
152 5 100       17 if ( $type == RESULT_ROWS ) { $type = "rows" }
  3 100       5  
    50          
153 1         2 elsif( $type == RESULT_SET_KEYSPACE ) { $type = "keyspace" }
154 1         1 elsif( $type == RESULT_SCHEMA_CHANGE ) { $type = "schema_change" }
155 5         17 return Future->new->done( $type => $result );
156             }
157             }
158              
159             =head2 $conn->connect( %args ) ==> $conn
160              
161             Connects to the Cassandra node and sends the C<OPCODE_STARTUP> message. The
162             returned Future will yield the connection itself on success.
163              
164             Takes the following named arguments:
165              
166             =over 8
167              
168             =item host => STRING
169              
170             =item service => STRING
171              
172             =back
173              
174             =cut
175              
176             sub connect
177             {
178 0     0 1 0 my $self = shift;
179 0         0 my %args = @_;
180              
181 0   0     0 $args{socktype} ||= "stream";
182              
183             return ( $self->{connect_f} ||=
184 0     0   0 $self->SUPER::connect( %args )->on_fail( sub { undef $self->{connect_f} } ) )
185             ->then( sub {
186 0     0   0 $self->{nodeid} = $self->read_handle->peerhost;
187 0         0 $self->startup
188 0   0 0   0 })->then( sub { Future->new->done( $self ) });
  0         0  
189             }
190              
191             sub _has_pending
192             {
193 4     4   6 my $self = shift;
194 4   100     6 defined and return 1 for @{ $self->{streams} };
  4         23  
195 3         24 return 0;
196             }
197              
198             sub on_read
199             {
200 1029     1029 1 154668 my $self = shift;
201 1029         967 my ( $buffref, $eof ) = @_;
202              
203 1029 50       1992 my ( $version, $flags, $streamid, $opcode, $body ) = parse_frame( $$buffref ) or return 0;
204              
205 1029 50       10767 $version & 0x80 or
206             $self->fail_all_and_close( "Expected response to have RESPONSE bit set" ), return;
207 1029         860 $version &= 0x7f;
208              
209             # Test version <= for now in case of "unsupported protocol version" error messages, and
210             # test it exactly later
211 1029 50       1379 $version <= $self->_version or
212             $self->fail_all_and_close( sprintf "Unexpected message version %#02x\n", $version ), return;
213              
214 1029 100 66     1929 if( $flags & FLAG_COMPRESS and my $decompress = $self->{decompress} ) {
215 2         2 $flags &= ~FLAG_COMPRESS;
216 2         8 $body = $decompress->( $body );
217             }
218              
219 1029 50       1439 $flags == 0 or
220             $self->fail_all_and_close( sprintf "Unexpected message flags %#02x\n", $flags ), return;
221              
222 1029         2367 my $frame = Protocol::CassandraCQL::Frame->new( $body );
223              
224 1029 100 33     6438 if( my $f = $self->{streams}[$streamid] ) {
    50 33        
    50          
225              
226 1028 100       1480 if ($opcode == OPCODE_AUTHENTICATE){
227             # Authenticates *do* get to run their futures ahead of anything in Pending,
228 1         2 undef $self->{streams}[$streamid];
229             }
230              
231 1028 100       1147 if( $opcode == OPCODE_ERROR ) {
232 1         5 my ( $err, $message ) = parse_error_frame( $version, $frame );
233 1         86 $f->fail( "OPCODE_ERROR: $message\n", $err, $frame );
234             }
235             else {
236 1027 50       1426 $version == $self->_version or
237             $self->fail_all_and_close( sprintf "Unexpected message version %#02x\n", $version ), return;
238              
239 1027         2020 $f->done( $opcode, $frame, $version );
240             }
241              
242             # Undefined after running $f->done, so that $f doesn't get to jump the queue ahead of pending requests
243             # NB: Moving this above $f->done would mean protecting pending against assuming this streamid is still free
244 1028 100       88577 unless ($opcode == OPCODE_AUTHENTICATE){
245 1027         1406 undef $self->{streams}[$streamid];
246             }
247              
248             # streamid may have been re-populated by a new send_message to AUTHENTICATE
249 1028 100 100     1981 if( !$self->{streams}[$streamid] and my $next = shift @{ $self->{pending} } ) {
  1027 100 66     3220  
250 873         1137 my ( $opcode, $frame, $f ) = @$next;
251 873         1344 $self->_send( $opcode, $streamid, $frame, $f );
252             }
253             elsif( my $close_f = $self->{cassandra_close_future} and !$self->_has_pending ) {
254 1         3 $close_f->done( $self );
255             }
256             }
257             elsif( $streamid == 0 and $opcode == OPCODE_ERROR ) {
258 0         0 my ( $err, $message ) = parse_error_frame( $version, $frame );
259 0         0 $self->fail_all_and_close( "OPCODE_ERROR: $message\n", $err, $frame );
260             }
261             elsif( $streamid == 0xff and $opcode == OPCODE_EVENT ) {
262 1         4 $self->_event( $frame );
263             }
264             else {
265 0         0 print STDERR "Received a message opcode=$opcode for unknown stream $streamid\n";
266             }
267              
268 1029         4213 return 1;
269             }
270              
271             sub _event
272             {
273 1     1   3 my $self = shift;
274 1         2 my ( $frame ) = @_;
275              
276 1         3 my ( $name, @args ) = parse_event_frame( $self->_version, $frame );
277              
278 1 50       142 $self->maybe_invoke_event( "on_".lc($name), @args )
279             or $self->maybe_invoke_event( on_event => $name, @args );
280             }
281              
282             sub on_closed
283             {
284 2     2 1 158 my $self = shift;
285 2         14 $self->fail_all( "Connection closed" );
286             }
287              
288             sub fail_all
289             {
290 2     2 0 4 my $self = shift;
291 2         5 my ( $failure ) = @_;
292              
293 2         5 foreach ( @{ $self->{streams} } ) {
  2         9  
294 0 0       0 $_->fail( $failure ) if $_;
295             }
296 2         4 @{ $self->{streams} } = ();
  2         5  
297              
298 2         4 foreach ( @{ $self->{pending} } ) {
  2         5  
299 0         0 $_->[2]->fail( $failure );
300             }
301 2         85 @{ $self->{pending} } = ();
  2         12  
302             }
303              
304             sub fail_all_and_close
305             {
306 0     0 0 0 my $self = shift;
307 0         0 my ( $failure ) = @_;
308              
309 0         0 $self->fail_all( $failure );
310              
311 0         0 $self->close;
312              
313 0         0 return Future->new->fail( $failure );
314             }
315              
316             =head2 $conn->send_message( $opcode, $frame ) ==> ( $reply_opcode, $reply_frame, $reply_version )
317              
318             Sends a message with the given opcode and L<Protocol::CassandraCQL::Frame> for
319             the message body. The returned Future will yield the response opcode, frame
320             and version number (with the RESPONSE bit masked off).
321              
322             This is a low-level method; applications should instead use one of the wrapper
323             methods below.
324              
325             =cut
326              
327             sub send_message
328             {
329 1028     1028 1 36997 my $self = shift;
330 1028         957 my ( $opcode, $frame ) = @_;
331              
332 1028 50       1817 croak "Cannot ->send_message when in close-pending state" if $self->{cassandra_close_future};
333              
334 1028         2020 my $f = $self->loop->new_future;
335              
336 1028   50     21809 my $streams = $self->{streams} ||= [];
337 1028         767 my $id;
338 1028         1722 foreach ( 1 .. $#$streams ) {
339 118894 100 50     151407 $id = $_ and last if !defined $streams->[$_];
340             }
341              
342 1028 100       1717 if( !defined $id ) {
343 1007 100       1518 if( $#$streams == MAX_STREAMS ) {
344 873         619 push @{ $self->{pending} }, [ $opcode, $frame, $f ];
  873         1806  
345 873         5053 return $f;
346             }
347 134         140 $id = @$streams;
348 134 100       184 $id = 1 if !$id; # can't use 0
349             }
350              
351 155         259 $self->_send( $opcode, $id, $frame, $f );
352              
353 155         900 return $f;
354             }
355              
356             sub _send
357             {
358 1028     1028   851 my $self = shift;
359 1028         953 my ( $opcode, $id, $frame, $f ) = @_;
360              
361 1028         898 my $flags = 0;
362 1028         2017 my $body = $frame->bytes;
363              
364 1028 100       3955 if( my $compress = $self->{compress} ) {
365 12         118 my $body_compressed = $compress->( $body );
366 12 100       34 if( length $body_compressed < length $body ) {
367 2         3 $flags |= FLAG_COMPRESS;
368 2         7 $body = $body_compressed;
369             }
370             }
371              
372 1028         1461 $self->write( build_frame( $self->_version, $flags, $id, $opcode, $body ) );
373              
374 1028         41298 $self->{streams}[$id] = $f;
375             }
376              
377             =head2 $conn->startup ==> ()
378              
379             Sends the initial connection setup message. On success, the returned Future
380             yields nothing.
381              
382             Normally this is not required as the C<connect> method performs it implicitly.
383              
384             =cut
385              
386             sub startup
387             {
388 4     4 1 1284 my $self = shift;
389              
390             # CQLv1 doesn't support LZ4
391 4         6 my $compression;
392 4 100       10 if( HAVE_LZ4 and $self->_version > 1 ) {
393             # Cassandra prepends 32bit BE integer of original size
394             $compression = [ lz4 =>
395 1     1   2 sub { my ( $data ) = @_; pack "N a*", length $data, Compress::LZ4::lz4_compress( $data ) },
  1         213  
396 1     1   3 sub { my ( $bodylen, $lz4data ) = unpack "N a*", $_[0]; Compress::LZ4::lz4_decompress( $lz4data, $bodylen ) },
  1         5  
397 1         8 ];
398             }
399             elsif( HAVE_SNAPPY ) {
400 3         10 $compression = [ snappy =>
401             \&Compress::Snappy::compress,
402             \&Compress::Snappy::decompress,
403             ];
404             }
405             # else undef
406              
407             my $f = $self->send_message( OPCODE_STARTUP, build_startup_frame( $self->_version,
408             options => {
409             CQL_VERSION => "3.0.5",
410             ( $compression ? ( COMPRESSION => $compression->[0] ) : () ),
411             } )
412             )->then( sub {
413 4     4   208 my ( $op, $response, $version ) = @_;
414              
415 4 100       11 if( $op == OPCODE_READY ) {
    50          
416 3         19 return Future->new->done;
417             }
418             elsif( $op == OPCODE_AUTHENTICATE ) {
419 1         4 return $self->_authenticate( parse_authenticate_frame( $version, $response ) );
420             }
421             else {
422 0         0 return $self->fail_all_and_close( "Expected OPCODE_READY or OPCODE_AUTHENTICATE" );
423             }
424 4 50       10 });
425              
426 4         322 $self->{compress} = $compression->[1];
427 4         11 $self->{decompress} = $compression->[2];
428              
429 4         11 return $f;
430             }
431              
432             sub _authenticate
433             {
434 1     1   71 my $self = shift;
435 1         2 my ( $authenticator ) = @_;
436              
437 1 50       7 if( $authenticator eq "org.apache.cassandra.auth.PasswordAuthenticator" ) {
438 1         2 foreach (qw( username password )) {
439 2 50       6 defined $self->{$_} or croak "Cannot authenticate by password without $_";
440             }
441              
442             $self->send_message( OPCODE_CREDENTIALS, build_credentials_frame( $self->_version,
443             credentials => {
444             username => $self->{username},
445             password => $self->{password},
446             } )
447             )->then( sub {
448 1     1   32 my ( $op, $response, $version ) = @_;
449 1 50       3 $op == OPCODE_READY or return $self->fail_all_and_close( "Expected OPCODE_READY" );
450              
451 1         3 return Future->new->done;
452 1         3 });
453             }
454             else {
455 0         0 return $self->fail_all_and_close( "Unrecognised authenticator $authenticator" );
456             }
457             }
458              
459             =head2 $conn->options ==> $options
460              
461             Requests the list of supported options from the server node. On success, the
462             returned Future yields a HASH reference mapping option names to ARRAY
463             references containing valid values.
464              
465             =cut
466              
467             sub options
468             {
469 1     1 1 429 my $self = shift;
470              
471             $self->send_message( OPCODE_OPTIONS,
472             Protocol::CassandraCQL::Frame->new
473             )->then( sub {
474 1     1   36 my ( $op, $response, $version ) = @_;
475 1 50       4 $op == OPCODE_SUPPORTED or return Future->new->fail( "Expected OPCODE_SUPPORTED" );
476              
477 1         6 my ( $opts ) = parse_supported_frame( $version, $response );
478 1         87 return Future->new->done( $opts );
479 1         5 });
480             }
481              
482             =head2 $conn->query( $cql, $consistency, %other_args ) ==> ( $type, $result )
483              
484             Performs a CQL query. On success, the values returned from the Future will
485             depend on the type of query.
486              
487             For C<USE> queries, the type is C<keyspace> and C<$result> is a string giving
488             the name of the new keyspace.
489              
490             For C<CREATE>, C<ALTER> and C<DROP> queries, the type is C<schema_change> and
491             C<$result> is a 3-element ARRAY reference containing the type of change, the
492             keyspace and the table name.
493              
494             For C<SELECT> queries, the type is C<rows> and C<$result> is an instance of
495             L<Protocol::CassandraCQL::Result> containing the returned row data.
496              
497             For other queries, such as C<INSERT>, C<UPDATE> and C<DELETE>, the future
498             returns nothing.
499              
500             Any other arguments will be passed on to the underlying C<build_query_frame>
501             function of L<Protocol::CassandraCQL::Frames>.
502              
503             =cut
504              
505             sub query
506             {
507 1005     1005 1 47200 my $self = shift;
508 1005         1277 my ( $cql, $consistency, %other_args ) = @_;
509              
510             $self->send_message( OPCODE_QUERY, build_query_frame( $self->_version,
511             cql => $cql,
512             consistency => $consistency,
513             %other_args,
514             )
515             )->then( sub {
516 1005     1005   32506 my ( $op, $response, $version ) = @_;
517 1005 50       1535 $op == OPCODE_RESULT or return Future->new->fail( "Expected OPCODE_RESULT" );
518 1005         1264 return _decode_result( $version, $response );
519 1005         1419 });
520             }
521              
522             =head2 $conn->prepare( $cql ) ==> $query
523              
524             Prepares a CQL query for later execution. On success, the returned Future
525             yields an instance of L<Net::Async::CassandraCQL::Query>.
526              
527             =cut
528              
529             sub prepare
530             {
531 3     3 1 549 my $self = shift;
532 3         6 my ( $cql, $cassandra ) = @_;
533              
534             $self->send_message( OPCODE_PREPARE, build_prepare_frame( $self->_version,
535             cql => $cql,
536             )
537             )->then( sub {
538 3     3   148 my ( $op, $response, $version ) = @_;
539 3 50       10 $op == OPCODE_RESULT or return Future->new->fail( "Expected OPCODE_RESULT" );
540              
541 3         12 my ( $type, $result ) = parse_result_frame( $version, $response );
542 3 50       733 $type == RESULT_PREPARED or return Future->new->fail( "Expected RESULT_PREPARED" );
543              
544 3         7 my ( $id, $params_meta, $result_meta ) = @$result;
545              
546 3         27 my $query = Net::Async::CassandraCQL::Query->new(
547             cassandra => $cassandra,
548             cql => $cql,
549             id => $id,
550             params_meta => $params_meta,
551             result_meta => $result_meta, # v2+ only
552             );
553 3         20 return Future->new->done( $query );
554 3         11 });
555             }
556              
557             =head2 $conn->execute( $id, $data, $consistency, %other_args ) ==> ( $type, $result )
558              
559             Executes a previously-prepared statement, given its ID and the binding data.
560             On success, the returned Future will yield results of the same form as the
561             C<query> method. C<$data> should contain a list of encoded byte-string values.
562             Any other arguments will be passed on to the underlying C<build_execute_frame>
563             function of L<Protocol::CassandraCQL::Frames>.
564              
565             Normally this method is not directly required - instead, use the C<execute>
566             method on the query object itself, as this will encode the parameters
567             correctly.
568              
569             =cut
570              
571             sub execute
572             {
573 5     5 1 6 my $self = shift;
574 5         12 my ( $id, $data, $consistency, %other_args ) = @_;
575              
576             $self->send_message( OPCODE_EXECUTE, build_execute_frame( $self->_version,
577             id => $id,
578             values => $data,
579             consistency => $consistency,
580             %other_args,
581             )
582             )->then( sub {
583 5     5   222 my ( $op, $response, $version ) = @_;
584 5 50       17 $op == OPCODE_RESULT or return Future->new->fail( "Expected OPCODE_RESULT" );
585 5         11 return _decode_result( $version, $response );
586 5         11 });
587             }
588              
589             =head2 $conn->register( $events ) ==> ()
590              
591             Registers the connection's interest in receiving events of the types given in
592             the ARRAY reference. Event names may be C<TOPOLOGY_CHANGE>, C<STATUS_CHANGE>
593             or C<SCHEMA_CHANGE>. On success, the returned Future yields nothing.
594              
595             =cut
596              
597             sub register
598             {
599 1     1 1 383 my $self = shift;
600 1         1 my ( $events ) = @_;
601              
602             $self->send_message( OPCODE_REGISTER, build_register_frame( $self->_version,
603             events => $events,
604             )
605             )->then( sub {
606 1     1   71 my ( $op, $response, $version ) = @_;
607 1 50       6 $op == OPCODE_READY or Future->new->fail( "Expected OPCODE_READY" );
608              
609 1         19 return Future->new->done;
610 1         5 });
611             }
612              
613             =head2 $conn->close_when_idle ==> $conn
614              
615             If the connection is idle (has no outstanding queries), then it is closed
616             immediately. If not, it is put into close-pending mode, where it will accept
617             no more queries, and will close when the last pending one is complete.
618              
619             Returns a future which will eventually yield the (closed) connection when it
620             becomes closed.
621              
622             =cut
623              
624             sub close_when_idle
625             {
626 3     3 1 496 my $self = shift;
627              
628 3   33     19 $self->{cassandra_close_future} ||= do {
629 3         12 my $f = $self->loop->new_future;
630 3     3   2725 $f->on_done( sub { $_[0]->close } );
  3         155  
631              
632 3 100       72 $f->done( $self ) if !$self->_has_pending;
633              
634 3         315 $f
635             };
636             }
637              
638             =head1 SPONSORS
639              
640             This code was paid for by
641              
642             =over 2
643              
644             =item *
645              
646             Perceptyx L
647              
648             =item *
649              
650             Shadowcat Systems L
651              
652             =back
653              
654             =head1 AUTHOR
655              
656             Paul Evans <leonerd@leonerd.org.uk>
657              
658             =cut
659              
660             0x55AA;