File Coverage

blib/lib/Cassandra/Client/Connection.pm
Criterion Covered Total %
statement 58 563 10.3
branch 0 234 0.0
condition 0 57 0.0
subroutine 20 78 25.6
pod 0 29 0.0
total 78 961 8.1


line stmt bran cond sub pod time code
1             package Cassandra::Client::Connection;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::Connection::VERSION = '0.13_007'; # TRIAL
4              
5 1     1   14 $Cassandra::Client::Connection::VERSION = '0.13007';use 5.010;
  1         3  
6 1     1   8 use strict;
  1         3  
  1         28  
7 1     1   8 use warnings;
  1         2  
  1         35  
8 1     1   6 use vars qw/$BUFFER/;
  1         3  
  1         50  
9              
10 1     1   7 use Ref::Util qw/is_blessed_ref is_plain_arrayref/;
  1         8  
  1         59  
11 1     1   350 use IO::Socket::INET;
  1         16412  
  1         6  
12 1     1   926 use IO::Socket::INET6;
  1         5925  
  1         19  
13 1     1   1382 use Errno qw/EAGAIN/;
  1         5  
  1         172  
14 1     1   10 use Socket qw/SOL_SOCKET IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY/;
  1         4  
  1         100  
15 1     1   12 use Scalar::Util qw/weaken/;
  1         3  
  1         73  
16 1     1   710 use Net::SSLeay qw/ERROR_WANT_READ ERROR_WANT_WRITE ERROR_NONE/;
  1         8892  
  1         408  
17              
18 1     1   328 use Cassandra::Client::Util;
  1         2  
  1         62  
19 1         385 use Cassandra::Client::Protocol qw/
20             :constants
21             %consistency_lookup
22             %batch_type_lookup
23             pack_bytes
24             pack_longstring
25             pack_queryparameters
26             pack_shortbytes
27             pack_stringmap
28             pack_stringlist
29             unpack_errordata
30             unpack_inet
31             unpack_int
32             unpack_metadata
33             unpack_shortbytes
34             unpack_string
35             unpack_stringmultimap
36 1     1   288 /;
  1         2  
37 1     1   7 use Cassandra::Client::Error::Base;
  1         2  
  1         17  
38 1     1   280 use Cassandra::Client::ResultSet;
  1         2  
  1         24  
39 1     1   252 use Cassandra::Client::TLSHandling;
  1         3  
  1         30  
40              
41 1     1   5 use constant STREAM_ID_LIMIT => 32768;
  1         2  
  1         4549  
42              
43             # Populated at BEGIN{} time
44             my @compression_preference;
45             my %available_compression;
46              
47             sub new {
48 0     0 0   my ($class, %args)= @_;
49              
50             my $self= bless {
51             client => $args{client},
52             async_io => $args{async_io},
53             pool_id => undef,
54              
55             options => $args{options},
56             request_timeout => $args{options}{request_timeout},
57             host => $args{host},
58             metadata => $args{metadata},
59             prepare_cache => $args{metadata}->prepare_cache,
60 0           last_stream_id => 0,
61             pending_streams => {},
62             in_prepare => {},
63              
64             decompress_func => undef,
65             compress_func => undef,
66             connected => 0,
67             connecting => undef,
68             socket => undef,
69             fileno => undef,
70             pending_write => undef,
71             shutdown => 0,
72             read_buffer => \(my $empty= ''),
73              
74             tls => undef,
75             tls_want_write => undef,
76             }, $class;
77 0           weaken($self->{async_io});
78 0           weaken($self->{client});
79 0           return $self;
80             }
81              
82             sub get_local_status {
83 0     0 0   my ($self, $callback)= @_;
84              
85             series([
86             sub {
87 0     0     my ($next)= @_;
88 0           $self->execute_prepared($next, \"select key, data_center, host_id, broadcast_address, rack, release_version, tokens, schema_version from system.local");
89             },
90             sub {
91 0     0     my ($next, $result)= @_;
92              
93 0           my %local_status= map { $_->[3] => {
94             peer => $_->[3],
95             data_center => $_->[1],
96             host_id => $_->[2],
97             preferred_ip => $_->[3],
98             rack => $_->[4],
99             release_version => $_->[5],
100             tokens => $_->[6],
101             schema_version => $_->[7],
102 0           } } @{$result->rows};
  0            
103              
104 0           $next->(undef, \%local_status);
105             },
106 0           ], $callback);
107              
108 0           return;
109             }
110              
111             sub get_peers_status {
112 0     0 0   my ($self, $callback)= @_;
113              
114             series([
115             sub {
116 0     0     my ($next)= @_;
117 0           $self->execute_prepared($next, \"select peer, data_center, host_id, preferred_ip, rack, release_version, tokens, schema_version from system.peers");
118             },
119             sub {
120 0     0     my ($next, $result)= @_;
121              
122 0           my %network_status= map { $_->[0] => {
123             peer => $_->[0],
124             data_center => $_->[1],
125             host_id => $_->[2],
126             preferred_ip => $_->[3],
127             rack => $_->[4],
128             release_version => $_->[5],
129             tokens => $_->[6],
130             schema_version => $_->[7],
131 0           } } @{$result->rows};
  0            
132              
133 0           $next->(undef, \%network_status);
134             },
135 0           ], $callback);
136              
137 0           return;
138             }
139              
140             sub get_network_status {
141 0     0 0   my ($self, $callback)= @_;
142              
143             parallel([
144             sub {
145 0     0     my ($next)= @_;
146 0           $self->get_peers_status($next);
147             },
148             sub {
149 0     0     my ($next)= @_;
150 0           $self->get_local_status($next);
151             },
152             ], sub {
153 0     0     my ($error, $peers, $local)= @_;
154 0 0         if ($error) { return $callback->($error); }
  0            
155 0           return $callback->(undef, { %$peers, %$local });
156 0           });
157             }
158              
159             sub register_events {
160 0     0 0   my ($self, $callback)= @_;
161              
162 0           $self->request($callback, OPCODE_REGISTER, pack_stringlist([
163             'TOPOLOGY_CHANGE',
164             'STATUS_CHANGE',
165             ]));
166              
167 0           return;
168             }
169              
170              
171             ###### QUERY CODE
172             sub execute_prepared {
173 0     0 0   my ($self, $callback, $queryref, $parameters, $attr, $exec_info)= @_;
174              
175             # Note: parameters is retained until the query is complete. It must not be changed; clone if needed.
176             # Same for attr. Note that external callers automatically have their arguments cloned.
177              
178 0 0         my $prepared= $self->{prepare_cache}{$$queryref} or do {
179 0           return $self->prepare_and_try_execute_again($callback, $queryref, $parameters, $attr, $exec_info);
180             };
181              
182 0           my $want_result_metadata= !$prepared->{decoder};
183 0           my $row;
184 0 0         if ($parameters) {
185             eval {
186 0           $row= $prepared->{encoder}->encode($parameters);
187 0           1;
188 0 0         } or do {
189 0   0       my $error= $@ || "??";
190 0           return $callback->("Failed to encode row to native protocol: $error");
191             };
192             }
193              
194 0   0       my $consistency= $consistency_lookup{$attr->{consistency} || 'one'};
195 0 0         if (!defined $consistency) {
196 0           return $callback->("Invalid consistency level specified: $attr->{consistency}");
197             }
198              
199 0   0       my $page_size= (0+($attr->{page_size} || $self->{options}{max_page_size} || 0)) || undef;
200 0   0       my $paging_state= $attr->{page} || undef;
201 0           my $execute_body= pack_shortbytes($prepared->{id}).pack_queryparameters($consistency, !$want_result_metadata, $page_size, $paging_state, undef, $row);
202              
203             my $on_completion= sub {
204             # my ($body)= $_[2]; (not copying, because performance. assuming ownership)
205 0     0     my ($err, $code)= @_;
206              
207 0 0         if ($err) {
208 0 0 0       if (is_blessed_ref($err) && $err->code == 0x2500) {
209 0           return $self->prepare_and_try_execute_again($callback, $queryref, $parameters, $attr, $exec_info);
210             }
211 0           return $callback->($err);
212             }
213              
214 0 0         if ($code != OPCODE_RESULT) {
215             # This shouldn't ever happen...
216 0           return $callback->(Cassandra::Client::Error::Base->new(
217             message => "Expected a RESULT frame but got something else; considering the query failed",
218             request_error => 1,
219             ));
220             }
221              
222 0           $self->decode_result($callback, $prepared, $_[2]);
223 0           };
224              
225 0           $self->request($on_completion, OPCODE_EXECUTE, $execute_body);
226              
227 0           return;
228             }
229              
230             sub prepare_and_try_execute_again {
231 0     0 0   my ($self, $callback, $queryref, $parameters, $attr, $exec_info)= @_;
232              
233 0 0         if ($exec_info->{_prepared_and_tried_again}++) {
234 0           return $callback->("Query failed because it seems to be missing from the server's prepared statement cache");
235             }
236              
237             series([
238             sub {
239 0     0     my ($next)= @_;
240 0           $self->prepare($next, $$queryref);
241             },
242             ], sub {
243 0 0   0     return $callback->($_[0]) if $_[0];
244              
245 0 0         unless ($self->{prepare_cache}{$$queryref}) {
246             # We're recursing, so let's make sure we avoid the infinite loop
247 0           return $callback->("Internal error: expected query to be prepared but it was not");
248             }
249              
250 0           return $self->execute_prepared($callback, $queryref, $parameters, $attr, $exec_info);
251 0           });
252 0           return;
253             }
254              
255             sub execute_batch {
256 0     0 0   my ($self, $callback, $queries, $attribs, $exec_info)= @_;
257             # Like execute_prepared, assumes ownership of $queries and $attribs
258              
259 0 0         if (!is_plain_arrayref($queries)) {
260 0           return $callback->("execute_batch: queries argument must be an array of arrays");
261             }
262              
263 0           my @prepared;
264 0           for my $query (@$queries) {
265 0 0         if (!is_plain_arrayref($query)) {
266 0           return $callback->("execute_batch: entries in query argument must be arrayrefs");
267             }
268 0 0         if (!$query->[0]) {
269 0           return $callback->("Empty or no query given, cannot execute as part of a batch");
270             }
271 0 0 0       if ($query->[1] && !is_plain_arrayref($query->[1])) {
272 0           return $callback->("Query parameters to batch() must be given as an arrayref");
273             }
274              
275 0 0         if (my $prep= $self->{prepare_cache}{$query->[0]}) {
276 0           push @prepared, [ $prep, $query->[1] ];
277              
278             } else {
279 0           return $self->prepare_and_try_batch_again($callback, $queries, $attribs, $exec_info);
280             }
281             }
282              
283 0           my $batch_type= 0;
284 0 0         if ($attribs->{batch_type}) {
285 0           $batch_type= $batch_type_lookup{$attribs->{batch_type}};
286 0 0         if (!defined $batch_type) {
287 0           return $callback->("Unknown batch_type: <$attribs->{batch_type}>");
288             }
289             }
290              
291 0   0       my $consistency= $consistency_lookup{$attribs->{consistency} || 'one'};
292 0 0         if (!defined $consistency) {
293 0           return $callback->("Invalid consistency level specified: $attribs->{consistency}");
294             }
295              
296 0           my $batch_frame= pack('Cn', $batch_type, (0+@prepared));
297 0           for my $prep (@prepared) {
298 0           $batch_frame .= pack('C', 1).pack_shortbytes($prep->[0]{id}).$prep->[0]{encoder}->encode($prep->[1]);
299             }
300 0           $batch_frame .= pack('nC', $consistency, 0);
301              
302             my $on_completion= sub {
303             # my ($body)= $_[2]; (not copying, because performance. assuming ownership)
304 0     0     my ($err, $code)= @_;
305              
306 0 0         if ($err) {
307 0 0 0       if (is_blessed_ref($err) && $err->code == 0x2500) {
308 0           return $self->prepare_and_try_batch_again($callback, $queries, $attribs, $exec_info);
309             }
310 0           return $callback->($err);
311             }
312              
313 0 0         if ($code != OPCODE_RESULT) {
314             # This shouldn't ever happen...
315 0           return $callback->(Cassandra::Client::Error::Base->new(
316             message => "Expected a RESULT frame but got something else; considering the batch failed",
317             request_error => 1,
318             ));
319             }
320              
321 0           $self->decode_result($callback, undef, $_[2]);
322 0           };
323              
324 0           $self->request($on_completion, OPCODE_BATCH, $batch_frame);
325              
326 0           return;
327             }
328              
329             sub prepare_and_try_batch_again {
330 0     0 0   my ($self, $callback, $queries, $attribs, $exec_info)= @_;
331              
332 0 0         if ($exec_info->{_prepared_and_tried_again}++) {
333 0           return $callback->("Batch failed because one or more queries seem to be missing from the server's prepared statement cache");
334             }
335              
336 0           my %to_be_prepared;
337 0           $to_be_prepared{$_->[0]}= 1 for @$queries;
338              
339             parallel([
340 0           map { my $query= $_; sub {
341 0     0     my ($next)= @_;
342 0           $self->prepare($next, $query);
343 0           } } keys %to_be_prepared
344             ], sub {
345 0 0   0     return $callback->($_[0]) if $_[0];
346              
347 0           return $self->execute_batch($callback, $queries, $attribs, $exec_info);
348 0           });
349 0           return;
350             }
351              
352             sub prepare {
353 0     0 0   my ($self, $callback, $query)= @_;
354              
355 0 0         if (exists $self->{in_prepare}{$query}) {
356 0           push @{$self->{in_prepare}{$query}}, $callback;
  0            
357 0           return;
358             }
359              
360 0           $self->{in_prepare}{$query}= [ $callback ];
361              
362             series([
363             sub {
364 0     0     my ($next)= @_;
365 0           my $req= pack_longstring($query);
366 0           $self->request($next, OPCODE_PREPARE, $req);
367             },
368             sub {
369 0     0     my ($next, $code, $body)= @_;
370 0 0         if ($code != OPCODE_RESULT) {
371 0           return $next->("Got unexpected failure while trying to prepare");
372             }
373              
374 0           my $result_type= unpack_int($body);
375 0 0         if ($result_type != RESULT_PREPARED) {
376 0           return $next->("Unexpected response from server while preparing");
377             }
378              
379 0           my $id= unpack_shortbytes($body);
380              
381 0           my ($encoder, $decoder);
382 0 0         eval {
383 0           ($encoder)= unpack_metadata($body);
384 0           1;
385             } or return $next->("Unable to unpack query metadata: $@");
386 0 0         eval {
387 0           ($decoder)= unpack_metadata($body);
388 0           1;
389             } or return $next->("Unable to unpack query result metadata: $@");
390              
391 0           $self->{metadata}->add_prepared($query, $id, $decoder, $encoder);
392 0           return $next->();
393             },
394             ], sub {
395 0     0     my $error= shift;
396 0 0         my $in_prepare= delete($self->{in_prepare}{$query}) or die "BUG";
397 0           $_->($error) for @$in_prepare;
398 0           });
399              
400 0           return;
401             }
402              
403             sub decode_result {
404 0     0 0   my ($self, $callback, $prepared)= @_; # $_[3]=$body
405              
406 0           my $result_type= unpack('l>', substr($_[3], 0, 4, ''));
407 0 0         if ($result_type == RESULT_ROWS) { # Rows
    0          
    0          
    0          
408 0           my ($paging_state, $decoder);
409 0 0         eval { ($decoder, $paging_state)= unpack_metadata($_[3]); 1 } or return $callback->("Unable to unpack query metadata: $@");
  0            
  0            
410 0   0       $decoder= $prepared->{decoder} || $decoder;
411              
412 0           $callback->(undef,
413             Cassandra::Client::ResultSet->new(
414             \$_[3],
415             $decoder,
416             $paging_state,
417             )
418             );
419              
420             } elsif ($result_type == RESULT_VOID) { # Void
421 0           return $callback->();
422              
423             } elsif ($result_type == RESULT_SET_KEYSPACE) { # Set_keyspace
424 0           my $new_keyspace= unpack_string($_[3]);
425 0           return $callback->();
426              
427             } elsif ($result_type == RESULT_SCHEMA_CHANGE) { # Schema change
428             return $self->wait_for_schema_agreement(sub {
429             # We may be passed an error. Ignore it, our query succeeded
430 0     0     $callback->();
431 0           });
432              
433             } else {
434 0           return $callback->("Query executed successfully but got an unexpected response type");
435             }
436 0           return;
437             }
438              
439             sub wait_for_schema_agreement {
440 0     0 0   my ($self, $callback)= @_;
441              
442 0           my $waited= 0;
443 0           my $wait_delay= 0.5;
444 0           my $max_wait= 5;
445              
446 0           my $done;
447             whilst(
448 0     0     sub { !$done },
449             sub {
450 0     0     my ($whilst_next)= @_;
451              
452             series([
453             sub {
454 0           my ($next)= @_;
455 0           $self->{async_io}->timer($next, $wait_delay);
456             },
457             sub {
458 0           my ($next)= @_;
459 0           $waited += $wait_delay;
460 0           $self->get_network_status($next);
461             },
462             ], sub {
463 0           my ($error, $network_status)= @_;
464 0 0         return $whilst_next->($error) if $error;
465              
466 0           my %versions;
467 0           $versions{$_->{schema_version}}= 1 for values %$network_status;
468 0 0         if (keys %versions > 1) {
469 0 0         if ($waited >= $max_wait) {
470 0           return $whilst_next->("wait_for_schema_agreement timed out after $waited seconds");
471             }
472             } else {
473 0           $done= 1;
474             }
475 0           return $whilst_next->();
476 0           });
477             },
478 0           $callback,
479             );
480              
481 0           return;
482             }
483              
484              
485              
486             ###### PROTOCOL CODE
487             sub handshake {
488 0     0 0   my ($self, $callback)= @_;
489             series([
490             sub { # Send the OPCODE_OPTIONS
491 0     0     my ($next)= @_;
492 0           $self->request($next, OPCODE_OPTIONS, '');
493             },
494             sub { # The server hopefully just told us what it supports, let's respond with a STARTUP message
495 0     0     my ($next, $response_code, $body)= @_;
496 0 0         if ($response_code != OPCODE_SUPPORTED) {
497 0           return $next->("Server returned an unexpected handshake");
498             }
499              
500 0           my $map= unpack_stringmultimap($body);
501              
502 0 0 0       unless ($map->{CQL_VERSION} && $map->{COMPRESSION}) {
503 0           return $next->("Server did not return compression and cql version information");
504             }
505              
506 0           my $selected_cql_version= $self->{options}{cql_version};
507 0 0         if (!$selected_cql_version) {
508 0           ($selected_cql_version)= reverse sort @{$map->{CQL_VERSION}};
  0            
509             }
510              
511 0           my %ss_compression= map { $_, 1 } @{$map->{COMPRESSION}};
  0            
  0            
512 0           my $selected_compression= $self->{options}{compression};
513 0 0         if (!$selected_compression) {
514 0           for (@compression_preference) {
515 0 0 0       if ($ss_compression{$_} && $available_compression{$_}) {
516 0           $selected_compression= $_;
517 0           last;
518             }
519             }
520             }
521 0 0 0       $selected_compression= undef if $selected_compression && $selected_compression eq 'none';
522              
523 0 0         if ($selected_compression) {
524 0 0         if (!$ss_compression{$selected_compression}) {
525 0           return $next->("Server did not support requested compression method <$selected_compression>");
526             }
527 0 0         if (!$available_compression{$selected_compression}) {
528 0           return $next->("Requested compression method <$selected_compression> is supported by the server but not by us");
529             }
530             }
531              
532 0 0         my $request_body= pack_stringmap({
533             CQL_VERSION => $selected_cql_version,
534             ($selected_compression ? (COMPRESSION => $selected_compression) : ()),
535             });
536              
537 0           $self->request($next, OPCODE_STARTUP, $request_body);
538              
539             # This needs to happen after we send the STARTUP message
540 0           $self->setup_compression($selected_compression);
541             },
542             sub { # By now we should know whether we need to authenticate
543 0     0     my ($next, $response_code, $body)= @_;
544 0 0         if ($response_code == OPCODE_READY) {
545 0           return $next->(undef, $body); # Pass it along
546             }
547              
548 0 0         if ($response_code == OPCODE_AUTHENTICATE) {
549 0           return $self->authenticate($next, unpack_string($body));
550             }
551              
552 0           return $next->("Unexpected response from the server");
553             },
554             sub {
555 0     0     my ($next)= @_;
556 0 0         if ($self->{options}{keyspace}) {
557 0           return $self->execute_prepared($next, \('use "'.$self->{options}{keyspace}.'"'));
558             }
559 0           return $next->();
560             },
561             sub {
562 0     0     my ($next)= @_;
563 0 0         if (!$self->{ipaddress}) {
564 0           return $self->get_local_status($next);
565             }
566 0           return $next->();
567             },
568             sub {
569 0     0     my ($next, $status)= @_;
570 0 0         if ($status) {
571 0           my ($local)= values %$status;
572 0           $self->{ipaddress}= $local->{peer};
573 0           $self->{datacenter}= $local->{data_center};
574             }
575 0 0         if (!$self->{ipaddress}) {
576 0           return $next->("Unable to determine node's IP address");
577             }
578 0           return $next->();
579             }
580 0           ], $callback);
581              
582 0           return;
583             }
584              
585             sub authenticate {
586 0     0 0   my ($self, $callback, $authenticator)= @_;
587              
588 0           my $user= "$self->{options}{username}";
589 0           my $pass= "$self->{options}{password}";
590 0 0         utf8::encode($user) if utf8::is_utf8($user);
591 0 0         utf8::encode($pass) if utf8::is_utf8($pass);
592              
593 0 0 0       if (!$user || !$pass) {
594 0           return $callback->("Server expected authentication using <$authenticator> but no credentials were set");
595             }
596              
597             series([
598             sub {
599 0     0     my ($next)= @_;
600 0           my $auth_body= pack_bytes("\0$user\0$pass");
601 0           $self->request($next, OPCODE_AUTH_RESPONSE, $auth_body);
602             },
603             sub {
604 0     0     my ($next, $code, $body)= @_;
605 0 0         if ($code == OPCODE_AUTH_SUCCESS) {
606 0           $next->();
607             } else {
608 0           $next->("Failed to authenticate: unknown error");
609             }
610             },
611 0           ], $callback);
612              
613 0           return;
614             }
615              
616             sub handle_event {
617 0     0 0   my ($self, $eventdata)= @_;
618 0           my $type= unpack_string($eventdata);
619 0 0         if ($type eq 'TOPOLOGY_CHANGE') {
    0          
620 0           my ($change, $ipaddress)= (unpack_string($eventdata), unpack_inet($eventdata));
621 0           $self->{client}->_handle_topology_change($change, $ipaddress);
622              
623             } elsif ($type eq 'STATUS_CHANGE') {
624 0           my ($change, $ipaddress)= (unpack_string($eventdata), unpack_inet($eventdata));
625 0           $self->{client}->_handle_status_change($change, $ipaddress);
626              
627             } else {
628 0           warn 'Received unknown event type: '.$type;
629             }
630             }
631              
632             sub get_pool_id {
633             $_[0]{pool_id}
634 0     0 0   }
635              
636             sub set_pool_id {
637 0     0 0   $_[0]{pool_id}= $_[1];
638             }
639              
640             sub ip_address {
641             $_[0]{ipaddress}
642 0     0 0   }
643              
644              
645              
646             ####### IO LOGIC
647             sub connect {
648 0     0 0   my ($self, $callback)= @_;
649 0 0         return $callback->() if $self->{connected};
650              
651 0 0         if ($self->{connecting}++) {
652 0           warn "BUG: Calling connect twice?";
653 0           return $callback->("Internal bug: called connect twice.");
654 0           return;
655             }
656              
657 0 0         if ($self->{options}{tls}) {
658             eval {
659 0           $self->{tls}= $self->{client}{tls}->new_conn;
660 0           1;
661 0 0         } or do {
662 0   0       my $error= $@ || "unknown TLS error";
663 0           return $callback->($error);
664             };
665             }
666              
667 0           my $socket; {
668 0           local $@;
  0            
669              
670 0 0         if ($self->{host} =~ /:/) {
671             # IPv6
672             $socket= IO::Socket::INET6->new(
673             PeerAddr => $self->{host},
674             PeerPort => $self->{options}{port},
675 0           Proto => 'tcp',
676             Blocking => 0,
677             );
678             } else {
679             # IPv6
680             $socket= IO::Socket::INET->new(
681             PeerAddr => $self->{host},
682             PeerPort => $self->{options}{port},
683 0           Proto => 'tcp',
684             Blocking => 0,
685             );
686             }
687              
688 0 0         unless ($socket) {
689 0           my $error= "Could not connect: $@";
690 0           return $callback->($error);
691             }
692              
693 0           $socket->setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1);
694 0           $socket->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
695             }
696              
697 0           $self->{socket}= $socket;
698 0           $self->{fileno}= $socket->fileno;
699 0           $self->{async_io}->register($self->{fileno}, $self);
700 0           $self->{async_io}->register_read($self->{fileno});
701              
702             # We create a fake buffer, to ensure we wait until we can actually write
703 0           $self->{pending_write}= '';
704 0           $self->{async_io}->register_write($self->{fileno});
705              
706 0 0         if ($self->{options}{tls}) {
707 0           Net::SSLeay::set_fd(${$self->{tls}}, $self->{fileno});
  0            
708 0           Net::SSLeay::set_connect_state(${$self->{tls}});
  0            
709             }
710              
711             $self->handshake(sub {
712 0     0     my $error= shift;
713 0           $self->{connected}= 1;
714 0 0         if ($error) {
715 0           $self->shutdown("Failed to connect: $error");
716             }
717 0           return $callback->($error);
718 0           });
719              
720 0           return;
721             }
722              
723             sub request {
724             # my $body= $_[3] (let's avoid copying that blob). Yes, this code assumes ownership of the body.
725 0     0 0   my ($self, $cb, $opcode)= @_;
726             return $cb->(Cassandra::Client::Error::Base->new(
727             message => "Connection shutting down",
728             request_error => 1,
729 0 0         )) if $self->{shutdown};
730              
731 0           my $pending= $self->{pending_streams};
732              
733 0           my $stream_id= $self->{last_stream_id} + 1;
734 0           my $attempts= 0;
735 0   0       while (exists($pending->{$stream_id}) || $stream_id >= STREAM_ID_LIMIT) {
736 0           $stream_id= (++$stream_id) % STREAM_ID_LIMIT;
737 0 0         return $cb->(Cassandra::Client::Error::Base->new(
738             message => "Cannot find a stream ID to post query with",
739             request_error => 1,
740             )) if ++$attempts >= STREAM_ID_LIMIT;
741             }
742 0           $self->{last_stream_id}= $stream_id;
743 0           $pending->{$stream_id}= [$cb, $self->{async_io}->deadline($self->{fileno}, $stream_id, $self->{request_timeout})];
744              
745             WRITE: {
746 0           my $flags= 0;
  0            
747              
748 0 0 0       if (length($_[3]) > 500 && (my $compress_func= $self->{compress_func})) {
749 0           $flags |= 1;
750 0           $compress_func->($_[3]);
751             }
752              
753 0           my $data= pack('CCsCN/a', 3, $flags, $stream_id, $opcode, $_[3]);
754              
755 0 0         if (defined $self->{pending_write}) {
756 0           $self->{pending_write} .= $data;
757 0           last WRITE;
758             }
759              
760 0 0         if ($self->{tls}) {
761 0           my $length= length $data;
762 0           my $rv= Net::SSLeay::write(${$self->{tls}}, $data);
  0            
763 0 0         if ($rv == $length) {
    0          
764             # All good
765             } elsif ($rv > 0) {
766             # Partital write
767 0           substr($data, 0, $rv, '');
768 0           $self->{pending_write}= $data;
769 0           $self->{async_io}->register_write($self->{fileno});
770             } else {
771 0           $rv= Net::SSLeay::get_error(${$self->{tls}}, $rv);
  0            
772 0 0 0       if ($rv == ERROR_WANT_WRITE || $rv == ERROR_WANT_READ || $rv == ERROR_NONE) {
      0        
773             # Ok...
774 0           $self->{pending_write}= $data;
775 0 0         if ($rv == ERROR_WANT_READ) {
776 0           $self->{tls_want_write}= 1;
777             } else {
778 0           $self->{async_io}->register_write($self->{fileno});
779             }
780             } else {
781             # We failed to send the request.
782 0           my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error());
783              
784             # We never actually sent our request, so take it out again
785 0           my $my_stream= delete $pending->{$stream_id};
786              
787             # Disable our stream's deadline
788 0           ${$my_stream->[1]}= 1;
  0            
789              
790 0           $self->shutdown($error);
791              
792             # Now fail our stream properly, but include the retry notice
793 0           $my_stream->[0]->(Cassandra::Client::Error::Base->new(
794             message => "Disconnected: $error",
795             do_retry => 1,
796             request_error => 1,
797             ));
798             }
799             }
800              
801             } else {
802 0           my $length= length $data;
803 0           my $result= syswrite($self->{socket}, $data, $length);
804 0 0 0       if ($result && $result == $length) {
    0 0        
805             # All good
806             } elsif (defined $result || $! == EAGAIN) {
807 0 0         substr($data, 0, $result, '') if $result;
808 0           $self->{pending_write}= $data;
809 0           $self->{async_io}->register_write($self->{fileno});
810             } else {
811             # Oh, we failed to send out the request. That's bad. Let's first find out what happened.
812 0           my $error= $!;
813              
814             # We never actually sent our request, so take it out again
815 0           my $my_stream= delete $pending->{$stream_id};
816              
817             # Disable our stream's deadline
818 0           ${$my_stream->[1]}= 1;
  0            
819              
820 0           $self->shutdown($error);
821              
822             # Now fail our stream properly, but include the retry notice
823 0           $my_stream->[0]->(Cassandra::Client::Error::Base->new(
824             message => "Disconnected: $error",
825             do_retry => 1,
826             request_error => 1,
827             ));
828             }
829             }
830             }
831              
832 0           return;
833             }
834              
835             sub can_read {
836 0     0 0   my ($self)= @_;
837 0           my $shutdown_when_done;
838 0           local *BUFFER= $self->{read_buffer};
839 0           my $bufsize= length $BUFFER;
840              
841             READ:
842 0           while (!$self->{shutdown}) {
843 0           my $should_read_more;
844              
845 0 0         if ($self->{tls}) {
846 0           my ($bytes, $rv)= Net::SSLeay::read(${$self->{tls}});
  0            
847 0 0         if (length $bytes) {
848 0           $BUFFER .= $bytes;
849 0           $bufsize += $rv;
850 0           $should_read_more= 1;
851             }
852              
853 0 0         if ($rv <= 0) {
854 0           $rv= Net::SSLeay::get_error(${$self->{tls}}, $rv);
  0            
855 0 0         if ($rv == ERROR_WANT_WRITE) {
    0          
    0          
856 0           $self->{async_io}->register_write($self->{fileno});
857             } elsif ($rv == ERROR_WANT_READ) {
858             # Can do! Wait for the next event.
859              
860             # Resume our write if needed.
861 0 0         if (delete $self->{tls_want_write}) {
862             # Try our write again!
863 0           $self->{async_io}->register_write($self->{fileno});
864             }
865             } elsif ($rv == ERROR_NONE) {
866             # Huh?
867             } else {
868 0           my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error());
869 0           $shutdown_when_done= "TLS error: $error";
870             }
871             }
872              
873             } else {
874 0           my $read_cnt= sysread($self->{socket}, $BUFFER, 16384, $bufsize);
875 0 0         if ($read_cnt) {
    0          
    0          
876 0           $bufsize += $read_cnt;
877 0 0         $should_read_more= 1 if $read_cnt >= 16384;
878              
879             } elsif (!defined $read_cnt) {
880 0 0         if ($! != EAGAIN) {
881 0           my $error= "$!";
882 0           $shutdown_when_done= $error;
883             }
884             } elsif ($read_cnt == 0) { # EOF
885 0           $shutdown_when_done= "Disconnected from server";
886             }
887             }
888              
889 0 0         READ_NEXT:
890             goto READ_MORE if $bufsize < 9;
891 0           my ($version, $flags, $stream_id, $opcode, $bodylen)= unpack('CCsCN', substr($BUFFER, 0, 9));
892 0 0         if ($bufsize < $bodylen+9) {
893 0           goto READ_MORE;
894             }
895              
896 0           substr($BUFFER, 0, 9, '');
897 0           my $body= substr($BUFFER, 0, $bodylen, '');
898 0           $bufsize -= 9 + $bodylen;
899              
900             # Decompress if needed
901 0 0 0       if (($flags & 1) && $body) {
902 0           $self->{decompress_func}->($body);
903             }
904              
905 0 0         if ($stream_id != -1) {
906 0           my $stream_cb= delete $self->{pending_streams}{$stream_id};
907 0 0         if (!$stream_cb) {
    0          
908 0           warn 'BUG: received response for unknown stream';
909              
910             } elsif ($opcode == OPCODE_ERROR) {
911 0           my ($cb, $dl)= @$stream_cb;
912 0           $$dl= 1;
913              
914 0           my $error= unpack_errordata($body);
915 0           $cb->($error);
916              
917             } else {
918 0           my ($cb, $dl)= @$stream_cb;
919 0           $$dl= 1;
920 0           $cb->(undef, $opcode, $body);
921             }
922              
923             } else {
924 0           $self->handle_event($body);
925             }
926              
927 0           goto READ_NEXT;
928              
929 0 0         READ_MORE:
930             last READ unless $should_read_more;
931             }
932              
933 0 0         if ($shutdown_when_done) {
934 0           $self->shutdown($shutdown_when_done);
935             }
936              
937 0           return;
938             }
939              
940             sub can_write {
941 0     0 0   my ($self)= @_;
942              
943 0 0         if ($self->{tls}) {
944 0           my $rv= Net::SSLeay::write(${$self->{tls}}, $self->{pending_write});
  0            
945 0 0         if ($rv > 0) {
946 0           substr($self->{pending_write}, 0, $rv, '');
947 0 0         if (!length $self->{pending_write}) {
948 0           $self->{async_io}->unregister_write($self->{fileno});
949 0           delete $self->{pending_write};
950             }
951 0           return;
952              
953             } else {
954 0           $rv= Net::SSLeay::get_error(${$self->{tls}}, $rv);
  0            
955 0 0         if ($rv == ERROR_WANT_WRITE) {
    0          
    0          
956             # Wait until the next callback.
957 0           return;
958             } elsif ($rv == ERROR_WANT_READ) {
959             # Unschedule ourselves
960 0           $self->{async_io}->unregister_write($self->{fileno});
961 0           $self->{tls_want_write}= 1;
962 0           return;
963             } elsif ($rv == ERROR_NONE) {
964             # Huh?
965 0           return;
966             } else {
967 0           my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error());
968 0           return $self->shutdown("TLS error: $error");
969             }
970             }
971              
972             } else {
973 0           my $result= syswrite($self->{socket}, $self->{pending_write});
974 0 0         if (!defined($result)) {
975 0 0         if ($! == EAGAIN) {
976 0           return; # Huh. Oh well, whatever
977             }
978              
979 0           my $error= "$!";
980 0           return $self->shutdown($error);
981             }
982 0 0         if ($result == 0) { return; } # No idea whether that happens, but guard anyway.
  0            
983 0           substr($self->{pending_write}, 0, $result, '');
984              
985 0 0         if (!length $self->{pending_write}) {
986 0           $self->{async_io}->unregister_write($self->{fileno});
987 0           delete $self->{pending_write};
988             }
989             }
990              
991 0           return;
992             }
993              
994             sub can_timeout {
995 0     0 0   my ($self, $id)= @_;
996 0           my $stream= delete $self->{pending_streams}{$id};
997 0     0     $self->{pending_streams}{$id}= [ sub{}, \(my $zero= 0) ]; # fake it
998 0           $stream->[0]->(Cassandra::Client::Error::Base->new(
999             message => "Request timed out",
1000             is_timeout => 1,
1001             request_error => 1,
1002             ));
1003 0           return;
1004             }
1005              
1006             sub shutdown {
1007 0     0 0   my ($self, $shutdown_reason)= @_;
1008              
1009 0 0         return if $self->{shutdown};
1010 0           $self->{shutdown}= 1;
1011              
1012 0           my $pending= $self->{pending_streams};
1013 0           $self->{pending_streams}= {};
1014              
1015             # Disable our deadlines
1016 0           ${$_->[1]}= 1 for values %$pending;
  0            
1017              
1018 0           $self->{async_io}->unregister_read($self->{fileno});
1019 0 0         if (defined(delete $self->{pending_write})) {
1020 0           $self->{async_io}->unregister_write($self->{fileno});
1021             }
1022 0           $self->{async_io}->unregister($self->{fileno}, $self);
1023 0           $self->{client}->_disconnected($self->get_pool_id);
1024 0           $self->{socket}->close;
1025              
1026 0           for (values %$pending) {
1027 0           $_->[0]->(Cassandra::Client::Error::Base->new(
1028             message => "Disconnected: $shutdown_reason",
1029             request_error => 1,
1030             ));
1031             }
1032              
1033 0           return;
1034             }
1035              
1036              
1037              
1038             ###### COMPRESSION
1039             BEGIN {
1040 1     1   6 @compression_preference= qw/lz4 snappy/;
1041              
1042 1     1   60 %available_compression= (
  1     1   294  
  1         430  
  1         9  
  1         235  
  1         427  
  1         9  
1043             snappy => scalar eval "use Compress::Snappy (); 1;",
1044             lz4 => scalar eval "use Compress::LZ4 (); 1;",
1045             );
1046             }
1047              
1048             sub setup_compression {
1049 0     0 0   my ($self, $type)= @_;
1050              
1051 0 0         return unless $type;
1052 0 0         if ($type eq 'snappy') {
    0          
1053 0           $self->{compress_func}= \&compress_snappy;
1054 0           $self->{decompress_func}= \&decompress_snappy;
1055             } elsif ($type eq 'lz4') {
1056 0           $self->{compress_func}= \&compress_lz4;
1057 0           $self->{decompress_func}= \&decompress_lz4;
1058             } else {
1059 0           warn 'Internal error: failed to set compression';
1060             }
1061              
1062 0           return;
1063             }
1064              
1065             sub compress_snappy {
1066 0     0 0   $_[0]= Compress::Snappy::compress(\$_[0]);
1067 0           return;
1068             }
1069              
1070             sub decompress_snappy {
1071 0 0   0 0   if ($_[0] ne "\0") {
1072 0           $_[0]= Compress::Snappy::decompress(\$_[0]);
1073             } else {
1074 0           $_[0]= '';
1075             }
1076 0           return;
1077             }
1078              
1079             sub compress_lz4 {
1080 0     0 0   $_[0]= pack('N', length($_[0])) . Compress::LZ4::lz4_compress(\$_[0]);
1081 0           return;
1082             }
1083              
1084             sub decompress_lz4 {
1085 0     0 0   my $len= unpack('N', substr $_[0], 0, 4, '');
1086 0 0         if ($len) {
1087 0           $_[0]= Compress::LZ4::lz4_decompress(\$_[0], $len);
1088             } else {
1089 0           $_[0]= '';
1090             }
1091 0           return;
1092             }
1093              
1094             1;
1095              
1096             __END__