File Coverage

blib/lib/Net/Async/CassandraCQL.pm
Criterion Covered Total %
statement 318 359 88.5
branch 76 128 59.3
condition 16 29 55.1
subroutine 56 65 86.1
pod 13 13 100.0
total 479 594 80.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::CassandraCQL;
7              
8 8     8   196942 use strict;
  8         16  
  8         307  
9 8     8   46 use warnings;
  8         17  
  8         231  
10 8     8   208 use 5.010;
  8         27  
  8         442  
11              
12             our $VERSION = '0.11';
13              
14 8     8   46 use base qw( IO::Async::Notifier );
  8         11  
  8         4463  
15              
16 8     8   19122 use Carp;
  8         19  
  8         617  
17              
18 8     8   7850 use Future::Utils qw( fmap_void try_repeat_until_success );
  8         18861  
  8         670  
19 8     8   62 use List::Util qw( shuffle );
  8         18  
  8         814  
20 8     8   45 use Scalar::Util qw( weaken );
  8         13  
  8         415  
21 8     8   3297 use Socket qw( inet_ntop getnameinfo AF_INET AF_INET6 NI_NUMERICHOST NIx_NOSERV );
  8         15911  
  8         1208  
22              
23 8     8   3883 use Protocol::CassandraCQL qw( CONSISTENCY_ONE );
  8         14299  
  8         630  
24              
25 8     8   3304 use Net::Async::CassandraCQL::Connection;
  8         21  
  8         264  
26              
27 8     8   71 use constant DEFAULT_CQL_PORT => 9042;
  8         13  
  8         567  
28              
29             # Time after which down nodes will be retried
30 8     8   37 use constant NODE_RETRY_TIME => 60;
  8         12  
  8         36647  
31              
32             =head1 NAME
33              
34             C<Net::Async::CassandraCQL> - use Cassandra databases with L<IO::Async> using CQL
35              
36             =head1 SYNOPSIS
37              
38             use IO::Async::Loop;
39             use Net::Async::CassandraCQL;
40             use Protocol::CassandraCQL qw( CONSISTENCY_QUORUM );
41              
42             my $loop = IO::Async::Loop->new;
43              
44             my $cass = Net::Async::CassandraCQL->new(
45             host => "localhost",
46             keyspace => "my-keyspace",
47             default_consistency => CONSISTENCY_QUORUM,
48             );
49             $loop->add( $cass );
50              
51              
52             $cass->connect->get;
53              
54              
55             my @f;
56             foreach my $number ( 1 .. 100 ) {
57             push @f, $cass->query( "INSERT INTO numbers (v) VALUES ($number)" );
58             }
59             Future->needs_all( @f )->get;
60              
61              
62             my $get_stmt = $cass->prepare( "SELECT v FROM numbers" )->get;
63              
64             my ( undef, $result ) = $get_stmt->execute( [] )->get;
65              
66             foreach my $row ( $result->rows_hash ) {
67             say "We have a number " . $row->{v};
68             }
69              
70             =head1 DESCRIPTION
71              
72             This module allows use of the C<CQL> interface of a Cassandra database. It
73             fully supports asynchronous operation via L<IO::Async>, allowing both direct
74             queries and prepared statements to be managed concurrently, if required.
75             Alternatively, as the interface is entirely based on L<Future> objects, it can
76             be operated synchronously in a blocking fashion by simply awaiting each
77             individual operation by calling the C<get> method.
78              
79             It is based on L<Protocol::CassandraCQL>, which more completely documents the
80             behaviours and limits of its ability to communicate with Cassandra.
81              
82             =cut
83              
84             =head1 EVENTS
85              
86             =head2 on_node_up $nodeid
87              
88             =head2 on_node_down $nodeid
89              
90             The node's status has changed. C<$nodeid> is the node's IP address as a text
91             string.
92              
93             =head2 on_node_new $nodeid
94              
95             =head2 on_node_removed $nodeid
96              
97             A new node has been added to the cluster, or an existing node has been
98             decommissioned and removed.
99              
100             These four events are obtained from event watches on the actual node
101             connections and filtered to remove duplicates. The use of multiple primaries
102             should improve the reliability of notifications, though if multiple nodes fail
103             at or around the same time this may go unreported, as no node will ever report
104             its own failure.
105              
106             =cut
107              
108             =head1 PARAMETERS
109              
110             The following named parameters may be passed to C<new> or C<configure>:
111              
112             =over 8
113              
114             =item host => STRING
115              
116             =item hosts => ARRAY of STRING
117              
118             The hostnames of Cassandra nodes to connect to initially. If more than one host
119             is provided in an array, they will be attempted sequentially until one
120             succeeds during the initial connect phase.
121              
122             =item service => STRING
123              
124             Optional. The service name or port number to connect to.
125              
126             =item username => STRING
127              
128             =item password => STRING
129              
130             Optional. Authentication details to use for C<PasswordAuthenticator>.
131              
132             =item keyspace => STRING
133              
134             Optional. If set, a C<USE keyspace> query will be issued as part of the
135             connect method.
136              
137             =item default_consistency => INT
138              
139             Optional. Default consistency level to use if none is provided to C<query> or
140             C<query_rows>.
141              
142             =item primaries => INT
143              
144             Optional. The number of primary node connections to maintain. Defaults to 1 if
145             not specified.
146              
147             =item prefer_dc => STRING
148              
149             Optional. If set, prefer to pick primary nodes from the given data center,
150             only falling back on others if there are not enough available.
151              
152             =item cql_version => INT
153              
154             Optional. Version of the CQL wire protocol to negotiate during connection.
155             Defaults to 1.
156              
157             =back
158              
159             =cut
160              
161             sub _init
162             {
163 7     7   6797 my $self = shift;
164 7         19 my ( $params ) = @_;
165              
166 7   100     55 $params->{primaries} //= 1;
167              
168             # precache these weasels only once
169             $self->{on_topology_change_cb} = $self->_replace_weakself( sub {
170 2     2   1389 shift->_on_topology_change( @_ );
171 7         91 });
172             $self->{on_status_change_cb} = $self->_replace_weakself( sub {
173 3     3   17929 shift->_on_status_change( @_ );
174 7         168 });
175              
176 7         81 $self->{queries_by_cql} = {}; # {$cql} => [$query, $expire_timer_f]
177             # Can be in two states:
178             # $query is_weak; timer undef => normal user use
179             # $query non-weak; timer exists => due to expire soon
180              
181 7         22 $self->{cql_version} = 1;
182              
183 7         65 $self->SUPER::_init( $params );
184             }
185              
186             sub configure
187             {
188 11     11 1 3833 my $self = shift;
189 11         37 my %params = @_;
190              
191 11 100       47 if( defined $params{host} ) {
192 5   50     70 $params{hosts} ||= [ delete $params{host} ];
193             }
194              
195 11         38 foreach (qw( hosts service username password keyspace default_consistency
196             prefer_dc cql_version
197             on_node_up on_node_down on_node_new on_node_removed )) {
198 132 100       294 $self->{$_} = delete $params{$_} if exists $params{$_};
199             }
200              
201 11 100       43 if( exists $params{primaries} ) {
202 7         23 $self->{primaries} = delete $params{primaries};
203              
204             # TODO: connect more / drain old ones
205             }
206              
207 11         85 $self->SUPER::configure( %params );
208             }
209              
210             =head1 METHODS
211              
212             =cut
213              
214             # function
215             sub _inet_to_string
216             {
217 12     12   23 my ( $addr ) = @_;
218              
219 12         20 my $addrlen = length $addr;
220 12 0       36 my $family = $addrlen == 4 ? AF_INET :
    50          
221             $addrlen == 16 ? AF_INET6 :
222             die "Expected ADDRLEN 4 or 16";
223 12         102 return inet_ntop( $family, $addr );
224             }
225              
226             # function
227             sub _nodeid_to_string
228             {
229 5     5   10 my ( $node ) = @_;
230              
231 5         60 return ( getnameinfo( $node, NI_NUMERICHOST, NIx_NOSERV ) )[1];
232             }
233              
234             =head2 $str = $cass->quote( $str )
235              
236             Quotes a string argument suitable for inclusion in an immediate CQL query
237             string.
238              
239             In general, it is better to use a prepared query and pass the value as an
240             execute parameter though.
241              
242             =cut
243              
244             sub quote
245             {
246 3     3 1 398 my $self = shift;
247 3         7 my ( $str ) = @_;
248              
249             # CQL's 'quoting' handles any character except quote marks, which have to
250             # be doubled
251 3         10 $str =~ s/'/''/g;
252 3         22 return qq('$str');
253             }
254              
255             =head2 $str = $cass->quote_identifier( $str )
256              
257             Quotes an identifier name suitable for inclusion in a CQL query string.
258              
259             =cut
260              
261             sub quote_identifier
262             {
263 4     4 1 6 my $self = shift;
264 4         7 my ( $str ) = @_;
265              
266 4 100       18 return $str if $str =~ m/^[a-z_][a-z0-9_]+$/;
267              
268             # CQL's "quoting" handles any character except quote marks, which have to
269             # be doubled
270 3         6 $str =~ s/"/""/g;
271 3         14 return qq("$str");
272             }
273              
274             =head2 $cass->connect( %args ) ==> ()
275              
276             Connects to the Cassandra node and starts up the connection. The returned
277             Future will yield nothing on success.
278              
279             Takes the following named arguments:
280              
281             =over 8
282              
283             =item host => STRING
284              
285             =item hosts => ARRAY of STRING
286              
287             =item service => STRING
288              
289             =back
290              
291             A set of host names are required, either as a named argument or as a
292             configured value on the object. If the service name is missing, the default
293             CQL port will be used instead.
294              
295             =cut
296              
297             # ->_connect_node( $host, $service ) ==> $conn
298             # mocked during unit testing
299             sub _connect_node
300             {
301 0     0   0 my $self = shift;
302 0         0 my ( $host, $service ) = @_;
303              
304 0   0     0 $service //= $self->{service} // DEFAULT_CQL_PORT;
      0        
305              
306             my $conn = Net::Async::CassandraCQL::Connection->new(
307             on_closed => sub {
308 0     0   0 my $node = shift;
309 0         0 $self->remove_child( $node );
310 0         0 $self->_closed_node( $node->nodeid );
311             },
312 0         0 map { $_ => $self->{$_} } qw( username password cql_version ),
  0         0  
313             );
314 0         0 $self->add_child( $conn );
315              
316             $conn->connect(
317             host => $host,
318             service => $service,
319             )->on_fail( sub {
320             # Some kinds of failure have already removed it
321 0 0   0   0 $self->remove_child( $conn ) if $conn->parent;
322 0         0 });
323             }
324              
325             # invoked during unit testing
326             sub _closed_node
327             {
328 2     2   2313 my $self = shift;
329 2         4 my ( $nodeid ) = @_;
330              
331 2         17 my $now = time();
332              
333 2 50       10 $self->{nodes} or return;
334 2 50       10 my $node = $self->{nodes}{$nodeid} or return;
335              
336 2         4 undef $node->{conn};
337 2         5 undef $node->{ready_f};
338 2         7 $node->{down_time} = $now;
339              
340 2 50       8 if( exists $self->{primary_ids}{$nodeid} ) {
341 2         8 $self->debug_printf( "PRIMARY DOWN %s", $nodeid );
342 2         8 delete $self->{primary_ids}{$nodeid};
343              
344 2         8 $self->_pick_new_primary( $now );
345             }
346              
347 2 100       167 if( exists $self->{event_ids}{$nodeid} ) {
348 1         2 delete $self->{event_ids}{$nodeid};
349              
350 1         8 $self->_pick_new_eventwatch;
351             }
352             }
353              
354             sub _list_nodeids
355             {
356 7     7   15 my $self = shift;
357              
358 7         16 my $nodes = $self->{nodes};
359              
360 7         340 my @nodeids = shuffle keys %$nodes;
361 7 100       39 if( defined( my $dc = $self->{prefer_dc} ) ) {
362             # Put preferred ones first
363 12         29 @nodeids = ( ( grep { $nodes->{$_}{data_center} eq $dc } @nodeids ),
  12         26  
364 2         50 ( grep { $nodes->{$_}{data_center} ne $dc } @nodeids ) );
365             }
366              
367 7         187 return @nodeids;
368             }
369              
370             sub connect
371             {
372 6     6 1 133 my $self = shift;
373 6         13 my %args = @_;
374              
375 6         13 my $conn;
376              
377 0 50       0 my @hosts = $args{hosts} ? @{ $args{hosts} } :
  6         33  
378             defined $args{host} ? ( $args{host} ) :
379 6 50       34 @{ $self->{hosts} || [] };
    50          
380 6 50       27 @hosts or croak "Require initial hostnames to ->connect to";
381              
382             ( try_repeat_until_success {
383 7     7   1167 $self->_connect_node( $_[0], $args{service} )
384             } foreach => \@hosts )->then( sub {
385 6     6   2269 ( $conn ) = @_;
386 6         99 $self->_list_nodes( $conn );
387             })->then( sub {
388 5     5   863 my @nodes = @_;
389              
390 5         20 $self->{nodes} = \my %nodes;
391 5         16 foreach my $node ( @nodes ) {
392 16         73 my $n = $nodes{$node->{host}} = {
393             data_center => $node->{data_center},
394             rack => $node->{rack},
395             };
396              
397 16 100       54 if( $node->{host} eq $conn->nodeid ) {
398 5         37 $n->{conn} = $conn;
399             }
400             }
401              
402             # Initial primary on the seed
403 5         30 $self->{primary_ids} = {
404             $conn->nodeid => 1,
405             };
406 5         21 my $primary0 = $nodes{$conn->nodeid};
407 5         11 my $have_primaries = 1;
408              
409 5         10 my @conn_f;
410              
411 5         36 $self->debug_printf( "PRIMARY PICKED %s", $conn->nodeid );
412 5         50 push @conn_f, $primary0->{ready_f} = $self->_ready_node( $conn->nodeid );
413              
414 5         99 my @nodeids = $self->_list_nodeids;
415              
416 5   100     67 while( @nodeids and $have_primaries < $self->{primaries} ) {
417 6         12 my $primary = shift @nodeids;
418 6 100       22 next if $primary eq $conn->nodeid;
419              
420 4         15 push @conn_f, $self->_connect_new_primary( $primary );
421 4         362 $have_primaries++;
422             }
423              
424 5         68 $self->_pick_new_eventwatch;
425 4 100       163 $self->_pick_new_eventwatch if $have_primaries > 1;
426              
427 4 100       80 return $conn_f[0] if @conn_f == 1;
428 2         12 return Future->needs_all( @conn_f );
429 6         66 });
430             }
431              
432             sub _pick_new_primary
433             {
434 2     2   5 my $self = shift;
435 2         3 my ( $now ) = @_;
436              
437 2         3 my $nodes = $self->{nodes};
438              
439 2         4 my $new_primary;
440              
441             # Expire old down statuses and try to find a non-down node that is not yet
442             # primary
443 2         8 foreach my $nodeid ( $self->_list_nodeids ) {
444 8         14 my $node = $nodes->{$nodeid};
445              
446 8 50 66     30 delete $node->{down_time} if defined $node->{down_time} and $now - $node->{down_time} > NODE_RETRY_TIME;
447              
448 8 100       21 next if $self->{primary_ids}{$nodeid};
449              
450 6 100 66     27 $new_primary ||= $nodeid if !$node->{down_time};
451             }
452              
453 2 50       16 if( !defined $new_primary ) {
454 0         0 die "ARGH! TODO: can't find a new node to be primary\n";
455             }
456              
457 2         7 $self->_connect_new_primary( $new_primary );
458             }
459              
460             sub _connect_new_primary
461             {
462 7     7   15 my $self = shift;
463 7         12 my ( $new_primary ) = @_;
464              
465 7         23 $self->debug_printf( "PRIMARY PICKED %s", $new_primary );
466 7         32 $self->{primary_ids}{$new_primary} = 1;
467              
468 7         15 my $node = $self->{nodes}{$new_primary};
469              
470             my $f = $node->{ready_f} = $self->_connect_node( $new_primary )->then( sub {
471 7     7   3789 my ( $conn ) = @_;
472 7         17 $node->{conn} = $conn;
473              
474 7         21 $self->_ready_node( $new_primary )
475             })->on_fail( sub {
476 0     0   0 print STDERR "ARGH! NEW PRIMARY FAILED: @_\n";
477             })->on_done( sub {
478 7     7   395 $self->debug_printf( "PRIMARY UP %s", $new_primary );
479 7         31 });
480             }
481              
482             sub _ready_node
483             {
484 12     12   47 my $self = shift;
485 12         25 my ( $nodeid ) = @_;
486              
487 12 50       58 my $node = $self->{nodes}{$nodeid} or die "Don't have a node id $nodeid";
488 12 50       43 my $conn = $node->{conn} or die "Expected node to have a {conn} but it doesn't";
489              
490 12         22 my $keyspace = $self->{keyspace};
491              
492 12 50       60 my $keyspace_f =
493             $keyspace ? $conn->query( "USE " . $self->quote_identifier( $keyspace ), CONSISTENCY_ONE )
494             : Future->new->done;
495              
496             $keyspace_f->then( sub {
497 12     12   660 my $conn_f = Future->new->done( $conn );
498 12 50       395 return $conn_f unless my $queries_by_cql = $self->{queries_by_cql};
499 12 100       51 return $conn_f unless keys %$queries_by_cql;
500              
501             ( fmap_void {
502 1 50       53 my $query = shift->[0] or return Future->new->done;
503 1         5 $conn->prepare( $query->cql, $self );
504             } foreach => [ values %$queries_by_cql ] )
505 1         9 ->then( sub { $conn_f } );
  1         1632  
506 12         539 });
507             }
508              
509             sub _pick_new_eventwatch
510             {
511 8     8   15 my $self = shift;
512              
513 8         19 my @primaries = keys %{ $self->{primary_ids} };
  8         33  
514              
515             {
516 8         14 my $nodeid = $primaries[rand @primaries];
  8         26  
517 8 50       39 redo if $self->{event_ids}{$nodeid};
518              
519 8         25 $self->{event_ids}{$nodeid} = 1;
520              
521 8         18 my $node = $self->{nodes}{$nodeid};
522             $node->{ready_f}->on_done( sub {
523 8     8   122 my $conn = shift;
524 8         39 $conn->configure(
525             on_topology_change => $self->{on_topology_change_cb},
526             on_status_change => $self->{on_status_change_cb},
527             );
528             $conn->register( [qw( TOPOLOGY_CHANGE STATUS_CHANGE )] )
529             ->on_fail( sub {
530 0         0 delete $self->{event_ids}{$nodeid};
531 0         0 $self->_pick_new_eventwatch
532 8         471 });
533 8         51 });
534             }
535             }
536              
537             sub _on_topology_change
538             {
539 2     2   5 my $self = shift;
540 2         3 my ( $type, $addr ) = @_;
541 2         6 my $nodeid = _nodeid_to_string( $addr );
542              
543 2         5 my $nodes = $self->{nodes};
544              
545             # These updates can happen twice if there's two event connections but
546             # that's OK. Use the state to ensure printing only once
547              
548 2 100       10 if( $type eq "NEW_NODE" ) {
    50          
549 1 50       5 return if exists $nodes->{$nodeid};
550              
551 1         2 $nodes->{$nodeid} = {};
552              
553             my $f = $self->query_rows( "SELECT peer, data_center, rack FROM system.peers WHERE peer = " . $self->quote( $nodeid ), CONSISTENCY_ONE )
554             ->on_done( sub {
555 1     1   92 my ( $result ) = @_;
556 1         4 my $node = $result->row_hash( 0 );
557 1         43 $node->{host} = _inet_to_string( delete $node->{peer} );
558              
559 1         4 %{$nodes->{$nodeid}} = %$node;
  1         3  
560              
561 1         20 $self->debug_printf( "NEW_NODE {%s}", $nodeid );
562 1         6 $self->maybe_invoke_event( on_node_new => $nodeid );
563 1         5 });
564              
565             # Intentional cycle
566 1     1   60 $f->on_ready( sub { undef $f } );
  1         28  
567             }
568             elsif( $type eq "REMOVED_NODE" ) {
569 1 50       5 return if !exists $nodes->{$nodeid};
570              
571 1         4 delete $nodes->{$nodeid};
572 1         4 $self->debug_printf( "REMOVED_NODE {%s}", $nodeid );
573 1         5 $self->maybe_invoke_event( on_node_removed => $nodeid );
574             }
575             }
576              
577             sub _on_status_change
578             {
579 3     3   5 my $self = shift;
580 3         6 my ( $status, $addr ) = @_;
581 3         11 my $nodeid = _nodeid_to_string( $addr );
582              
583 3         8 my $nodes = $self->{nodes};
584 3 50       14 my $node = $nodes->{$nodeid} or return;
585              
586             # These updates can happen twice if there's two event connections but
587             # that's OK. Use the state to ensure printing only once
588              
589 3 100       23 if( $status eq "DOWN" ) {
    50          
590 1 50       4 return if exists $node->{down_time};
591              
592 1         4 $self->debug_printf( "STATUS DOWN on {%s}", $nodeid );
593 1         11 $self->maybe_invoke_event( on_node_down => $nodeid );
594              
595 1         26 $node->{down_time} = time();
596             }
597             elsif( $status eq "UP" ) {
598 2 50       11 return if !exists $node->{down_time};
599              
600 2         12 $self->debug_printf( "STATUS UP on {%s}", $nodeid );
601 2         15 $self->maybe_invoke_event( on_node_up => $nodeid );
602              
603 2         37 delete $node->{down_time};
604              
605 2 100       13 return unless defined( my $dc = $self->{prefer_dc} );
606 1 50       5 return unless $node->{data_center} eq $dc;
607 1 50       5 return if $node->{conn};
608              
609             # A node in a preferred data center is now up, and we don't already have
610             # a connection to it
611              
612 1         1 my $old_nodeid;
613 1   66     2 $nodes->{$_}{data_center} ne $dc and $old_nodeid = $_, last for keys %{ $self->{primary_ids} };
  1         13  
614              
615 1 50       4 return unless defined $old_nodeid;
616              
617             # We do have a connection to a non-preferred node, so lets switch it
618              
619 1         5 $self->_connect_new_primary( $nodeid );
620              
621             # Don't pick it for new nodes
622 1         16 $self->debug_printf( "PRIMARY SWITCH %s -> %s", $old_nodeid, $nodeid );
623 1         4 delete $self->{primary_ids}{$old_nodeid};
624              
625             # Close it when it's empty
626 1         10 $nodes->{$old_nodeid}{conn}->close_when_idle;
627             }
628             }
629              
630             =head2 $cass->close_when_idle ==> $cass
631              
632             Stops accepting new queries and prepares all the existing connections to be
633             closed once every outstanding query has been responded to. Returns a future
634             that will eventually yield the CassandraCQL object, when all the connections
635             are closed.
636              
637             After calling this method it will be an error to invoke C<query>, C<prepare>,
638             C<execute> or the various other methods derived from them.
639              
640             =cut
641              
642             sub close_when_idle
643             {
644 1     1 1 16700 my $self = shift;
645              
646 1         3 my $nodes = $self->{nodes};
647              
648             # remove 'nodes' to avoid reconnect logic
649 1         2 undef $self->{nodes};
650 1         4 undef $self->{primary_ids};
651              
652 1         3 my @f;
653 1         4 foreach my $node ( values %$nodes ) {
654 2 100       10 next unless my $conn = $node->{conn};
655 1         10 push @f, $conn->close_when_idle;
656             }
657              
658             return Future->wait_all( @f )->then( sub {
659 1     1   147 return Future->new->done( $self );
660 1         7 });
661             }
662              
663             =head2 $cass->close_now
664              
665             Immediately closes all node connections and shuts down the object. Any
666             outstanding or queued queries will immediately fail. Consider this as a "last
667             resort" failure shutdown, as compared to the graceful draining behaviour of
668             C<close_when_idle>.
669              
670             =cut
671              
672             sub close_now
673             {
674 0     0 1 0 my $self = shift;
675              
676 0         0 my $nodes = $self->{nodes};
677              
678             # remove 'nodes' to avoid reconnect logic
679 0         0 undef $self->{nodes};
680 0         0 undef $self->{primary_ids};
681              
682 0         0 foreach my $node ( values %$nodes ) {
683 0 0       0 next unless my $conn = $node->{conn};
684 0         0 $conn->close_now;
685             }
686             }
687              
688             sub _get_a_node
689             {
690 19     19   26 my $self = shift;
691              
692 19 50       68 my $nodes = $self->{nodes} or die "No available nodes";
693              
694             # TODO: Other sorting strategies;
695             # e.g. fewest outstanding queries, least accumulated time recently
696 19         22 my @nodeids;
697             {
698 19   100     23 my $next = $self->{next_primary} // 0;
  19         78  
699 19 50       25 @nodeids = keys %{ $self->{primary_ids} } or die "ARGH: $self -> _get_a_node called with no defined primaries";
  19         89  
700              
701             # Rotate to the next in sequence
702 19         84 @nodeids = ( @nodeids[$next..$#nodeids], @nodeids[0..$next-1] );
703 19         38 ( $next += 1 ) %= @nodeids;
704              
705 19         29 my $next_ready = $next;
706             # Skip non-ready ones
707 19         78 while( not $nodes->{$nodeids[0]}->{ready_f}->is_ready ) {
708 3         15 push @nodeids, shift @nodeids;
709 3         5 ( $next_ready += 1 ) %= @nodeids;
710 3 50       13 last if $next_ready == $next; # none were ready - just use the next
711             }
712 19         117 $self->{next_primary} = $next_ready;
713             }
714              
715 19 50       60 if( my $node = $nodes->{ $nodeids[0] } ) {
716 19         140 return $node->{ready_f};
717             }
718              
719 0         0 die "ARGH: don't have a primary node";
720             }
721              
722             =head2 $cass->query( $cql, $consistency, %other_args ) ==> ( $type, $result )
723              
724             Performs a CQL query. On success, the values returned from the Future will
725             depend on the type of query.
726              
727             For C<USE> queries, the type is C<keyspace> and C<$result> is a string giving
728             the name of the new keyspace.
729              
730             For C<CREATE>, C<ALTER> and C<DROP> queries, the type is C<schema_change> and
731             C<$result> is a 3-element ARRAY reference containing the type of change, the
732             keyspace and the table name.
733              
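734             For C<SELECT> queries, the type is C<rows> and C<$result> is an instance of
735             L<Protocol::CassandraCQL::Result> containing the returned row data.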
736              
737             For other queries, such as C<INSERT>, C<UPDATE> and C<DELETE>, the future
738             returns nothing.
739              
740             C<%other_args> may be any of the following, when using C<cql_version> 2 or
741             above:
742              
743             =over 8
744              
745             =item skip_metadata => BOOL
746              
747             Requests the server does not include result metadata in the response. It will
748             be up to the caller to provide this, via C<set_metadata> on the returned
749             Result object, before it can be used.
750              
751             =item page_size => INT
752              
753             Requests that the server returns at most the given number of rows. If any
754             further remain, the result object will include the C<paging_state> field. This
755             can be passed in another C<query> call to obtain the next set of data.
756              
757             =item paging_state => INT
758              
759             Requests that the server continues a paged request from this position, given
760             in a previous response.
761              
762             =item serial_consistency => INT
763              
764             Sets the consistency level for serial operations in the query. Must be one of
765             C<CONSISTENCY_SERIAL> or C<CONSISTENCY_LOCAL_SERIAL>.
766              
767             =back
768              
769             =cut
770              
# _debug_wrap_result( $op, $self, $f )
#
# Plain function (not a method) that attaches debug logging to the Future $f
# and hands the very same Future back. Nothing is attached unless
# $IO::Async::Notifier::DEBUG is true. Once $f is ready, a single line
# prefixed by the operation name $op is emitted via $self->debug_printf:
# the failure, a summary of the typed result, or "VOID" if nothing was
# yielded.
sub _debug_wrap_result
{
   my ( $op, $self, $f ) = @_;

   return $f unless $IO::Async::Notifier::DEBUG;

   $f->on_ready( sub {
      my ( $ready_f ) = @_;

      if( $ready_f->failure ) {
         $self->debug_printf( "$op => FAIL %s", scalar $ready_f->failure );
         return;
      }

      my @values = $ready_f->get;
      unless( @values ) {
         $self->debug_printf( "$op => VOID" );
         return;
      }

      my ( $type, $result ) = @values;

      # Row sets and schema changes are summarised; anything else is printed
      # exactly as it was received
      my $summary =
         $type eq "rows"          ? sprintf( "%d x %d columns", $result->rows, $result->columns ) :
         $type eq "schema_change" ? sprintf( "%s %s", $result->[0], join ".", @{$result}[1..$#$result] ) :
                                    $result;

      $self->debug_printf( "$op => %s %s", uc $type, $summary );
   });

   return $f;
}
796              
# Runs a CQL statement on a node chosen by _get_a_node. The consistency level
# falls back to the instance's default_consistency; it is an error for
# neither to be defined. Returns the Future from the node's own query method,
# passed through the debug-logging wrapper.
sub query
{
   my ( $self, $cql, $consistency, %other_args ) = @_;

   $consistency = $self->{default_consistency} unless defined $consistency;
   croak "'query' needs a consistency level" unless defined $consistency;

   return _debug_wrap_result( QUERY => $self, $self->_get_a_node->then( sub {
      my ( $node ) = @_;

      $self->debug_printf( "QUERY on {%s}: %s", $node->nodeid, $cql );
      return $node->query( $cql, $consistency, %other_args );
   }) );
}
811              
812             =head2 $cass->query_rows( $cql, $consistency, %other_args ) ==> $result
813              
814             A shortcut wrapper for C<query> which expects a C<rows> result and returns it
815             directly. Any other result is treated as an error. The returned Future returns
816             a C<Protocol::CassandraCQL::Result> directly.
817              
818             =cut
819              
# Wrapper around query() for statements that must produce a row set.
#
# Takes the same arguments as query(). The returned Future yields just the
# result object when the query's result type is "rows". Any other result
# type, including a void result where no type is given at all, makes the
# returned Future fail with "Expected 'rows' result".
sub query_rows
{
   my $self = shift;

   return $self->query( @_ )->then( sub {
      my ( $type, $result ) = @_;

      # The failure Future has to be *returned* from this callback; merely
      # constructing it would let execution fall through to the success case
      return Future->new->fail( "Expected 'rows' result" )
         unless defined $type and $type eq "rows";

      return Future->new->done( $result );
   });
}
830              
831             =head2 $cass->prepare( $cql ) ==> $query
832              
833             Prepares a CQL query for later execution. On success, the returned Future
834             yields an instance of a prepared query object (see below).
835              
836             Query objects are stored internally, cached by the CQL string; subsequent calls
837             to C<prepare> with the same exact CQL string will yield the same object
838             immediately, saving a roundtrip.
839              
840             =cut
841              
# Prepares $cql on the connection of every node listed in {primary_ids} and
# returns a Future yielding the prepared query object.
#
# Results are cached in {queries_by_cql} as [ $query, $expiry_future ] pairs.
# A cache hit yields the stored object at once without touching any node,
# and cancels a pending expiry future if there is one. Dies if the instance
# has no {nodes}.
sub prepare
{
   my ( $self, $cql ) = @_;

   my $nodes = $self->{nodes};
   $nodes or die "No available nodes";

   my $queries_by_cql = $self->{queries_by_cql};

   my $cached = $queries_by_cql->{$cql};
   if( $cached ) {
      # Take a strong reference before the cache slot is weakened again
      my $query = $cached->[0];

      if( $cached->[1] ) {
         # The entry was waiting to be expired; rescue it
         $cached->[1]->cancel;
         undef $cached->[1];
         weaken( $cached->[0] );
      }

      return Future->new->done( $query );
   }

   $self->debug_printf( "PREPARE %s", $cql );

   my @prepare_f;
   foreach my $nodeid ( keys %{ $self->{primary_ids} } ) {
      push @prepare_f, $nodes->{$nodeid}{conn}->prepare( $cql, $self );
   }

   return Future->needs_all( @prepare_f )->then( sub {
      # Only the first object is needed; the others all have the same ID
      my $query = shift;

      $self->debug_printf( "PREPARE => [%s]", unpack "H*", $query->id );

      # The cache holds the query weakly and with no expiry future as yet
      my $entry = [ $query, undef ];
      weaken( $entry->[0] );
      $queries_by_cql->{$cql} = $entry;

      return Future->new->done( $query );
   });
}
880              
# Schedules the cached query for $cql to be dropped from {queries_by_cql}
# after 60 seconds. Does nothing if no entry is cached for that CQL string.
# While the delay is pending, the cache entry holds the query object strongly
# in slot 0 and the delay future in slot 1.
sub _expire_query
{
   my ( $self, $cql ) = @_;

   my $queries_by_cql = $self->{queries_by_cql};

   my $entry = $queries_by_cql->{$cql};
   return unless $entry;

   # Replace the weakened reference in slot 0 by an ordinary strong one
   my $query = $entry->[0];
   undef $entry->[0];
   $entry->[0] = $query;

   my $loop = $self->loop;

   $entry->[1] = $loop->delay_future( after => 60 )->on_done( sub {
      # Clear the {cassandra} element of the query first, so that it does
      # not re-register itself for expiry when it is DESTROYed again
      undef $entry->[0]{cassandra};
      delete $queries_by_cql->{$cql};
   });
}
899              
900             =head2 $cass->execute( $query, $data, $consistency, %other_args ) ==> ( $type, $result )
901              
902             Executes a previously-prepared statement, given the binding data. On success,
903             the returned Future will yield results of the same form as the C<query>
904             method. C<$data> should contain a list of encoded byte-string values.
905              
906             Normally this method is not directly required - instead, use the C<execute>
907             method on the query object itself, as this will encode the parameters
908             correctly.
909              
910             C<%other_args> may be as for the C<query> method.
911              
912             =cut
913              
# Executes the prepared statement $query with the bound values $data on a
# node chosen by _get_a_node. The consistency level falls back to the
# instance's default_consistency; it is an error for neither to be defined.
# Returns the Future from the node's own execute method, passed through the
# debug-logging wrapper.
sub execute
{
   my ( $self, $query, $data, $consistency, %other_args ) = @_;

   $consistency = $self->{default_consistency} unless defined $consistency;
   croak "'execute' needs a consistency level" unless defined $consistency;

   return _debug_wrap_result( EXECUTE => $self, $self->_get_a_node->then( sub {
      my ( $node ) = @_;

      $self->debug_printf( "EXECUTE on {%s}: %s [%s]", $node->nodeid, $query->cql, unpack "H*", $query->id );
      return $node->execute( $query->id, $data, $consistency, %other_args );
   }) );
}
928              
929             =head1 CONVENIENT WRAPPERS
930              
931             The following wrapper methods all wrap the basic C<query> operation.
932              
933             =cut
934              
935             =head2 $cass->schema_keyspaces ==> $result
936              
937             A shortcut to a C<SELECT> query on C<system.schema_keyspaces>, which returns a
938             result object listing all the keyspaces.
939              
940             Exact details of the returned columns will depend on the Cassandra version,
941             but the result should at least be keyed by the first column, called
942             C<keyspace_name>.
943              
944             my $keyspaces = $result->rowmap_hash( "keyspace_name" )
945              
946             =cut
947              
# Selects every row of system.schema_keyspaces at CONSISTENCY_ONE and returns
# the Future given by query_rows.
sub schema_keyspaces
{
   my ( $self ) = @_;

   my $cql = "SELECT * FROM system.schema_keyspaces";

   return $self->query_rows( $cql, CONSISTENCY_ONE );
}
957              
958             =head2 $cass->schema_columnfamilies( $keyspace ) ==> $result
959              
960             A shortcut to a C<SELECT> query on C<system.schema_columnfamilies>, which
961             returns a result object listing all the columnfamilies of the given keyspace.
962              
963             Exact details of the returned columns will depend on the Cassandra version,
964             but the result should at least be keyed by the first column, called
965             C<columnfamily_name>.
966              
967             my $columnfamilies = $result->rowmap_hash( "columnfamily_name" )
968              
969             =cut
970              
# Selects the rows of system.schema_columnfamilies belonging to $keyspace at
# CONSISTENCY_ONE and returns the Future given by query_rows. The keyspace
# name is passed through the quote method before being placed in the CQL.
sub schema_columnfamilies
{
   my ( $self, $keyspace ) = @_;

   my $quoted_keyspace = $self->quote( $keyspace );
   my $cql = "SELECT * FROM system.schema_columnfamilies WHERE keyspace_name = $quoted_keyspace";

   return $self->query_rows( $cql, CONSISTENCY_ONE );
}
981              
982             =head2 $cass->schema_columns( $keyspace, $columnfamily ) ==> $result
983              
984             A shortcut to a C<SELECT> query on C<system.schema_columns>, which returns a
985             result object listing all the columns of the given columnfamily.
986              
987             Exact details of the returned columns will depend on the Cassandra version,
988             but the result should at least be keyed by the first column, called
989             C<column_name>.
990              
991             my $columns = $result->rowmap_hash( "column_name" )
992              
993             =cut
994              
# Selects the rows of system.schema_columns belonging to the given keyspace
# and columnfamily at CONSISTENCY_ONE and returns the Future given by
# query_rows. Both names are passed through the quote method before being
# placed in the CQL.
sub schema_columns
{
   my ( $self, $keyspace, $columnfamily ) = @_;

   my $quoted_keyspace     = $self->quote( $keyspace );
   my $quoted_columnfamily = $self->quote( $columnfamily );

   my $cql = "SELECT * FROM system.schema_columns WHERE keyspace_name = $quoted_keyspace AND columnfamily_name = $quoted_columnfamily";

   return $self->query_rows( $cql, CONSISTENCY_ONE );
}
1005              
# Asks the connection $conn which nodes it knows about.
#
# Returns a Future that yields a list of HASH references, one per node, each
# holding the selected data_center and rack columns plus a 'host' key. The
# first element describes the node behind $conn itself; the remainder come
# from system.peers. The Future fails with "Expected 'rows' result" if
# either query does not return a row set.
sub _list_nodes
{
   my $self = shift;
   my ( $conn ) = @_;

   # The system.peers table doesn't include the node we actually connect to.
   # So we'll have to look up its own information from system.local and add
   # the socket address manually.
   return Future->needs_all(
      $conn->query( "SELECT data_center, rack FROM system.local", CONSISTENCY_ONE )
         ->then( sub {
            my ( $type, $result ) = @_;

            # The failure must be returned, otherwise it is discarded and
            # row_hash gets called on something that is not a row set
            return Future->new->fail( "Expected 'rows' result" )
               unless defined $type and $type eq "rows";

            my $local = $result->row_hash( 0 );
            $local->{host} = $conn->nodeid;
            return Future->new->done( $local );
         }),
      $conn->query( "SELECT peer, data_center, rack FROM system.peers", CONSISTENCY_ONE )
         ->then( sub {
            my ( $type, $result ) = @_;

            return Future->new->fail( "Expected 'rows' result" )
               unless defined $type and $type eq "rows";

            my @nodes = $result->rows_hash;
            foreach my $node ( @nodes ) {
               # Replace the 'peer' address column by its string form as 'host'
               $node->{host} = _inet_to_string( delete $node->{peer} );
            }
            return Future->new->done( @nodes );
         }),
   );
}
1035              
1036             =head1 TODO
1037              
1038             =over 8
1039              
1040             =item *
1041              
1042             Allow other load-balancing strategies than roundrobin.
1043              
1044             =item *
1045              
1046             Adjust connected primary nodes when changing the C<primaries> parameter.
1047              
1048             =item *
1049              
1050             Allow backup nodes, for faster connection failover.
1051              
1052             =item *
1053              
1054             Support LZ4 compression when using CQL version 2.
1055              
1056             This is blocked on RT #92825
1057              
1058             =back
1059              
1060             =cut
1061              
1062             =head1 SPONSORS
1063              
1064             This code was paid for by
1065              
1066             =over 2
1067              
1068             =item *
1069              
1070             Perceptyx L<http://www.perceptyx.com/>
1071              
1072             =item *
1073              
1074             Shadowcat Systems L<http://www.shadowcat.co.uk/>
1075              
1076             =back
1077              
1078             =head1 AUTHOR
1079              
1080             Paul Evans <leonerd@leonerd.org.uk>
1081              
1082             =cut
1083              
1084             0x55AA;