File Coverage

blib/lib/Cassandra/Client/Pool.pm
Criterion Covered Total %
statement 17 151 11.2
branch 0 48 0.0
condition 0 16 0.0
subroutine 6 25 24.0
pod 0 14 0.0
total 23 254 9.0


line stmt bran cond sub pod time code
1             package Cassandra::Client::Pool;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::Pool::VERSION = '0.13_007'; # TRIAL
4              
5 1     1   14 $Cassandra::Client::Pool::VERSION = '0.13007';use 5.010;
  1         3  
6 1     1   5 use strict;
  1         2  
  1         16  
7 1     1   4 use warnings;
  1         2  
  1         24  
8              
9 1     1   5 use Scalar::Util 'weaken';
  1         2  
  1         43  
10 1     1   5 use Cassandra::Client::Util;
  1         2  
  1         42  
11 1     1   273 use Cassandra::Client::NetworkStatus;
  1         35  
  1         1070  
12              
13             sub new {
14 0     0 0   my ($class, %args)= @_;
15             my $self= bless {
16             client => $args{client},
17             options => $args{options},
18             metadata => $args{metadata},
19             max_connections => $args{options}{max_connections},
20             async_io => $args{async_io},
21             policy => $args{load_balancing_policy},
22              
23 0           shutdown => 0,
24             pool => {},
25             count => 0,
26             list => [],
27              
28             last_id => 0,
29             id2ip => {},
30              
31             i => 0,
32              
33             connecting => {},
34             wait_connect => [],
35             }, $class;
36 0           weaken($self->{client});
37 0           $self->{network_status}= Cassandra::Client::NetworkStatus->new(pool => $self, async_io => $args{async_io});
38 0           return $self;
39             }
40              
41             sub init {
42 0     0 0   my ($self, $callback, $first_connection)= @_;
43              
44             # This code can be called twice.
45              
46             # If we didn't have a datacenter pinned before, now we do
47 0   0       $self->{policy}{datacenter} ||= $first_connection->{datacenter};
48              
49 0           $self->add($first_connection);
50 0           $self->{policy}->set_connecting($first_connection->ip_address);
51 0           $self->{policy}->set_connected($first_connection->ip_address);
52              
53             # Master selection, warmup, etc
54             series([
55             sub {
56 0     0     my ($next)= @_;
57 0           $self->{network_status}->init($next);
58             },
59             sub {
60 0     0     my ($next)= @_;
61              
62 0 0         if ($self->{config}{warmup}) {
63 0           $self->connect_if_needed($next);
64             } else {
65 0           $self->connect_if_needed();
66 0           return $next->();
67             }
68             },
69 0           ], $callback);
70             }
71              
72             sub get_one {
73 0     0 0   my ($self)= @_;
74 0 0         return undef unless $self->{count};
75              
76             # Round-robin: pick the next one
77 0           return $self->{list}[$self->{i}= (($self->{i}+1) % $self->{count})];
78             }
79              
80             sub get_one_cb {
81 0     0 0   my ($self, $callback)= @_;
82              
83 0 0         return $callback->(undef, $self->get_one) if $self->{count};
84              
85 0 0         if (!%{$self->{connecting}}) {
  0            
86 0           $self->connect_if_needed;
87             }
88 0 0         if (!%{$self->{connecting}}) {
  0            
89 0           return $callback->("Disconnected: all servers unreachable");
90             }
91              
92 0   0       push @{$self->{wait_connect} ||= []}, {
  0            
93             callback => $callback,
94             attempts => 0,
95             };
96             }
97              
98             sub remove {
99 0     0 0   my ($self, $id)= @_;
100 0 0         if (!$id) {
101             # Probably never got added. Ignore.
102 0           return;
103             }
104              
105 0           my $ipaddress= delete $self->{id2ip}{$id};
106 0 0         if (!$ipaddress) {
107 0           warn 'BUG: Tried to remove an unregistered connection. Probably a bad idea.';
108 0           return;
109             }
110              
111 0           my $connection= delete $self->{pool}{$ipaddress};
112 0 0         if (!$connection) {
113 0           warn 'BUG: Found a registered but unknown connection. This should not happen.';
114 0           return;
115             }
116              
117 0           $self->rebuild;
118              
119 0           $self->{policy}->set_disconnected($ipaddress);
120 0           $self->{network_status}->disconnected($connection->get_pool_id);
121 0           $self->connect_if_needed;
122              
123 0           return;
124             }
125              
126             sub add {
127 0     0 0   my ($self, $connection)= @_;
128              
129 0           my $ipaddress= $connection->ip_address;
130              
131 0 0         if ($self->{pool}{$ipaddress}) {
132 0           warn 'BUG: Duplicate connection for '.$ipaddress.'!';
133             }
134              
135 0           my $id= (++($self->{last_id}));
136 0           $connection->set_pool_id($id);
137 0           $self->{pool}{$ipaddress}= $connection;
138 0           $self->{id2ip}{$id}= $ipaddress;
139              
140 0           $self->rebuild;
141              
142 0           my $waiters= delete $self->{wait_connect};
143 0           $_->{callback}->(undef, $connection) for @$waiters;
144              
145 0     0     $self->{network_status}->select_master(sub{});
146              
147 0           return;
148             }
149              
150             sub rebuild {
151 0     0 0   my ($self)= @_;
152              
153 0           $self->{list}= [ values %{$self->{pool}} ];
  0            
154 0           $self->{count}= 0+ @{$self->{list}};
  0            
155              
156 0           return;
157             }
158              
159             sub shutdown {
160 0     0 0   my ($self)= @_;
161              
162 0           $self->{network_status}->shutdown;
163 0           $self->{shutdown}= 1;
164              
165 0           my @pool= @{$self->{list}};
  0            
166 0           $_->shutdown("Shutting down") for @pool;
167              
168 0           my @connecting= values %{$self->{connecting}};
  0            
169 0           $_->shutdown("Shutting down") for @connecting;
170              
171 0           return;
172             }
173              
174             sub connect_if_needed {
175 0     0 0   my ($self, $callback)= @_;
176              
177 0           my $max_connect= $self->{max_connections} - $self->{count};
178 0 0         return if $max_connect <= 0;
179              
180 0           $max_connect -= keys %{$self->{connecting}};
  0            
181 0 0         return if $max_connect <= 0;
182              
183 0 0         return if $self->{shutdown};
184              
185 0 0         if ($self->{_in_connect}) {
186 0           return;
187             }
188 0           local $self->{_in_connect}= 1;
189              
190 0           my $done= 0;
191 0           my $expect= $max_connect;
192 0           for (1..$max_connect) {
193             $expect-- unless $self->spawn_new_connection(sub {
194 0     0     $done++;
195              
196 0 0         if ($done == $expect) {
197 0 0         $callback->() if $callback;
198 0           undef $callback;
199             }
200 0 0         });
201             }
202 0 0 0       if ($callback && !$expect) {
203 0           $callback->();
204             }
205             }
206              
207             sub spawn_new_connection {
208 0     0 0   my ($self, $callback)= @_;
209              
210 0           my $host= $self->{policy}->get_next_candidate;
211 0 0         return unless $host;
212              
213             my $connection= Cassandra::Client::Connection->new(
214             client => $self->{client},
215             options => $self->{options},
216             host => $host,
217             async_io => $self->{async_io},
218             metadata => $self->{metadata},
219 0           );
220              
221 0           $self->{connecting}{$host}= $connection;
222 0           $self->{policy}->set_connecting($host);
223              
224             $connection->connect(sub {
225 0     0     my ($error)= @_;
226              
227 0           delete $self->{connecting}{$host};
228 0 0         if ($error) {
229 0           $self->{policy}->set_disconnected($host);
230              
231 0 0         if (my $waiters= delete $self->{wait_connect}) {
232 0 0 0       if ($self->{count} && @$waiters) {
233 0           warn 'We have callbacks waiting for a connection while we\'re connected';
234             }
235              
236 0           my $max_conn= $self->{max_connections};
237 0           my $known_node_count= $self->{policy}->known_node_count;
238 0 0         my $max_attempts = ($max_conn < $known_node_count ? $max_conn : $known_node_count) + 1;
239              
240 0           for my $waiter (@$waiters) {
241 0 0 0       if ((++$waiter->{attempts}) >= $max_attempts || !%{$self->{connecting}}) {
  0            
242 0           $waiter->{callback}->("Failed to connect to server: $error");
243             } else {
244 0   0       push @{$self->{wait_connect} ||= []}, $waiter;
  0            
245             }
246             }
247             }
248              
249 0           $self->connect_if_needed;
250             } else {
251 0           $self->{policy}->set_connected($host);
252              
253 0           $self->add($connection);
254             }
255              
256 0           $callback->($error);
257 0           });
258              
259 0           return 1;
260             }
261              
262             # Events coming from the network
263             sub event_added_node {
264 0     0 0   my ($self, $ipaddress)= @_;
265 0           $self->{network_status}->event_added_node($ipaddress);
266             }
267              
268             sub event_removed_node {
269 0     0 0   my ($self, $ipaddress)= @_;
270 0           $self->{network_status}->event_removed_node($ipaddress);
271              
272 0 0         if (my $conn= $self->{pool}{$ipaddress}) {
273 0           $conn->shutdown("Removed from pool");
274             }
275             }
276              
277             # Events coming from network_status
278             sub on_new_node {
279 0     0 0   my ($self, $node)= @_;
280 0           $self->{policy}->on_new_node($node);
281             }
282              
283             sub on_removed_node {
284 0     0 0   my ($self, $node)= @_;
285 0           $self->{policy}->on_removed_node($node);
286             }
287              
288             1;
289              
290             __END__