File Coverage

blib/lib/Net/Async/CassandraCQL.pm
Criterion Covered Total %
statement 337 383 87.9
branch 78 138 56.5
condition 18 38 47.3
subroutine 60 70 85.7
pod 13 13 100.0
total 506 642 78.8


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::CassandraCQL;
7              
8 8     8   332410 use strict;
  8         16  
  8         438  
9 8     8   47 use warnings;
  8         11  
  8         290  
10 8     8   225 use 5.010;
  8         25  
  8         465  
11              
12             our $VERSION = '0.12';
13              
14 8     8   52 use base qw( IO::Async::Notifier );
  8         12  
  8         5051  
15              
16 8     8   111705 use Carp;
  8         23  
  8         695  
17              
18 8     8   5054 use Devel::Refcount qw/ refcount /;
  8         18145  
  8         629  
19 8     8   9246 use Future::Utils qw( fmap_void try_repeat_until_success );
  8         18609  
  8         781  
20 8     8   73 use List::Util qw( shuffle );
  8         16  
  8         918  
21 8     8   53 use Scalar::Util qw( weaken );
  8         15  
  8         454  
22 8     8   2475 use Socket qw( inet_ntop getnameinfo AF_INET AF_INET6 NI_NUMERICHOST NIx_NOSERV );
  8         25974  
  8         1177  
23              
24 8     8   2513 use Protocol::CassandraCQL qw( CONSISTENCY_ONE );
  8         14819  
  8         1314  
25              
26 8     8   2838 use Net::Async::CassandraCQL::Connection;
  8         22  
  8         359  
27              
28 8     8   50 use constant DEFAULT_CQL_PORT => 9042;
  8         19  
  8         730  
29              
30             # Time after which down nodes will be retried
31 8     8   43 use constant NODE_RETRY_TIME => 60;
  8         18  
  8         389  
32              
33             # How long after a query is last used to keep it alive on a pacemaker
34 8     8   40 use constant QUERY_TTL => 60;
  8         14  
  8         578  
35             # How often the pacemaker checks the query isn't the only thing keeping the whole stack alive
36 8     8   44 use constant QUERY_PACEMAKER_INTERVAL => 0.5;
  8         19  
  8         40986  
37              
38             =head1 NAME
39              
40             C - use Cassandra databases with L using CQL
41              
42             =head1 SYNOPSIS
43              
44             use IO::Async::Loop;
45             use Net::Async::CassandraCQL;
46             use Protocol::CassandraCQL qw( CONSISTENCY_QUORUM );
47              
48             my $loop = IO::Async::Loop->new;
49              
50             my $cass = Net::Async::CassandraCQL->new(
51             host => "localhost",
52             keyspace => "my_keyspace",
53             default_consistency => CONSISTENCY_QUORUM,
54             );
55             $loop->add( $cass );
56              
57              
58             $cass->connect->get;
59              
60              
61             my @f;
62             foreach my $number ( 1 .. 100 ) {
63             push @f, $cass->query( "INSERT INTO numbers (v) VALUES ($number)" );
64             }
65             Future->needs_all( @f )->get;
66              
67              
68             my $get_stmt = $cass->prepare( "SELECT v FROM numbers" )->get;
69              
70             my ( undef, $result ) = $get_stmt->execute( [] )->get;
71              
72             foreach my $row ( $result->rows_hash ) {
73             say "We have a number " . $row->{v};
74             }
75              
76             =head1 DESCRIPTION
77              
78             This module allows use of the C interface of a Cassandra database. It
79             fully supports asynchronous operation via L, allowing both direct
80             queries and prepared statements to be managed concurrently, if required.
81             Alternatively, as the interface is entirely based on L objects, it can
82             be operated synchronously in a blocking fashion by simply awaiting each
83             individual operation by calling the C method.
84              
85             It is based on L, which more completely documents the
86             behaviours and limits of its ability to communicate with Cassandra.
87              
88             =cut
89              
90             =head1 EVENTS
91              
92             =head2 on_node_up $nodeid
93              
94             =head2 on_node_down $nodeid
95              
96             The node's status has changed. C<$nodeid> is the node's IP address as a text
97             string.
98              
99             =head2 on_node_new $nodeid
100              
101             =head2 on_node_removed $nodeid
102              
103             A new node has been added to the cluster, or an existing node has been
104             decommissioned and removed.
105              
106             These four events are obtained from event watches on the actual node
107             connections and filtered to remove duplicates. The use of multiple primaries
108             should improve the reliability of notifications, though if multiple nodes fail
109             at or around the same time this may go unreported, as no node will ever report
110             its own failure.
111              
112             =cut
113              
114             =head1 PARAMETERS
115              
116             The following named parameters may be passed to C or C:
117              
118             =over 8
119              
120             =item host => STRING
121              
122             =item hosts => ARRAY of STRING
123              
124             The hostnames of Cassandra node to connect to initially. If more than one host
125             is provided in an array, they will be attempted sequentially until one
126             succeeds during the intial connect phase.
127              
128             If a host contains :$service, then the service parameter will be overidden for this host.
129              
130             =item service => STRING
131              
132             Optional. The service name or port number to connect to.
133              
134             =item username => STRING
135              
136             =item password => STRING
137              
138             Optional. Authentication details to use for C.
139              
140             =item keyspace => STRING
141              
142             Optional. If set, a C query will be issued as part of the
143             connect method.
144              
145             =item default_consistency => INT
146              
147             Optional. Default consistency level to use if none is provided to C or
148             C.
149              
150             =item primaries => INT
151              
152             Optional. The number of primary node connections to maintain. Defaults to 1 if
153             not specified.
154              
155             =item prefer_dc => STRING
156              
157             Optional. If set, prefer to pick primary nodes from the given data center,
158             only falling back on others if there are not enough available.
159              
160             =item cql_version => INT
161              
162             Optional. Version of the CQL wire protocol to negotiate during connection.
163             Defaults to 1.
164              
165             =back
166              
167             =cut
168              
169             sub _init
170             {
171 7     7   4984 my $self = shift;
172 7         23 my ( $params ) = @_;
173              
174 7   100     72 $params->{primaries} //= 1;
175              
176             # precache these weasels only once
177             $self->{on_topology_change_cb} = $self->_replace_weakself( sub {
178 2     2   1265 shift->_on_topology_change( @_ );
179 7         103 });
180             $self->{on_status_change_cb} = $self->_replace_weakself( sub {
181 3     3   21635 shift->_on_status_change( @_ );
182 7         228 });
183              
184 7         214 $self->{queries_by_cql} = {}; # {$cql} => {query => $query, pacemaker => $expire_timer_f, ttl => Int]
185             # Can be in two states:
186             # $query is_weak; timer undef => normal user use
187             # $query non-weak; timer exists => due to expire soon
188              
189 7         23 $self->{cql_version} = 1;
190              
191 7         66 $self->SUPER::_init( $params );
192             }
193              
194             sub configure
195             {
196 11     11 1 3570 my $self = shift;
197 11         48 my %params = @_;
198              
199 11 100       60 if( defined $params{host} ) {
200 5   50     88 $params{hosts} ||= [ delete $params{host} ];
201             }
202              
203 11         51 foreach (qw( hosts service username password keyspace default_consistency
204             prefer_dc cql_version
205             on_node_up on_node_down on_node_new on_node_removed )) {
206 132 100       366 $self->{$_} = delete $params{$_} if exists $params{$_};
207             }
208              
209 11 100       63 if( exists $params{primaries} ) {
210 7         28 $self->{primaries} = delete $params{primaries};
211              
212             # TODO: connect more / drain old ones
213             }
214              
215 11         112 $self->SUPER::configure( %params );
216             }
217              
218             =head1 METHODS
219              
220             =cut
221              
222             # function
223             sub _inet_to_string
224             {
225 12     12   25 my ( $addr ) = @_;
226              
227 12         23 my $addrlen = length $addr;
228 12 0       37 my $family = $addrlen == 4 ? AF_INET :
    50          
229             $addrlen == 16 ? AF_INET6 :
230             die "Expected ADDRLEN 4 or 16";
231 12         123 return inet_ntop( $family, $addr );
232             }
233              
234             # function
235             sub _nodeid_to_string
236             {
237 5     5   10 my ( $node ) = @_;
238              
239 5         689 return ( getnameinfo( $node, NI_NUMERICHOST, NIx_NOSERV ) )[1];
240             }
241              
242             =head2 $str = $cass->quote( $str )
243              
244             Quotes a string argument suitable for inclusion in an immediate CQL query
245             string.
246              
247             In general, it is better to use a prepared query and pass the value as an
248             execute parameter though.
249              
250             =cut
251              
252             sub quote
253             {
254 3     3 1 548 my $self = shift;
255 3         9 my ( $str ) = @_;
256              
257             # CQL's 'quoting' handles any character except quote marks, which have to
258             # be doubled
259 3         13 $str =~ s/'/''/g;
260 3         24 return qq('$str');
261             }
262              
263             =head2 $str = $cass->quote_identifier( $str )
264              
265             Quotes an identifier name suitable for inclusion in a CQL query string.
266              
267             =cut
268              
269             sub quote_identifier
270             {
271 4     4 1 11 my $self = shift;
272 4         8 my ( $str ) = @_;
273              
274 4 100       30 return $str if $str =~ m/^[a-z_][a-z0-9_]+$/;
275              
276             # CQL's "quoting" handles any character except quote marks, which have to
277             # be doubled
278 3         9 $str =~ s/"/""/g;
279 3         21 return qq("$str");
280             }
281              
282             =head2 $cass->connect( %args ) ==> ()
283              
284             Connects to the Cassandra node and starts up the connection. The returned
285             Future will yield nothing on success.
286              
287             Takes the following named arguments:
288              
289             =over 8
290              
291             =item host => STRING
292              
293             =item hosts => ARRAY of STRING
294              
295             =item service => STRING
296              
297             =back
298              
299             A set of host names are required, either as a named argument or as a
300             configured value on the object. If the service name is missing, the default
301             CQL port will be used instead. If a host contains a :$service, that overrides
302             both the service passed in, and the service on this object.
303              
304             =cut
305              
306             # ->_connect_node( $host, $service ) ==> $conn
307             # mocked during unit testing
308             sub _connect_node
309             {
310 0     0   0 my $self = shift;
311 0         0 my ( $host, $service ) = @_;
312              
313 0 0       0 if ($host =~ s/:(\d+)$//){
314 0         0 $service = $1;
315             }
316              
317 0   0     0 $service //= $self->{service} // DEFAULT_CQL_PORT;
      0        
318              
319             my $conn = Net::Async::CassandraCQL::Connection->new(
320             on_closed => sub {
321 0     0   0 my $node = shift;
322 0         0 $self->remove_child( $node );
323 0         0 $self->_closed_node( $node->nodeid );
324             # Node close will fail the queries, which must happen after we've chosen a new node
325 0         0 $node->on_closed;
326             },
327 0         0 map { $_ => $self->{$_} } qw( username password cql_version ),
  0         0  
328             );
329 0         0 $self->add_child( $conn );
330              
331             $conn->connect(
332             host => $host,
333             service => $service,
334             )->on_fail( sub {
335             # Some kinds of failure have already removed it
336 0 0   0   0 $self->remove_child( $conn ) if $conn->parent;
337 0         0 });
338             }
339              
340             # invoked during unit testing
341             sub _closed_node
342             {
343 2     2   1987 my $self = shift;
344 2         5 my ( $nodeid ) = @_;
345              
346 2         17 my $now = time();
347              
348 2 50       11 $self->{nodes} or return;
349 2 50       12 my $node = $self->{nodes}{$nodeid} or return;
350              
351 2         5 undef $node->{conn};
352 2         5 undef $node->{ready_f};
353 2         7 $node->{down_time} = $now;
354              
355 2 50       10 if( exists $self->{primary_ids}{$nodeid} ) {
356 2         9 $self->debug_printf( "PRIMARY DOWN %s", $nodeid );
357 2         9 delete $self->{primary_ids}{$nodeid};
358              
359 2         8 $self->_pick_new_primary( $now );
360             }
361              
362 2 50       207 if( exists $self->{event_ids}{$nodeid} ) {
363 2         6 delete $self->{event_ids}{$nodeid};
364              
365 2         7 $self->_pick_new_eventwatch;
366             }
367             }
368              
369             sub _list_nodeids
370             {
371 7     7   14 my $self = shift;
372              
373 7         15 my $nodes = $self->{nodes};
374              
375 7         568 my @nodeids = shuffle keys %$nodes;
376 7 100       42 if( defined( my $dc = $self->{prefer_dc} ) ) {
377             # Put preferred ones first
378 12         34 @nodeids = ( ( grep { $nodes->{$_}{data_center} eq $dc } @nodeids ),
  12         31  
379 2         19 ( grep { $nodes->{$_}{data_center} ne $dc } @nodeids ) );
380             }
381              
382 7         31 return @nodeids;
383             }
384              
385             sub connect
386             {
387 6     6 1 186 my $self = shift;
388 6         19 my %args = @_;
389              
390 6         37 my $conn;
391              
392 0 50       0 my @hosts = $args{hosts} ? @{ $args{hosts} } :
  6         68  
393             defined $args{host} ? ( $args{host} ) :
394 6 50       47 @{ $self->{hosts} || [] };
    50          
395 6 50       28 @hosts or croak "Require initial hostnames to ->connect to";
396              
397             ( try_repeat_until_success {
398 7     7   1613 $self->_connect_node( $_[0], $args{service} )
399             } foreach => \@hosts )->then( sub {
400 6     6   3093 ( $conn ) = @_;
401 6         37 $self->_list_nodes( $conn );
402             })->then( sub {
403 5     5   1036 my @nodes = @_;
404              
405 5         25 $self->{nodes} = \my %nodes;
406 5         17 foreach my $node ( @nodes ) {
407 16         81 my $n = $nodes{$node->{host}} = {
408             data_center => $node->{data_center},
409             rack => $node->{rack},
410             };
411              
412 16 100       68 if( $node->{host} eq $conn->nodeid ) {
413 5         25 $n->{conn} = $conn;
414             }
415             }
416              
417             # Initial primary on the seed
418 5         30 $self->{primary_ids} = {
419             $conn->nodeid => 1,
420             };
421 5         25 my $primary0 = $nodes{$conn->nodeid};
422 5         15 my $have_primaries = 1;
423              
424 5         9 my @conn_f;
425              
426 5         20 $self->debug_printf( "PRIMARY PICKED %s", $conn->nodeid );
427 5         60 push @conn_f, $primary0->{ready_f} = $self->_ready_node( $conn->nodeid );
428              
429 5         119 my @nodeids = $self->_list_nodeids;
430              
431 5   66     59 while( @nodeids and $have_primaries < $self->{primaries} ) {
432 4         29 my $primary = shift @nodeids;
433 4 50       37 next if $primary eq $conn->nodeid;
434              
435 4         38 push @conn_f, $self->_connect_new_primary( $primary );
436 4         486 $have_primaries++;
437             }
438              
439 5         26 $self->_pick_new_eventwatch;
440 5 100       799 $self->_pick_new_eventwatch if $have_primaries > 1;
441              
442 5 100       251 return $conn_f[0] if @conn_f == 1;
443 2         16 return Future->needs_all( @conn_f );
444 6         74 });
445             }
446              
447             sub _pick_new_primary
448             {
449 2     2   5 my $self = shift;
450 2         4 my ( $now ) = @_;
451              
452 2         4 my $nodes = $self->{nodes};
453              
454 2         2 my $new_primary;
455              
456             # Expire old down statuses and try to find a non-down node that is not yet
457             # primary
458 2         13 foreach my $nodeid ( $self->_list_nodeids ) {
459 8         13 my $node = $nodes->{$nodeid};
460              
461 8 50 66     31 delete $node->{down_time} if defined $node->{down_time} and $now - $node->{down_time} > NODE_RETRY_TIME;
462              
463 8 100       20 next if $self->{primary_ids}{$nodeid};
464              
465 6 100 66     29 $new_primary ||= $nodeid if !$node->{down_time};
466             }
467              
468 2 50       9 if( !defined $new_primary ) {
469 0         0 die "ARGH! TODO: can't find a new node to be primary\n";
470             }
471              
472 2         8 $self->_connect_new_primary( $new_primary );
473             }
474              
475             sub _connect_new_primary
476             {
477 7     7   14 my $self = shift;
478 7         18 my ( $new_primary ) = @_;
479              
480 7         26 $self->debug_printf( "PRIMARY PICKED %s", $new_primary );
481 7         31 $self->{primary_ids}{$new_primary} = 1;
482              
483 7         16 my $node = $self->{nodes}{$new_primary};
484              
485             my $f = $node->{ready_f} = $self->_connect_node( $new_primary )->then( sub {
486 7     7   4112 my ( $conn ) = @_;
487 7         25 $node->{conn} = $conn;
488              
489 7         28 $self->_ready_node( $new_primary )
490             })->on_fail( sub {
491 0     0   0 print STDERR "ARGH! NEW PRIMARY FAILED: @_\n";
492             })->on_done( sub {
493 7     7   558 $self->debug_printf( "PRIMARY UP %s", $new_primary );
494 7         31 });
495             }
496              
497             sub _ready_node
498             {
499 12     12   26 my $self = shift;
500 12         21 my ( $nodeid ) = @_;
501              
502 12 50       69 my $node = $self->{nodes}{$nodeid} or die "Don't have a node id $nodeid";
503 12 50       234 my $conn = $node->{conn} or die "Expected node to have a {conn} but it doesn't";
504              
505 12         33 my $keyspace = $self->{keyspace};
506              
507 12 50       131 my $keyspace_f =
508             $keyspace ? $conn->query( "USE " . $self->quote_identifier( $keyspace ), CONSISTENCY_ONE )
509             : Future->new->done;
510              
511             $keyspace_f->then( sub {
512 12     12   1013 my $conn_f = Future->new->done( $conn );
513 12 50       448 return $conn_f unless my $queries_by_cql = $self->{queries_by_cql};
514 12 100       64 return $conn_f unless keys %$queries_by_cql;
515              
516             ( fmap_void {
517 1 50       46 my $query = shift->{query} or return Future->new->done;
518 1         4 $conn->prepare( $query->cql, $self );
519             } foreach => [ values %$queries_by_cql ] )
520 1         7 ->then( sub { $conn_f } );
  1         1182  
521 12         686 });
522             }
523              
524             sub _pick_new_eventwatch
525             {
526 9     9   21 my $self = shift;
527              
528             # Only the primaries which haven't been watched already
529 9         14 my @primaries = grep { !$self->{event_ids}{ $_ } } keys %{ $self->{primary_ids} };
  19         68  
  9         47  
530              
531             # If there aren't any futues left to try, don't create an infinite loop
532             # Don't expect this to happen, but just in case
533 9 50       68 return Future->fail('No primary node available to eventwatch') unless @primaries;
534              
535 9         41 my $nodeid = $primaries[int rand @primaries];
536              
537 9         21 $self->{event_ids}{$nodeid} = 1;
538              
539 9 50       38 my $node = $self->{nodes}{$nodeid} or die "Expected node: '$nodeid' to exist";
540              
541             return $node->{ready_f} = $node->{ready_f}->then( sub {
542 9     9   461 my $conn = shift;
543 9         56 $conn->configure(
544             on_topology_change => $self->{on_topology_change_cb},
545             on_status_change => $self->{on_status_change_cb},
546             );
547             $conn->register( [qw( TOPOLOGY_CHANGE STATUS_CHANGE )] )
548             ->then_done($conn)
549             ->else( sub {
550 0         0 delete $self->{event_ids}{$nodeid};
551             # Return the new attempt to eventwatch a differnt primary
552 0         0 return $self->_pick_new_eventwatch;
553 9         677 });
554 9         76 });
555             }
556              
557             sub _on_topology_change
558             {
559 2     2   5 my $self = shift;
560 2         5 my ( $type, $addr ) = @_;
561 2         7 my $nodeid = _nodeid_to_string( $addr );
562              
563 2         6 my $nodes = $self->{nodes};
564              
565             # These updates can happen twice if there's two event connections but
566             # that's OK. Use the state to ensure printing only once
567              
568 2 100       14 if( $type eq "NEW_NODE" ) {
    50          
569 1 50       6 return if exists $nodes->{$nodeid};
570              
571 1         4 $nodes->{$nodeid} = {};
572              
573             my $f = $self->query_rows( "SELECT peer, data_center, rack FROM system.peers WHERE peer = " . $self->quote( $nodeid ), CONSISTENCY_ONE )
574             ->on_done( sub {
575 1     1   110 my ( $result ) = @_;
576 1         7 my $node = $result->row_hash( 0 );
577 1         53 $node->{host} = _inet_to_string( delete $node->{peer} );
578              
579 1         5 %{$nodes->{$nodeid}} = %$node;
  1         4  
580              
581 1         6 $self->debug_printf( "NEW_NODE {%s}", $nodeid );
582 1         7 $self->maybe_invoke_event( on_node_new => $nodeid );
583 1         7 });
584              
585             # Intentional cycle
586 1     1   106 $f->on_ready( sub { undef $f } );
  1         35  
587             }
588             elsif( $type eq "REMOVED_NODE" ) {
589 1 50       6 return if !exists $nodes->{$nodeid};
590              
591 1         4 delete $nodes->{$nodeid};
592 1         4 $self->debug_printf( "REMOVED_NODE {%s}", $nodeid );
593 1         6 $self->maybe_invoke_event( on_node_removed => $nodeid );
594             }
595             }
596              
597             sub _on_status_change
598             {
599 3     3   7 my $self = shift;
600 3         6 my ( $status, $addr ) = @_;
601 3         11 my $nodeid = _nodeid_to_string( $addr );
602              
603 3         15 my $nodes = $self->{nodes};
604 3 50       23 my $node = $nodes->{$nodeid} or return;
605              
606             # These updates can happen twice if there's two event connections but
607             # that's OK. Use the state to ensure printing only once
608              
609 3 100       22 if( $status eq "DOWN" ) {
    50          
610 1 50       5 return if exists $node->{down_time};
611              
612 1         8 $self->debug_printf( "STATUS DOWN on {%s}", $nodeid );
613 1         14 $self->maybe_invoke_event( on_node_down => $nodeid );
614              
615 1         62 $node->{down_time} = time();
616             }
617             elsif( $status eq "UP" ) {
618 2 50       11 return if !exists $node->{down_time};
619              
620 2         13 $self->debug_printf( "STATUS UP on {%s}", $nodeid );
621 2         20 $self->maybe_invoke_event( on_node_up => $nodeid );
622              
623 2         45 delete $node->{down_time};
624              
625 2 100       15 return unless defined( my $dc = $self->{prefer_dc} );
626 1 50       5 return unless $node->{data_center} eq $dc;
627 1 50       4 return if $node->{conn};
628              
629             # A node in a preferred data center is now up, and we don't already have
630             # a connection to it
631              
632 1         3 my $old_nodeid;
633 1   66     3 $nodes->{$_}{data_center} ne $dc and $old_nodeid = $_, last for keys %{ $self->{primary_ids} };
  1         20  
634              
635 1 50       5 return unless defined $old_nodeid;
636              
637             # We do have a connection to a non-preferred node, so lets switch it
638              
639 1         5 $self->_connect_new_primary( $nodeid );
640              
641             # Don't pick it for new nodes
642 1         15 $self->debug_printf( "PRIMARY SWITCH %s -> %s", $old_nodeid, $nodeid );
643 1         4 delete $self->{primary_ids}{$old_nodeid};
644              
645             # Close it when it's empty
646 1         13 $nodes->{$old_nodeid}{conn}->close_when_idle;
647             }
648             }
649              
650             =head2 $cass->close_when_idle ==> $cass
651              
652             Stops accepting new queries and prepares all the existing connections to be
653             closed once every outstanding query has been responded to. Returns a future
654             that will eventually yield the CassandraCQL object, when all the connections
655             are closed.
656              
657             After calling this method it will be an error to invoke C, C,
658             C or the various other methods derived from them.
659              
660             =cut
661              
662             sub close_when_idle
663             {
664 1     1 1 13440 my $self = shift;
665              
666 1         4 my $nodes = $self->{nodes};
667              
668             # remove 'nodes' to avoid reconnect logic
669 1         2 undef $self->{nodes};
670 1         3 undef $self->{primary_ids};
671              
672 1         2 my @f;
673 1         3 foreach my $node ( values %$nodes ) {
674 2 100       8 next unless my $conn = $node->{conn};
675 1         8 push @f, $conn->close_when_idle;
676             }
677              
678             return Future->wait_all( @f )->then( sub {
679 1     1   152 return Future->new->done( $self );
680 1         6 });
681             }
682              
683             =head2 $cass->close_now
684              
685             Immediately closes all node connections and shuts down the object. Any
686             outstanding or queued queries will immediately fail. Consider this as a "last
687             resort" failure shutdown, as compared to the graceful draining behaviour of
688             C.
689              
690             =cut
691              
692             sub close_now
693             {
694 0     0 1 0 my $self = shift;
695              
696 0         0 my $nodes = $self->{nodes};
697              
698             # remove 'nodes' to avoid reconnect logic
699 0         0 undef $self->{nodes};
700 0         0 undef $self->{primary_ids};
701              
702 0         0 foreach my $node ( values %$nodes ) {
703 0 0       0 next unless my $conn = $node->{conn};
704 0         0 $conn->close_now;
705             }
706             }
707              
708             sub _get_a_node
709             {
710 19     19   30 my $self = shift;
711              
712 19 50       60 my $nodes = $self->{nodes} or die "No available nodes";
713              
714             # TODO: Other sorting strategies;
715             # e.g. fewest outstanding queries, least accumulated time recently
716 19         26 my @nodeids;
717             {
718 19   100     24 my $next = $self->{next_primary} // 0;
  19         78  
719 19 50       23 @nodeids = keys %{ $self->{primary_ids} } or die "ARGH: $self -> _get_a_node called with no defined primaries";
  19         108  
720              
721             # Rotate to the next in sequence
722 19         95 @nodeids = ( @nodeids[$next..$#nodeids], @nodeids[0..$next-1] );
723 19         52 ( $next += 1 ) %= @nodeids;
724              
725 19         23 my $next_ready = $next;
726             # Skip non-ready ones
727 19         101 while( not $nodes->{$nodeids[0]}->{ready_f}->is_ready ) {
728 2         15 push @nodeids, shift @nodeids;
729 2         6 ( $next_ready += 1 ) %= @nodeids;
730 2 50       11 last if $next_ready == $next; # none were ready - just use the next
731             }
732 19         127 $self->{next_primary} = $next_ready;
733             }
734              
735 19 50       66 if( my $node = $nodes->{ $nodeids[0] } ) {
736 19         194 return $node->{ready_f};
737             }
738              
739 0         0 die "ARGH: don't have a primary node";
740             }
741              
742             =head2 $cass->query( $cql, $consistency, %other_args ) ==> ( $type, $result )
743              
744             Performs a CQL query. On success, the values returned from the Future will
745             depend on the type of query.
746              
747             For C queries, the type is C and C<$result> is a string giving
748             the name of the new keyspace.
749              
750             For C, C and C queries, the type is C and
751             C<$result> is a 3-element ARRAY reference containing the type of change, the
752             keyspace and the table name.
753              
754             For C
755             L containing the returned row data.
756              
757             For other queries, such as C, C and C, the future
758             returns nothing.
759              
760             C<%other_args> may be any of the following, when using C 2 or
761             above:
762              
763             =over 8
764              
765             =item skip_metadata => BOOL
766              
767             Requests the server does not include result metadata in the response. It will
768             be up to the caller to provide this, via C on the returned
769             Result object, before it can be used.
770              
771             =item page_size => INT
772              
773             Requests that the server returns at most the given number of rows. If any
774             further remain, the result object will include the C field. This
775             can be passed in another C call to obtain the next set of data.
776              
777             =item paging_state => INT
778              
779             Requests that the server continues a paged request from this position, given
780             in a previous response.
781              
782             =item serial_consistency => INT
783              
784             Sets the consistency level for serial operations in the query. Must be one of
785             C or C.
786              
787             =back
788              
789             =cut
790              
791             sub _debug_wrap_result
792             {
793 19     19   671 my ( $op, $self, $f ) = @_;
794              
795             $f->on_ready( sub {
796 0     0   0 my $f = shift;
797 0 0       0 if( $f->failure ) {
    0          
798 0         0 $self->debug_printf( "$op => FAIL %s", scalar $f->failure );
799             }
800             elsif( my ( $type, $result ) = $f->get ) {
801 0 0       0 if( $type eq "rows" ) {
    0          
802 0         0 $result = sprintf "%d x %d columns", $result->rows, $result->columns;
803             }
804             elsif( $type eq "schema_change" ) {
805 0         0 $result = sprintf "%s %s", $result->[0], join ".", @{$result}[1..$#$result];
  0         0  
806             }
807 0         0 $self->debug_printf( "$op => %s %s", uc $type, $result );
808             }
809             else {
810 0         0 $self->debug_printf( "$op => VOID" );
811             }
812 19 50       56 }) if $IO::Async::Notifier::DEBUG;
813              
814 19         187 return $f;
815             }
816              
817             sub query
818             {
819 15     15 1 1683 my $self = shift;
820 15         40 my ( $cql, $consistency, %other_args ) = @_;
821              
822 15   33     43 $consistency //= $self->{default_consistency};
823 15 50       41 defined $consistency or croak "'query' needs a consistency level";
824              
825             _debug_wrap_result QUERY => $self, $self->_get_a_node->then( sub {
826 15     15   1001 my $node = shift;
827 15         63 $self->debug_printf( "QUERY on {%s}: %s", $node->nodeid, $cql );
828 15         125 $node->query( $cql, $consistency, %other_args );
829 15         52 });
830             }
831              
832             =head2 $cass->query_rows( $cql, $consistency, %other_args ) ==> $result
833              
834             A shortcut wrapper for C which expects a C result and returns it
835             directly. Any other result is treated as an error. The returned Future returns
836             a C directly
837              
838             =cut
839              
840             sub query_rows
841             {
842 1     1 1 3 my $self = shift;
843              
844             $self->query( @_ )->then( sub {
845 1     1   1603 my ( $type, $result ) = @_;
846 1 50       6 defined $type or return Future->new->fail( "Expected type from query" );
847 1 50       6 $type eq "rows" or return Future->new->fail( "Expected 'rows' result" );
848 1         4 Future->new->done( $result );
849 1         8 });
850             }
851              
852             =head2 $cass->prepare( $cql ) ==> $query
853              
854             Prepares a CQL query for later execution. On success, the returned Future
855             yields an instance of a prepared query object (see below).
856              
857             Query objects stored internally cached by the CQL string; subsequent calls to
858             C with the same exact CQL string will yield the same object
859             immediately, saving a roundtrip.
860              
861             =cut
862              
863             sub prepare
864             {
865 5     5 1 1621 my $self = shift;
866 5         13 my ( $cql ) = @_;
867              
868 5 50       18 my $nodes = $self->{nodes} or die "No available nodes";
869              
870 5         12 my $queries_by_cql = $self->{queries_by_cql};
871              
872 5 100       20 if( my $q = $queries_by_cql->{$cql} ) {
873 2         4 my $query = $q->{query};
874 2 100       7 if( $q->{pacemaker} ) {
875 1         10 $q->{pacemaker}->cancel;
876 1         50 undef $q->{pacemaker};
877 1         4 weaken( $q->{query} );
878             }
879 2         8 return Future->new->done( $query );
880             }
881              
882 3         18 $self->debug_printf( "PREPARE %s", $cql );
883              
884 3         8 my @prepare_f = map {
885 3         11 my $node = $nodes->{$_}{conn};
886 3         15 $node->prepare( $cql, $self )
887 3         10 } keys %{ $self->{primary_ids} };
888              
889             Future->needs_all( @prepare_f )->then( sub {
890 3     3   447 my ( $query ) = @_;
891             # Ignore the other objects; they'll all have the same ID anyway
892              
893 3         14 $self->debug_printf( "PREPARE => [%s]", unpack "H*", $query->id );
894              
895 3         18 my $q = $queries_by_cql->{$cql} = { query => $query };
896 3         11 weaken( $q->{query} );
897              
898 3         8 Future->new->done( $query );
899 3         194 });
900             }
901              
902             # Check if a query cache is the only thing keeping a query alive
903             sub _check_query_pacemaker {
904 1     1   2 my ($self, $cql) = @_;
905              
906 1         3 weaken $self;
907              
908 1         2 my $q = $self->{queries_by_cql}{$cql};
909              
910 1         2 my $query = $q->{query};
911              
912 1         2 $q->{ttl} -= QUERY_PACEMAKER_INTERVAL;
913              
914             # 1) Without a loop, there's nothing we can do anyway
915             # 2) Otherwise we've kept this query alive long enough
916             # 3) Otherwise Cassandra and loop is being artificialy kept alive by us.
917             # ( 2 references because Loop has a reference to cassandra in notifiers, and this query has one )
918 1 50 33     12 if ($q->{ttl} <= 0 || refcount($self) == 2 || !$self->loop){
      33        
919              
920             # Remove the {cassandra} element from the query so it doesn't
921             # re-register itself for expiry when it is DESTROYed again
922 1         2 undef $query->{cassandra};
923              
924 1         5 delete $self->{queries_by_cql}{$cql};
925             }
926             else {
927             $q->{pacemaker} = $self->loop->delay_future( after => QUERY_PACEMAKER_INTERVAL )->on_done(
928 0     0   0 sub { $self->_check_query_pacemaker( $cql ) }
929 0         0 );
930             }
931             }
932              
933             sub _expire_query
934             {
935 3     3   5 my $self = shift;
936 3         3 my ( $cql ) = @_;
937              
938 3         8 my $queries_by_cql = $self->{queries_by_cql};
939 3 50       9 my $q = $queries_by_cql->{$cql} or return;
940              
941             # Unweaken the query itself, so it'll survive destruction
942 3         5 my $query = $q->{query};
943 3         7 undef $q->{query};
944 3         5 $q->{query} = $query;
945 3         5 $q->{ttl} = QUERY_TTL;
946              
947 3 50       10 if ($self->loop){
948             $q->{pacemaker} //= $self->loop->delay_future( after => QUERY_PACEMAKER_INTERVAL )
949 3   33 1   23 ->on_done( sub { $self->_check_query_pacemaker( $cql ) } );
  1         113  
950             }
951             }
952              
953             =head2 $cass->execute( $query, $data, $consistency, %other_args ) ==> ( $type, $result )
954              
955             Executes a previously-prepared statement, given the binding data. On success,
956             the returned Future will yield results of the same form as the C
957             method. C<$data> should contain a list of encoded byte-string values.
958              
959             Normally this method is not directly required - instead, use the C
960             method on the query object itself, as this will encode the parameters
961             correctly.
962              
963             C<%other_args> may be as for the C method.
964              
965             =cut
966              
967             sub execute
968             {
969 4     4 1 497 my $self = shift;
970 4         12 my ( $query, $data, $consistency, %other_args ) = @_;
971              
972 4   33     10 $consistency //= $self->{default_consistency};
973 4 50       9 defined $consistency or croak "'execute' needs a consistency level";
974              
975             _debug_wrap_result EXECUTE => $self, $self->_get_a_node->then( sub {
976 4     4   253 my $node = shift;
977 4         18 $self->debug_printf( "EXECUTE on {%s}: %s [%s]", $node->nodeid, $query->cql, unpack "H*", $query->id );
978 4         18 $node->execute( $query->id, $data, $consistency, %other_args );
979 4         12 });
980             }
981              
982             =head1 CONVENIENT WRAPPERS
983              
984             The following wrapper methods all wrap the basic C operation.
985              
986             =cut
987              
988             =head2 $cass->schema_keyspaces ==> $result
989              
990             A shortcut to a C
991             result object listing all the keyspaces.
992              
993             Exact details of the returned columns will depend on the Cassandra version,
994             but the result should at least be keyed by the first column, called
995             C.
996              
997             my $keyspaces = $result->rowmap_hash( "keyspace_name" )
998              
999             =cut
1000              
1001             sub schema_keyspaces
1002             {
1003 0     0 1 0 my $self = shift;
1004              
1005 0         0 $self->query_rows(
1006             "SELECT * FROM system.schema_keyspaces",
1007             CONSISTENCY_ONE
1008             );
1009             }
1010              
1011             =head2 $cass->schema_columnfamilies( $keyspace ) ==> $result
1012              
1013             A shortcut to a C
1014             returns a result object listing all the columnfamilies of the given keyspace.
1015              
1016             Exact details of the returned columns will depend on the Cassandra version,
1017             but the result should at least be keyed by the first column, called
1018             C.
1019              
1020             my $columnfamilies = $result->rowmap_hash( "columnfamily_name" )
1021              
1022             =cut
1023              
1024             sub schema_columnfamilies
1025             {
1026 0     0 1 0 my $self = shift;
1027 0         0 my ( $keyspace ) = @_;
1028              
1029 0         0 $self->query_rows(
1030             "SELECT * FROM system.schema_columnfamilies WHERE keyspace_name = " . $self->quote( $keyspace ),
1031             CONSISTENCY_ONE
1032             );
1033             }
1034              
1035             =head2 $cass->schema_columns( $keyspace, $columnfamily ) ==> $result
1036              
1037             A shortcut to a C
1038             result object listing all the columns of the given columnfamily.
1039              
1040             Exact details of the returned columns will depend on the Cassandra version,
1041             but the result should at least be keyed by the first column, called
1042             C.
1043              
1044             my $columns = $result->rowmap_hash( "column_name" )
1045              
1046             =cut
1047              
1048             sub schema_columns
1049             {
1050 0     0 1 0 my $self = shift;
1051 0         0 my ( $keyspace, $columnfamily ) = @_;
1052              
1053 0         0 $self->query_rows(
1054             "SELECT * FROM system.schema_columns WHERE keyspace_name = " . $self->quote( $keyspace ) . " AND columnfamily_name = " . $self->quote( $columnfamily ),
1055             CONSISTENCY_ONE,
1056             );
1057             }
1058              
1059             sub _list_nodes
1060             {
1061 6     6   14 my $self = shift;
1062 6         12 my ( $conn ) = @_;
1063              
1064             # The system.peers table doesn't include the node we actually connect to.
1065             # So we'll have to look up its own information from system.local and add
1066             # the socket address manually.
1067             Future->needs_all(
1068             $conn->query( "SELECT data_center, rack FROM system.local", CONSISTENCY_ONE )
1069             ->then( sub {
1070 5     5   6729 my ( $type, $result ) = @_;
1071 5 50       32 $type eq "rows" or Future->new->fail( "Expected 'rows' result" );
1072 5         28 my $local = $result->row_hash( 0 );
1073 5         258 $local->{host} = $conn->nodeid;
1074 5         29 Future->new->done( $local );
1075             }),
1076             $conn->query( "SELECT peer, data_center, rack FROM system.peers", CONSISTENCY_ONE )
1077             ->then( sub {
1078 5     5   3614 my ( $type, $result ) = @_;
1079 5 50       27 $type eq "rows" or Future->new->fail( "Expected 'rows' result" );
1080 5         40 my @nodes = $result->rows_hash;
1081 5         520 foreach my $node ( @nodes ) {
1082 11         40 $node->{host} = _inet_to_string( delete $node->{peer} );
1083             }
1084 5         66 Future->new->done( @nodes );
1085 6         38 }),
1086             )
1087             }
1088              
1089             =head1 TODO
1090              
1091             =over 8
1092              
1093             =item *
1094              
1095             Allow other load-balancing strategies than roundrobin.
1096              
1097             =item *
1098              
1099             Adjust connected primary nodes when changing C parameter.
1100              
1101             =item *
1102              
1103             Allow backup nodes, for faster connection failover.
1104              
1105             =item *
1106              
1107             Support LZ4 compression when using CQL version 2.
1108              
1109             This is blocked on RT #92825
1110              
1111             =back
1112              
1113             =cut
1114              
1115             =head1 SPONSORS
1116              
1117             This code was paid for by
1118              
1119             =over 2
1120              
1121             =item *
1122              
1123             Perceptyx L
1124              
1125             =item *
1126              
1127             Shadowcat Systems L
1128              
1129             =back
1130              
1131             =head1 AUTHOR
1132              
1133             Paul Evans
1134              
1135             =cut
1136              
1137             0x55AA;