File Coverage

blib/lib/Cassandra/Client.pm
Criterion Covered Total %
statement 71 305 23.2
branch 0 86 0.0
condition 0 33 0.0
subroutine 24 65 36.9
pod n/a
total 95 489 19.4


line stmt bran cond sub pod time code
1             package Cassandra::Client;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::VERSION = '0.13_004'; # TRIAL
4              
5             $Cassandra::Client::VERSION = '0.13004';# ABSTRACT: Perl library for accessing Cassandra using its binary network protocol
6              
7 1     1   49975 use 5.010;
  1         3  
8 1     1   4 use strict;
  1         2  
  1         18  
9 1     1   4 use warnings;
  1         2  
  1         31  
10              
11 1     1   308 use Cassandra::Client::AsyncAnyEvent;
  1         2  
  1         27  
12 1     1   333 use Cassandra::Client::AsyncEV;
  1         2  
  1         23  
13 1     1   258 use Cassandra::Client::Config;
  1         2  
  1         27  
14 1     1   342 use Cassandra::Client::Connection;
  1         2  
  1         34  
15 1     1   270 use Cassandra::Client::Metadata;
  1         4  
  1         41  
16 1     1   299 use Cassandra::Client::Policy::Queue::Default;
  1         3  
  1         25  
17 1     1   319 use Cassandra::Client::Policy::Retry::Default;
  1         2  
  1         24  
18 1     1   7 use Cassandra::Client::Policy::Retry;
  1         2  
  1         30  
19 1     1   285 use Cassandra::Client::Policy::Throttle::Default;
  1         4  
  1         30  
20 1     1   262 use Cassandra::Client::Policy::LoadBalancing::Default;
  1         3  
  1         24  
21 1     1   263 use Cassandra::Client::Pool;
  1         3  
  1         26  
22 1     1   23 use Cassandra::Client::TLSHandling;
  1         3  
  1         22  
23 1     1   5 use Cassandra::Client::Util qw/series whilst/;
  1         2  
  1         50  
24              
25 1     1   280 use Clone 0.36 qw/clone/;
  1         552  
  1         47  
26 1     1   6 use List::Util qw/shuffle/;
  1         2  
  1         45  
27 1     1   337 use Promises 0.93 qw/deferred/;
  1         11058  
  1         7  
28 1     1   197 use Time::HiRes ();
  1         2  
  1         21  
29 1     1   5 use Ref::Util 0.008 qw/is_ref/;
  1         16  
  1         46  
30 1     1   6 use Devel::GlobalDestruction 0.11;
  1         17  
  1         7  
31 1     1   63 use XSLoader;
  1         2  
  1         2675  
32              
33             our $XS_VERSION = ($Cassandra::Client::VERSION || '');
34             $XS_VERSION =~ s/\A(\d+)\.(\d+)(\d{3})\z/$1.$2_$3/;
35             XSLoader::load(__PACKAGE__, $XS_VERSION);
36              
37             sub new {
38 0     0     my ($class, %args)= @_;
39              
40 0           my $self= bless {
41             connected => 0,
42             connect_callbacks => undef,
43             shutdown => 0,
44              
45             active_queries => 0,
46             }, $class;
47              
48 0           my $options= Cassandra::Client::Config->new(
49             \%args
50             );
51              
52 0   0       $self->{throttler}= $options->{throttler} || Cassandra::Client::Policy::Throttle::Default->new();
53 0   0       $self->{retry_policy}= $options->{retry_policy} || Cassandra::Client::Policy::Retry::Default->new();
54 0   0       $self->{command_queue}= $options->{command_queue} || Cassandra::Client::Policy::Queue::Default->new();
55 0   0       $self->{load_balancing_policy}= $options->{load_balancing_policy} || Cassandra::Client::Policy::LoadBalancing::Default->new();
56              
57 0 0         my $async_class= $options->{anyevent} ? "Cassandra::Client::AsyncAnyEvent" : "Cassandra::Client::AsyncEV";
58 0           my $async_io= $async_class->new(
59             options => $options,
60             );
61 0           my $metadata= Cassandra::Client::Metadata->new(
62             options => $options,
63             );
64             my $pool= Cassandra::Client::Pool->new(
65             client => $self,
66             options => $options,
67             metadata => $metadata,
68             async_io => $async_io,
69             load_balancing_policy => $self->{load_balancing_policy},
70 0           );
71 0 0         my $tls= $options->{tls} ? Cassandra::Client::TLSHandling->new() : undef;
72              
73 0           $self->{options}= $options;
74 0           $self->{async_io}= $async_io;
75 0           $self->{metadata}= $metadata;
76 0           $self->{pool}= $pool;
77 0           $self->{tls}= $tls;
78              
79 0           return $self;
80             }
81              
82             sub _connect {
83 0     0     my ($self, $callback)= @_;
84 0 0         return _cb($callback) if $self->{connected};
85 0 0         return _cb($callback, 'Cannot connect: shutdown() has been called') if $self->{shutdown};
86              
87 0   0       push @{$self->{connect_callbacks}||=[]}, $callback;
  0            
88 0 0         if ($self->{connecting}++) {
89 0           return;
90             }
91              
92 0           my @contact_points= shuffle @{$self->{options}{contact_points}};
  0            
93 0           my $last_error= "No hosts to connect to";
94              
95 0           my $next_connect;
96             $next_connect= sub {
97 0     0     my $contact_point= shift @contact_points;
98 0 0         if (!$contact_point) {
99 0           delete $self->{connecting};
100 0           undef $next_connect;
101 0           _cb($_, "Unable to connect to any Cassandra server. Last error: $last_error") for @{delete $self->{connect_callbacks}};
  0            
102 0           return;
103             }
104              
105             my $connection= Cassandra::Client::Connection->new(
106             client => $self,
107             options => $self->{options},
108             host => $contact_point,
109             async_io => $self->{async_io},
110             metadata => $self->{metadata},
111 0           );
112              
113             series([
114             sub {
115 0           my ($next)= @_;
116 0           $connection->connect($next);
117             },
118             sub {
119 0           my ($next)= @_;
120 0           $self->{pool}->init($next, $connection);
121             },
122             ], sub {
123 0           my $error= shift;
124 0 0         if ($error) {
125 0           $last_error= "On $contact_point: $error";
126 0           return $next_connect->();
127             }
128              
129 0           undef $next_connect;
130 0           $self->{connected}= 1;
131 0           delete $self->{connecting};
132 0           _cb($_) for @{delete $self->{connect_callbacks}};
  0            
133 0           });
134 0           };
135 0           $next_connect->();
136              
137 0           return;
138             }
139              
140             sub shutdown {
141 0     0     my ($self)= @_;
142              
143 0 0         return if $self->{shutdown};
144 0           $self->{shutdown}= 1;
145 0           $self->{connected}= 0;
146              
147 0           $self->{pool}->shutdown;
148              
149 0           return;
150             }
151              
152             sub is_active {
153 0     0     my ($self)= @_;
154 0 0         return 0 unless $self->{connected};
155 0           return 1;
156             }
157              
158             sub _disconnected {
159 0     0     my ($self, $connid)= @_;
160 0           $self->{pool}->remove($connid);
161 0           return;
162             }
163              
164             sub _handle_topology_change {
165 0     0     my ($self, $change, $ipaddress)= @_;
166 0 0         if ($change eq 'NEW_NODE') {
    0          
167 0           $self->{pool}->event_added_node($ipaddress);
168             } elsif ($change eq 'REMOVED_NODE') {
169 0           $self->{pool}->event_removed_node($ipaddress);
170             } else {
171 0           warn "Received unknown topology change: $change for $ipaddress";
172             }
173             }
174              
175             sub _handle_status_change {
176 0     0     my ($self, $change, $ipaddress)= @_;
177             # XXX Ignored, for now
178 0           $self->{pool}->connect_if_needed;
179             }
180              
181              
182              
183             # Query functions
184             sub _prepare {
185 0     0     my ($self, $callback, $query)= @_;
186              
187             # Fast path: we're already done
188 0 0         if ($self->{metadata}->is_prepared(\$query)) {
189 0           return _cb($callback);
190             }
191              
192 0           $self->_command("prepare", [ $callback, $query ]);
193 0           return;
194             }
195              
196             sub _execute {
197 0     0     my ($self, $callback, $query, $params, $attribs)= @_;
198              
199 0           my $attribs_clone= clone($attribs);
200 0   0       $attribs_clone->{consistency} ||= $self->{options}{default_consistency};
201              
202 0           $self->_command("execute_prepared", $callback, [ \$query, clone($params), $attribs_clone ]);
203 0           return;
204             }
205              
206             sub _batch {
207 0     0     my ($self, $callback, $queries, $attribs)= @_;
208              
209 0           my $attribs_clone= clone($attribs);
210 0   0       $attribs_clone->{consistency} ||= $self->{options}{default_consistency};
211              
212 0           $self->_command("execute_batch", $callback, [ clone($queries), $attribs_clone ]);
213 0           return;
214             }
215              
216             sub _wait_for_schema_agreement {
217 0     0     my ($self, $callback)= @_;
218 0           $self->_command("wait_for_schema_agreement", $callback, []);
219 0           return;
220             }
221              
222              
223             # Command queue
224             sub _command {
225 0     0     my ($self, $command, $callback, $args)= @_;
226              
227 0           my $command_info= {
228             start_time => Time::HiRes::time(),
229             };
230              
231             # Handle overloads
232 0 0         goto FAILFAST if $self->{throttler}->should_fail();
233              
234 0 0         goto OVERFLOW if $self->{active_queries} >= $self->{options}{max_concurrent_queries};
235              
236 0 0         goto SLOWPATH if !$self->{connected};
237              
238 0           my $connection= $self->{pool}->get_one;
239 0 0         goto SLOWPATH if !$connection;
240              
241 0           $self->{active_queries}++;
242             $connection->$command(sub {
243 0     0     my ($error, $result)= @_;
244 0           $self->{throttler}->count($error);
245              
246 0           $self->{active_queries}--;
247 0 0         $self->_command_dequeue if $self->{command_queue}{has_any};
248              
249 0 0         return $self->_command_failed($command, $callback, $args, $command_info, $error) if $error;
250 0           return _cb($callback, $error, $result);
251 0           }, @$args);
252              
253 0           return;
254              
255 0           SLOWPATH:
256             return $self->_command_slowpath($command, $callback, $args, $command_info);
257              
258 0           FAILFAST:
259             return _cb($callback, "Client-induced failure by throttling mechanism");
260              
261 0           OVERFLOW:
262             return $self->_command_enqueue($command, $callback, $args, $command_info);
263             }
264              
265             sub _command_slowpath {
266 0     0     my ($self, $command, $callback, $args, $command_info)= @_;
267              
268 0           $self->{active_queries}++;
269              
270             series([
271             sub {
272 0     0     my ($next)= @_;
273 0           $self->_connect($next);
274             }, sub {
275 0     0     my ($next)= @_;
276 0           $self->{pool}->get_one_cb($next);
277             }, sub {
278 0     0     my ($next, $connection)= @_;
279             # Yes, if we immediately take the slow path, which we would if we're not connected, we're going to throttle twice
280             # For now, I'm okay with that, but let's mark it with a TODO anyway.
281             # XXX
282 0 0         if ($self->{throttler}->should_fail()) {
283 0           return $next->("Client-induced failure by throttling mechanism");
284             }
285 0           $connection->$command($next, @$args);
286             }
287             ], sub {
288 0     0     my ($error, $result)= @_;
289 0           $self->{throttler}->count($error);
290              
291 0           $self->{active_queries}--;
292 0 0         $self->_command_dequeue if $self->{command_queue}{has_any};
293              
294 0 0         return $self->_command_failed($command, $callback, $args, $command_info, $error) if $error;
295 0           return _cb($callback, $error, $result);
296 0           });
297 0           return;
298             }
299              
300             sub _command_retry {
301 0     0     my ($self, $command, $callback, $args, $command_info)= @_;
302              
303 0           $command_info->{retries}++;
304              
305 0           my $delay= 0.1 * (2 ** $command_info->{retries});
306             $self->{async_io}->timer(sub {
307 0 0   0     if ($self->{active_queries} >= $self->{options}{max_concurrent_queries}) {
308 0           $self->_command_enqueue($command, $callback, $args, $command_info);
309             } else {
310 0           $self->_command_slowpath($command, $callback, $args, $command_info);
311             }
312 0           }, $delay);
313             }
314              
315             sub _command_failed {
316 0     0     my ($self, $command, $callback, $args, $command_info, $error)= @_;
317              
318 0 0         return $callback->($error) unless is_ref($error);
319              
320 0           my $retry_decision;
321 0 0         if ($error->do_retry) {
    0          
    0          
    0          
    0          
322 0           $retry_decision= Cassandra::Client::Policy::Retry::retry;
323             } elsif ($error->is_request_error) {
324 0   0       $retry_decision= $self->{retry_policy}->on_request_error(undef, undef, $error, ($command_info->{retries}||0));
325             } elsif ($error->isa('Cassandra::Client::Error::WriteTimeoutException')) {
326 0   0       $retry_decision= $self->{retry_policy}->on_write_timeout(undef, $error->cl, $error->write_type, $error->blockfor, $error->received, ($command_info->{retries}||0));
327             } elsif ($error->isa('Cassandra::Client::Error::ReadTimeoutException')) {
328 0   0       $retry_decision= $self->{retry_policy}->on_read_timeout(undef, $error->cl, $error->blockfor, $error->received, $error->data_retrieved, ($command_info->{retries}||0));
329             } elsif ($error->isa('Cassandra::Client::Error::UnavailableException')) {
330 0   0       $retry_decision= $self->{retry_policy}->on_unavailable(undef, $error->cl, $error->required, $error->alive, ($command_info->{retries}||0));
331             } else {
332 0           $retry_decision= Cassandra::Client::Policy::Retry::rethrow;
333             }
334              
335 0 0 0       if ($retry_decision && $retry_decision eq 'retry') {
336 0           return $self->_command_retry($command, $callback, $args, $command_info);
337             }
338              
339 0           return $callback->($error);
340             }
341              
342             sub _command_enqueue {
343 0     0     my ($self, $command, $callback, $args, $command_info)= @_;
344 0 0         if (my $error= $self->{command_queue}->enqueue([$command, $callback, $args, $command_info])) {
345 0           return $callback->("Cannot $command: $error");
346             }
347 0           return;
348             }
349              
350             sub _command_dequeue {
351 0     0     my ($self)= @_;
352 0 0         my $item= $self->{command_queue}->dequeue or return;
353 0           $self->_command_slowpath(@$item);
354 0           return;
355             }
356              
357             # Utility functions that wrap query functions
358             sub _each_page {
359 0     0     my ($self, $callback, $query, $params, $attribs, $page_callback)= @_;
360              
361 0 0         my $params_copy= $params ? clone($params) : undef;
362 0 0         my $attribs_copy= $attribs ? clone($attribs) : undef;
363              
364 0           my $done= 0;
365             whilst(
366 0     0     sub { !$done },
367             sub {
368 0     0     my $next= shift;
369              
370             $self->_execute(sub {
371             # Completion handler, with page data (or an error)
372 0           my ($error, $result)= @_;
373 0 0         return $next->($error) if $error;
374              
375 0           my $next_page_id= $result->next_page;
376 0           _cb($page_callback, $result); # Note that page_callback doesn't get an error argument, that's intentional
377              
378 0 0         if ($next_page_id) {
379 0           $attribs_copy->{page}= $next_page_id;
380             } else {
381 0           $done= 1;
382             }
383 0           return $next->();
384 0           }, $query, $params_copy, $attribs_copy);
385             },
386             sub {
387 0     0     my $error= shift;
388 0           return _cb($callback, $error);
389             }
390 0           );
391              
392 0           return;
393             }
394              
395             sub DESTROY {
396 0     0     local $@;
397 0 0         return if in_global_destruction;
398              
399 0           my $self= shift;
400 0 0         if ($self->{connected}) {
401 0           $self->shutdown;
402             }
403             }
404              
405              
406             # Utility functions for callers
407             sub _get_stacktrace {
408             # This gets called a lot. Let's keep it fast.
409              
410 0     0     my $trace= '';
411 0           my ($c, $file, $line)= caller(1);
412 0           $trace .= " $c ($file:$line)\n";
413 0 0         ($c, $file, $line)= caller(2) or goto DONE;
414 0           $trace .= " $c ($file:$line)\n";
415 0 0         ($c, $file, $line)= caller(3) or goto DONE;
416 0           $trace .= " $c ($file:$line)\n";
417              
418 0           DONE:
419             return $trace;
420             }
421              
422             sub _cb {
423 0     0     my $cb= shift;
424             eval {
425 0           &$cb; 1
  0            
426 0 0         } or do {
427 0   0       my $error= $@ || "unknown error";
428 0           warn "Ignoring unhandled exception in callback: $error";
429             };
430              
431 0           return;
432             }
433              
434             sub _mksync { # Translates an asynchronous call into something that looks like Perl
435 0     0     my ($sub)= @_;
436             return sub {
437 0     0     my $self= shift;
438 0           $sub->($self, $self->{async_io}->wait(my $w), @_);
439 0           my ($err, @output)= $w->();
440 0 0         if ($err) { die $err; }
  0            
441 0           return @output;
442 0           };
443             }
444              
445             sub _mkcall { # Basically _mksync, but returns the error instead of dying
446 0     0     my ($sub)= @_;
447             return sub {
448 0     0     my $self= shift;
449 0           $sub->($self, $self->{async_io}->wait(my $w), @_);
450 0           return $w->();
451 0           };
452             }
453              
454             sub _mkpromise {
455 0     0     my ($sub)= @_;
456             return sub {
457 0     0     my $self= shift;
458 0           my $trace= &_get_stacktrace;
459 0           my $deferred= deferred;
460              
461             $sub->($self, sub {
462 0           my ($error, @output)= @_;
463 0 0         if ($error) {
464 0           $deferred->reject("$error\n\nTrace:\n$trace");
465             } else {
466 0           $deferred->resolve(@output);
467             }
468 0           }, @_);
469              
470 0           return $deferred->promise;
471 0           };
472             }
473              
474             sub _mkfuture {
475 0     0     my ($sub)= @_;
476             return sub {
477 0     0     my $self= shift;
478 0           my $trace= &_get_stacktrace;
479 0           $sub->($self, $self->{async_io}->wait(my $w), @_);
480             return sub {
481 0           my ($error, @output)= $w->();
482 0 0         if ($error) { die "$error\n\nTrace:\n$trace"; }
  0            
483 0           return @output;
484 0           };
485             }
486 0           }
487              
488             sub _mkfuture_call {
489 0     0     my ($sub)= @_;
490             return sub {
491 0     0     my $self= shift;
492 0           $sub->($self, $self->{async_io}->wait(my $w), @_);
493 0           return $w;
494             }
495 0           }
496              
497             PUBLIC_METHODS: {
498 1     1   12 no strict 'refs';
  1         2  
  1         162  
499             for (qw/
500             batch
501             connect
502             execute
503             each_page
504             prepare
505             wait_for_schema_agreement
506             /) {
507             *{$_}= _mksync (\&{"_$_"});
508             *{"call_$_"}= _mkcall (\&{"_$_"});
509             *{"async_$_"}= _mkpromise (\&{"_$_"});
510             *{"future_$_"}= _mkfuture (\&{"_$_"});
511             *{"future_call_$_"}= _mkfuture_call (\&{"_$_"});
512             }
513             }
514              
515             1;
516              
517             __END__