File Coverage

blib/lib/Cassandra/Client.pm
Criterion Covered Total %
statement 71 308 23.0
branch 0 88 0.0
condition 0 36 0.0
subroutine 24 65 36.9
pod n/a
total 95 497 19.1


line stmt bran cond sub pod time code
1             package Cassandra::Client;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::VERSION = '0.13_006'; # TRIAL
4              
5             $Cassandra::Client::VERSION = '0.13006';# ABSTRACT: Perl library for accessing Cassandra using its binary network protocol
6              
7 1     1   61446 use 5.010;
  1         5  
8 1     1   9 use strict;
  1         3  
  1         33  
9 1     1   13 use warnings;
  1         3  
  1         55  
10              
11 1     1   430 use Cassandra::Client::AsyncAnyEvent;
  1         3  
  1         39  
12 1     1   446 use Cassandra::Client::AsyncEV;
  1         4  
  1         39  
13 1     1   396 use Cassandra::Client::Config;
  1         4  
  1         42  
14 1     1   498 use Cassandra::Client::Connection;
  1         3  
  1         30  
15 1     1   305 use Cassandra::Client::Metadata;
  1         5  
  1         45  
16 1     1   278 use Cassandra::Client::Policy::Queue::Default;
  1         2  
  1         24  
17 1     1   302 use Cassandra::Client::Policy::Retry::Default;
  1         3  
  1         25  
18 1     1   6 use Cassandra::Client::Policy::Retry;
  1         3  
  1         30  
19 1     1   307 use Cassandra::Client::Policy::Throttle::Default;
  1         3  
  1         24  
20 1     1   265 use Cassandra::Client::Policy::LoadBalancing::Default;
  1         3  
  1         41  
21 1     1   318 use Cassandra::Client::Pool;
  1         3  
  1         25  
22 1     1   6 use Cassandra::Client::TLSHandling;
  1         29  
  1         38  
23 1     1   6 use Cassandra::Client::Util qw/series whilst/;
  1         3  
  1         53  
24              
25 1     1   304 use Clone 0.36 qw/clone/;
  1         425  
  1         47  
26 1     1   6 use List::Util qw/shuffle/;
  1         2  
  1         46  
27 1     1   336 use Promises 0.93 qw/deferred/;
  1         11653  
  1         6  
28 1     1   207 use Time::HiRes ();
  1         3  
  1         21  
29 1     1   5 use Ref::Util 0.008 qw/is_ref/;
  1         16  
  1         48  
30 1     1   6 use Devel::GlobalDestruction 0.11;
  1         16  
  1         7  
31 1     1   57 use XSLoader;
  1         2  
  1         2660  
32              
33             our $XS_VERSION = ($Cassandra::Client::VERSION || '');
34             $XS_VERSION =~ s/\A(\d+)\.(\d+)(\d{3})\z/$1.$2_$3/;
35             XSLoader::load(__PACKAGE__, $XS_VERSION);
36              
37             sub new {
38 0     0     my ($class, %args)= @_;
39              
40 0           my $self= bless {
41             connected => 0,
42             connect_callbacks => undef,
43             shutdown => 0,
44              
45             active_queries => 0,
46             }, $class;
47              
48 0           my $options= Cassandra::Client::Config->new(
49             \%args
50             );
51              
52 0   0       $self->{throttler}= $options->{throttler} || Cassandra::Client::Policy::Throttle::Default->new();
53 0   0       $self->{retry_policy}= $options->{retry_policy} || Cassandra::Client::Policy::Retry::Default->new();
54 0   0       $self->{command_queue}= $options->{command_queue} || Cassandra::Client::Policy::Queue::Default->new();
55 0   0       $self->{load_balancing_policy}= $options->{load_balancing_policy} || Cassandra::Client::Policy::LoadBalancing::Default->new();
56              
57 0 0         my $async_class= $options->{anyevent} ? "Cassandra::Client::AsyncAnyEvent" : "Cassandra::Client::AsyncEV";
58 0           my $async_io= $async_class->new(
59             options => $options,
60             );
61 0           my $metadata= Cassandra::Client::Metadata->new(
62             options => $options,
63             );
64             my $pool= Cassandra::Client::Pool->new(
65             client => $self,
66             options => $options,
67             metadata => $metadata,
68             async_io => $async_io,
69             load_balancing_policy => $self->{load_balancing_policy},
70 0           );
71 0 0         my $tls= $options->{tls} ? Cassandra::Client::TLSHandling->new() : undef;
72              
73 0           $self->{options}= $options;
74 0           $self->{async_io}= $async_io;
75 0           $self->{metadata}= $metadata;
76 0           $self->{pool}= $pool;
77 0           $self->{tls}= $tls;
78              
79 0           return $self;
80             }
81              
82             sub _connect {
83 0     0     my ($self, $callback)= @_;
84 0 0         return _cb($callback) if $self->{connected};
85 0 0         return _cb($callback, 'Cannot connect: shutdown() has been called') if $self->{shutdown};
86              
87             # This is ONLY useful if the user doesn't throw away the C::C object on connect errors.
88 0 0 0       if (!$self->{connecting} && $self->{throttler}->should_fail()) {
89 0           return _cb($callback, "Client-induced connection failure by throttling mechanism");
90             }
91              
92 0   0       push @{$self->{connect_callbacks}||=[]}, $callback;
  0            
93 0 0         if ($self->{connecting}++) {
94 0           return;
95             }
96              
97 0           my @contact_points= shuffle @{$self->{options}{contact_points}};
  0            
98 0           my $last_error= "No hosts to connect to";
99              
100 0           my $next_connect;
101             $next_connect= sub {
102 0     0     my $contact_point= shift @contact_points;
103 0 0         if (!$contact_point) {
104 0           delete $self->{connecting};
105 0           undef $next_connect;
106 0           _cb($_, "Unable to connect to any Cassandra server. Last error: $last_error") for @{delete $self->{connect_callbacks}};
  0            
107 0           return;
108             }
109              
110             my $connection= Cassandra::Client::Connection->new(
111             client => $self,
112             options => $self->{options},
113             host => $contact_point,
114             async_io => $self->{async_io},
115             metadata => $self->{metadata},
116 0           );
117              
118             series([
119             sub {
120 0           my ($next)= @_;
121 0           $connection->connect($next);
122             },
123             sub {
124 0           my ($next)= @_;
125 0           $self->{pool}->init($next, $connection);
126             },
127             ], sub {
128 0           my $error= shift;
129 0           $self->{throttler}->count($error);
130 0 0         if ($error) {
131 0           $last_error= "On $contact_point: $error";
132 0           return $next_connect->();
133             }
134              
135 0           undef $next_connect;
136 0           $self->{connected}= 1;
137 0           delete $self->{connecting};
138 0           _cb($_) for @{delete $self->{connect_callbacks}};
  0            
139 0           });
140 0           };
141 0           $next_connect->();
142              
143 0           return;
144             }
145              
146             sub shutdown {
147 0     0     my ($self)= @_;
148              
149 0 0         return if $self->{shutdown};
150 0           $self->{shutdown}= 1;
151 0           $self->{connected}= 0;
152              
153 0           $self->{pool}->shutdown;
154              
155 0           return;
156             }
157              
158             sub is_active {
159 0     0     my ($self)= @_;
160 0 0         return 0 unless $self->{connected};
161 0           return 1;
162             }
163              
164             sub _disconnected {
165 0     0     my ($self, $connid)= @_;
166 0           $self->{pool}->remove($connid);
167 0           return;
168             }
169              
170             sub _handle_topology_change {
171 0     0     my ($self, $change, $ipaddress)= @_;
172 0 0         if ($change eq 'NEW_NODE') {
    0          
173 0           $self->{pool}->event_added_node($ipaddress);
174             } elsif ($change eq 'REMOVED_NODE') {
175 0           $self->{pool}->event_removed_node($ipaddress);
176             } else {
177 0           warn "Received unknown topology change: $change for $ipaddress";
178             }
179             }
180              
181             sub _handle_status_change {
182 0     0     my ($self, $change, $ipaddress)= @_;
183             # XXX Ignored, for now
184 0           $self->{pool}->connect_if_needed;
185             }
186              
187              
188              
189             # Query functions
190             sub _prepare {
191 0     0     my ($self, $callback, $query)= @_;
192              
193             # Fast path: we're already done
194 0 0         if ($self->{metadata}->is_prepared(\$query)) {
195 0           return _cb($callback);
196             }
197              
198 0           $self->_command("prepare", [ $callback, $query ]);
199 0           return;
200             }
201              
202             sub _execute {
203 0     0     my ($self, $callback, $query, $params, $attribs)= @_;
204              
205 0           my $attribs_clone= clone($attribs);
206 0   0       $attribs_clone->{consistency} ||= $self->{options}{default_consistency};
207              
208 0           $self->_command("execute_prepared", $callback, [ \$query, clone($params), $attribs_clone ]);
209 0           return;
210             }
211              
212             sub _batch {
213 0     0     my ($self, $callback, $queries, $attribs)= @_;
214              
215 0           my $attribs_clone= clone($attribs);
216 0   0       $attribs_clone->{consistency} ||= $self->{options}{default_consistency};
217              
218 0           $self->_command("execute_batch", $callback, [ clone($queries), $attribs_clone ]);
219 0           return;
220             }
221              
222             sub _wait_for_schema_agreement {
223 0     0     my ($self, $callback)= @_;
224 0           $self->_command("wait_for_schema_agreement", $callback, []);
225 0           return;
226             }
227              
228              
229             # Command queue
230             sub _command {
231 0     0     my ($self, $command, $callback, $args)= @_;
232              
233 0           my $command_info= {
234             start_time => Time::HiRes::time(),
235             };
236              
237 0 0         goto OVERFLOW if $self->{active_queries} >= $self->{options}{max_concurrent_queries};
238              
239 0 0         goto SLOWPATH if !$self->{connected};
240              
241 0           my $connection= $self->{pool}->get_one;
242 0 0         goto SLOWPATH if !$connection;
243              
244 0 0         goto FAILFAST if $self->{throttler}->should_fail();
245              
246 0           $self->{active_queries}++;
247             $connection->$command(sub {
248 0     0     my ($error, $result)= @_;
249 0           $self->{throttler}->count($error);
250              
251 0           $self->{active_queries}--;
252 0 0         $self->_command_dequeue if $self->{command_queue}{has_any};
253              
254 0 0         return $self->_command_failed($command, $callback, $args, $command_info, $error) if $error;
255 0           return _cb($callback, $error, $result);
256 0           }, @$args);
257              
258 0           return;
259              
260 0           SLOWPATH:
261             return $self->_command_slowpath($command, $callback, $args, $command_info);
262              
263 0           FAILFAST:
264             return _cb($callback, "Client-induced failure by throttling mechanism");
265              
266 0           OVERFLOW:
267             return $self->_command_enqueue($command, $callback, $args, $command_info);
268             }
269              
270             sub _command_slowpath {
271 0     0     my ($self, $command, $callback, $args, $command_info)= @_;
272              
273 0           $self->{active_queries}++;
274              
275             series([
276             sub {
277 0     0     my ($next)= @_;
278 0           $self->_connect($next);
279             }, sub {
280 0     0     my ($next)= @_;
281 0           $self->{pool}->get_one_cb($next);
282             }, sub {
283 0     0     my ($next, $connection)= @_;
284 0 0         if ($self->{throttler}->should_fail()) {
285 0           return $next->("Client-induced failure by throttling mechanism");
286             }
287 0           $connection->$command($next, @$args);
288             }
289             ], sub {
290 0     0     my ($error, $result)= @_;
291 0           $self->{throttler}->count($error);
292              
293 0           $self->{active_queries}--;
294 0 0         $self->_command_dequeue if $self->{command_queue}{has_any};
295              
296 0 0         return $self->_command_failed($command, $callback, $args, $command_info, $error) if $error;
297 0           return _cb($callback, $error, $result);
298 0           });
299 0           return;
300             }
301              
302             sub _command_retry {
303 0     0     my ($self, $command, $callback, $args, $command_info)= @_;
304              
305 0           $command_info->{retries}++;
306              
307 0           my $delay= 0.1 * (2 ** $command_info->{retries});
308             $self->{async_io}->timer(sub {
309 0 0   0     if ($self->{active_queries} >= $self->{options}{max_concurrent_queries}) {
310 0           $self->_command_enqueue($command, $callback, $args, $command_info);
311             } else {
312 0           $self->_command_slowpath($command, $callback, $args, $command_info);
313             }
314 0           }, $delay);
315             }
316              
317             sub _command_failed {
318 0     0     my ($self, $command, $callback, $args, $command_info, $error)= @_;
319              
320 0 0         return $callback->($error) unless is_ref($error);
321              
322 0           my $retry_decision;
323 0 0         if ($error->do_retry) {
    0          
    0          
    0          
    0          
324 0           $retry_decision= Cassandra::Client::Policy::Retry::retry;
325             } elsif ($error->is_request_error) {
326 0   0       $retry_decision= $self->{retry_policy}->on_request_error(undef, undef, $error, ($command_info->{retries}||0));
327             } elsif ($error->isa('Cassandra::Client::Error::WriteTimeoutException')) {
328 0   0       $retry_decision= $self->{retry_policy}->on_write_timeout(undef, $error->cl, $error->write_type, $error->blockfor, $error->received, ($command_info->{retries}||0));
329             } elsif ($error->isa('Cassandra::Client::Error::ReadTimeoutException')) {
330 0   0       $retry_decision= $self->{retry_policy}->on_read_timeout(undef, $error->cl, $error->blockfor, $error->received, $error->data_retrieved, ($command_info->{retries}||0));
331             } elsif ($error->isa('Cassandra::Client::Error::UnavailableException')) {
332 0   0       $retry_decision= $self->{retry_policy}->on_unavailable(undef, $error->cl, $error->required, $error->alive, ($command_info->{retries}||0));
333             } else {
334 0           $retry_decision= Cassandra::Client::Policy::Retry::rethrow;
335             }
336              
337 0 0 0       if ($retry_decision && $retry_decision eq 'retry') {
338 0           return $self->_command_retry($command, $callback, $args, $command_info);
339             }
340              
341 0           return $callback->($error);
342             }
343              
344             sub _command_enqueue {
345 0     0     my ($self, $command, $callback, $args, $command_info)= @_;
346 0 0         if (my $error= $self->{command_queue}->enqueue([$command, $callback, $args, $command_info])) {
347 0           return $callback->("Cannot $command: $error");
348             }
349 0           return;
350             }
351              
352             sub _command_dequeue {
353 0     0     my ($self)= @_;
354 0 0         my $item= $self->{command_queue}->dequeue or return;
355 0           $self->_command_slowpath(@$item);
356 0           return;
357             }
358              
359             # Utility functions that wrap query functions
360             sub _each_page {
361 0     0     my ($self, $callback, $query, $params, $attribs, $page_callback)= @_;
362              
363 0 0         my $params_copy= $params ? clone($params) : undef;
364 0 0         my $attribs_copy= $attribs ? clone($attribs) : undef;
365              
366 0           my $done= 0;
367             whilst(
368 0     0     sub { !$done },
369             sub {
370 0     0     my $next= shift;
371              
372             $self->_execute(sub {
373             # Completion handler, with page data (or an error)
374 0           my ($error, $result)= @_;
375 0 0         return $next->($error) if $error;
376              
377 0           my $next_page_id= $result->next_page;
378 0           _cb($page_callback, $result); # Note that page_callback doesn't get an error argument, that's intentional
379              
380 0 0         if ($next_page_id) {
381 0           $attribs_copy->{page}= $next_page_id;
382             } else {
383 0           $done= 1;
384             }
385 0           return $next->();
386 0           }, $query, $params_copy, $attribs_copy);
387             },
388             sub {
389 0     0     my $error= shift;
390 0           return _cb($callback, $error);
391             }
392 0           );
393              
394 0           return;
395             }
396              
397             sub DESTROY {
398 0     0     local $@;
399 0 0         return if in_global_destruction;
400              
401 0           my $self= shift;
402 0 0         if ($self->{connected}) {
403 0           $self->shutdown;
404             }
405             }
406              
407              
408             # Utility functions for callers
409             sub _get_stacktrace {
410             # This gets called a lot. Let's keep it fast.
411              
412 0     0     my $trace= '';
413 0           my ($c, $file, $line)= caller(1);
414 0           $trace .= " $c ($file:$line)\n";
415 0 0         ($c, $file, $line)= caller(2) or goto DONE;
416 0           $trace .= " $c ($file:$line)\n";
417 0 0         ($c, $file, $line)= caller(3) or goto DONE;
418 0           $trace .= " $c ($file:$line)\n";
419              
420 0           DONE:
421             return $trace;
422             }
423              
424             sub _cb {
425 0     0     my $cb= shift;
426             eval {
427 0           &$cb; 1
  0            
428 0 0         } or do {
429 0   0       my $error= $@ || "unknown error";
430 0           warn "Ignoring unhandled exception in callback: $error";
431             };
432              
433 0           return;
434             }
435              
436             sub _mksync { # Translates an asynchronous call into something that looks like Perl
437 0     0     my ($sub)= @_;
438             return sub {
439 0     0     my $self= shift;
440 0           $sub->($self, $self->{async_io}->wait(my $w), @_);
441 0           my ($err, @output)= $w->();
442 0 0         if ($err) { die $err; }
  0            
443 0           return @output;
444 0           };
445             }
446              
447             sub _mkcall { # Basically _mksync, but returns the error instead of dying
448 0     0     my ($sub)= @_;
449             return sub {
450 0     0     my $self= shift;
451 0           $sub->($self, $self->{async_io}->wait(my $w), @_);
452 0           return $w->();
453 0           };
454             }
455              
456             sub _mkpromise {
457 0     0     my ($sub)= @_;
458             return sub {
459 0     0     my $self= shift;
460 0           my $trace= &_get_stacktrace;
461 0           my $deferred= deferred;
462              
463             $sub->($self, sub {
464 0           my ($error, @output)= @_;
465 0 0         if ($error) {
466 0           $deferred->reject("$error\n\nTrace:\n$trace");
467             } else {
468 0           $deferred->resolve(@output);
469             }
470 0           }, @_);
471              
472 0           return $deferred->promise;
473 0           };
474             }
475              
476             sub _mkfuture {
477 0     0     my ($sub)= @_;
478             return sub {
479 0     0     my $self= shift;
480 0           my $trace= &_get_stacktrace;
481 0           $sub->($self, $self->{async_io}->wait(my $w), @_);
482             return sub {
483 0           my ($error, @output)= $w->();
484 0 0         if ($error) { die "$error\n\nTrace:\n$trace"; }
  0            
485 0           return @output;
486 0           };
487             }
488 0           }
489              
490             sub _mkfuture_call {
491 0     0     my ($sub)= @_;
492             return sub {
493 0     0     my $self= shift;
494 0           $sub->($self, $self->{async_io}->wait(my $w), @_);
495 0           return $w;
496             }
497 0           }
498              
499             PUBLIC_METHODS: {
500 1     1   37 no strict 'refs';
  1         3  
  1         188  
501             for (qw/
502             batch
503             connect
504             execute
505             each_page
506             prepare
507             wait_for_schema_agreement
508             /) {
509             *{$_}= _mksync (\&{"_$_"});
510             *{"call_$_"}= _mkcall (\&{"_$_"});
511             *{"async_$_"}= _mkpromise (\&{"_$_"});
512             *{"future_$_"}= _mkfuture (\&{"_$_"});
513             *{"future_call_$_"}= _mkfuture_call (\&{"_$_"});
514             }
515             }
516              
517             1;
518              
519             __END__