File Coverage

blib/lib/Cassandra/Client/Pool.pm
Criterion Covered Total %
statement 17 148 11.4
branch 0 48 0.0
condition 0 16 0.0
subroutine 6 25 24.0
pod 0 14 0.0
total 23 251 9.1


line stmt bran cond sub pod time code
1             package Cassandra::Client::Pool;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::Pool::VERSION = '0.13_004'; # TRIAL
4              
5 1     1   13 $Cassandra::Client::Pool::VERSION = '0.13004';use 5.010;
  1         3  
6 1     1   5 use strict;
  1         5  
  1         17  
7 1     1   4 use warnings;
  1         2  
  1         32  
8              
9 1     1   4 use Scalar::Util 'weaken';
  1         2  
  1         45  
10 1     1   11 use Cassandra::Client::Util;
  1         3  
  1         42  
11 1     1   265 use Cassandra::Client::NetworkStatus;
  1         3  
  1         1024  
12              
13             sub new {
14 0     0 0   my ($class, %args)= @_;
15             my $self= bless {
16             client => $args{client},
17             options => $args{options},
18             metadata => $args{metadata},
19             max_connections => $args{options}{max_connections},
20             async_io => $args{async_io},
21             policy => $args{load_balancing_policy},
22              
23 0           shutdown => 0,
24             pool => {},
25             count => 0,
26             list => [],
27              
28             last_id => 0,
29             id2ip => {},
30              
31             i => 0,
32              
33             connecting => {},
34             wait_connect => [],
35             }, $class;
36 0           weaken($self->{client});
37 0           $self->{network_status}= Cassandra::Client::NetworkStatus->new(pool => $self, async_io => $args{async_io});
38 0           return $self;
39             }
40              
41             sub init {
42 0     0 0   my ($self, $callback, $first_connection)= @_;
43              
44             # This code can be called twice.
45              
46             # If we didn't have a datacenter pinned before, now we do
47 0   0       $self->{policy}{datacenter} ||= $first_connection->{datacenter};
48              
49 0           $self->add($first_connection);
50 0           $self->{policy}->set_connected($first_connection->ip_address);
51              
52             # Master selection, warmup, etc
53             series([
54             sub {
55 0     0     my ($next)= @_;
56 0           $self->{network_status}->init($next);
57             },
58             sub {
59 0     0     my ($next)= @_;
60              
61 0 0         if ($self->{config}{warmup}) {
62 0           $self->connect_if_needed($next);
63             } else {
64 0           $self->connect_if_needed();
65 0           return $next->();
66             }
67             },
68 0           ], $callback);
69             }
70              
71             sub get_one {
72 0     0 0   my ($self)= @_;
73 0 0         return undef unless $self->{count};
74              
75             # Round-robin: pick the next one
76 0           return $self->{list}[$self->{i}= (($self->{i}+1) % $self->{count})];
77             }
78              
79             sub get_one_cb {
80 0     0 0   my ($self, $callback)= @_;
81              
82 0 0         return $callback->(undef, $self->get_one) if $self->{count};
83              
84 0 0         if (!%{$self->{connecting}}) {
  0            
85 0           $self->connect_if_needed;
86             }
87 0 0         if (!%{$self->{connecting}}) {
  0            
88 0           return $callback->("Disconnected: all servers unreachable");
89             }
90              
91 0   0       push @{$self->{wait_connect} ||= []}, {
  0            
92             callback => $callback,
93             attempts => 0,
94             };
95             }
96              
97             sub remove {
98 0     0 0   my ($self, $id)= @_;
99 0 0         if (!$id) {
100             # Probably never got added. Ignore.
101 0           return;
102             }
103              
104 0           my $ipaddress= delete $self->{id2ip}{$id};
105 0 0         if (!$ipaddress) {
106 0           warn 'BUG: Tried to remove an unregistered connection. Probably a bad idea.';
107 0           return;
108             }
109              
110 0           my $connection= delete $self->{pool}{$ipaddress};
111 0 0         if (!$connection) {
112 0           warn 'BUG: Found a registered but unknown connection. This should not happen.';
113 0           return;
114             }
115              
116 0           $self->rebuild;
117              
118 0           $self->{policy}->set_disconnected($ipaddress);
119 0           $self->{network_status}->disconnected($connection->get_pool_id);
120 0           $self->connect_if_needed;
121              
122 0           return;
123             }
124              
125             sub add {
126 0     0 0   my ($self, $connection)= @_;
127              
128 0           my $ipaddress= $connection->ip_address;
129              
130 0 0         if ($self->{pool}{$ipaddress}) {
131 0           warn 'BUG: Duplicate connection for '.$ipaddress.'!';
132             }
133              
134 0           my $id= (++($self->{last_id}));
135 0           $connection->set_pool_id($id);
136 0           $self->{pool}{$ipaddress}= $connection;
137 0           $self->{id2ip}{$id}= $ipaddress;
138              
139 0           $self->rebuild;
140              
141 0           my $waiters= delete $self->{wait_connect};
142 0           $_->{callback}->(undef, $connection) for @$waiters;
143              
144 0     0     $self->{network_status}->select_master(sub{});
145              
146 0           return;
147             }
148              
149             sub rebuild {
150 0     0 0   my ($self)= @_;
151              
152 0           $self->{list}= [ values %{$self->{pool}} ];
  0            
153 0           $self->{count}= 0+ @{$self->{list}};
  0            
154              
155 0           return;
156             }
157              
158             sub shutdown {
159 0     0 0   my ($self)= @_;
160              
161 0           $self->{network_status}->shutdown;
162 0           $self->{shutdown}= 1;
163              
164 0           my @pool= @{$self->{list}};
  0            
165 0           $_->shutdown("Shutting down") for @pool;
166              
167 0           my @connecting= values %{$self->{connecting}};
  0            
168 0           $_->shutdown("Shutting down") for @connecting;
169              
170 0           return;
171             }
172              
173             sub connect_if_needed {
174 0     0 0   my ($self, $callback)= @_;
175              
176 0           my $max_connect= $self->{max_connections} - $self->{count};
177 0 0         return if $max_connect <= 0;
178              
179 0           $max_connect -= keys %{$self->{connecting}};
  0            
180 0 0         return if $max_connect <= 0;
181              
182 0 0         return if $self->{shutdown};
183              
184 0 0         if ($self->{_in_connect}) {
185 0           return;
186             }
187 0           local $self->{_in_connect}= 1;
188              
189 0           my $done= 0;
190 0           my $expect= $max_connect;
191 0           for (1..$max_connect) {
192             $expect-- unless $self->spawn_new_connection(sub {
193 0     0     $done++;
194              
195 0 0         if ($done == $expect) {
196 0 0         $callback->() if $callback;
197             }
198 0 0         });
199             }
200 0 0 0       if ($callback && !$expect) {
201 0           $callback->();
202             }
203             }
204              
205             sub spawn_new_connection {
206 0     0 0   my ($self, $callback)= @_;
207              
208 0           my $host= $self->{policy}->get_next_candidate;
209 0 0         return unless $host;
210              
211             my $connection= Cassandra::Client::Connection->new(
212             client => $self->{client},
213             options => $self->{options},
214             host => $host,
215             async_io => $self->{async_io},
216             metadata => $self->{metadata},
217 0           );
218              
219 0           $self->{connecting}{$host}= $connection;
220 0           $self->{policy}->set_connected($host);
221              
222             $connection->connect(sub {
223 0     0     my ($error)= @_;
224              
225 0           delete $self->{connecting}{$host};
226 0 0         if ($error) {
227 0           $self->{policy}->set_disconnected($host);
228              
229 0 0         if (my $waiters= delete $self->{wait_connect}) {
230 0 0 0       if ($self->{count} && @$waiters) {
231 0           warn 'We have callbacks waiting for a connection while we\'re connected';
232             }
233              
234 0           my $max_conn= $self->{max_connections};
235 0           my $known_node_count= $self->{policy}->known_node_count;
236 0 0         my $max_attempts = ($max_conn < $known_node_count ? $max_conn : $known_node_count) + 1;
237              
238 0           for my $waiter (@$waiters) {
239 0 0 0       if ((++$waiter->{attempts}) >= $max_attempts || !%{$self->{connecting}}) {
  0            
240 0           $waiter->{callback}->("Failed to connect to server: $error");
241             } else {
242 0   0       push @{$self->{wait_connect} ||= []}, $waiter;
  0            
243             }
244             }
245             }
246              
247 0           $self->connect_if_needed;
248             } else {
249 0           $self->add($connection);
250             }
251              
252 0           $callback->($error);
253 0           });
254              
255 0           return 1;
256             }
257              
258             # Events coming from the network
259             sub event_added_node {
260 0     0 0   my ($self, $ipaddress)= @_;
261 0           $self->{network_status}->event_added_node($ipaddress);
262             }
263              
264             sub event_removed_node {
265 0     0 0   my ($self, $ipaddress)= @_;
266 0           $self->{network_status}->event_removed_node($ipaddress);
267              
268 0 0         if (my $conn= $self->{pool}{$ipaddress}) {
269 0           $conn->shutdown("Removed from pool");
270             }
271             }
272              
273             # Events coming from network_status
274             sub on_new_node {
275 0     0 0   my ($self, $node)= @_;
276 0           $self->{policy}->on_new_node($node);
277             }
278              
279             sub on_removed_node {
280 0     0 0   my ($self, $node)= @_;
281 0           $self->{policy}->on_removed_node($node);
282             }
283              
284             1;
285              
286             __END__