| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package Cassandra::Client::Connection; |
|
2
|
|
|
|
|
|
|
our $AUTHORITY = 'cpan:TVDW'; |
|
3
|
|
|
|
|
|
|
$Cassandra::Client::Connection::VERSION = '0.13_006'; # TRIAL |
|
4
|
|
|
|
|
|
|
|
|
5
|
1
|
|
|
1
|
|
21
|
$Cassandra::Client::Connection::VERSION = '0.13006';use 5.010; |
|
|
1
|
|
|
|
|
5
|
|
|
6
|
1
|
|
|
1
|
|
8
|
use strict; |
|
|
1
|
|
|
|
|
3
|
|
|
|
1
|
|
|
|
|
31
|
|
|
7
|
1
|
|
|
1
|
|
8
|
use warnings; |
|
|
1
|
|
|
|
|
4
|
|
|
|
1
|
|
|
|
|
40
|
|
|
8
|
1
|
|
|
1
|
|
8
|
use vars qw/$BUFFER/; |
|
|
1
|
|
|
|
|
3
|
|
|
|
1
|
|
|
|
|
61
|
|
|
9
|
|
|
|
|
|
|
|
|
10
|
1
|
|
|
1
|
|
8
|
use Ref::Util qw/is_blessed_ref is_plain_arrayref/; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
67
|
|
|
11
|
1
|
|
|
1
|
|
404
|
use IO::Socket::INET; |
|
|
1
|
|
|
|
|
18926
|
|
|
|
1
|
|
|
|
|
6
|
|
|
12
|
1
|
|
|
1
|
|
698
|
use IO::Socket::INET6; |
|
|
1
|
|
|
|
|
3517
|
|
|
|
1
|
|
|
|
|
6
|
|
|
13
|
1
|
|
|
1
|
|
648
|
use Errno qw/EAGAIN/; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
88
|
|
|
14
|
1
|
|
|
1
|
|
5
|
use Socket qw/SOL_SOCKET IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY/; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
51
|
|
|
15
|
1
|
|
|
1
|
|
5
|
use Scalar::Util qw/weaken/; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
37
|
|
|
16
|
1
|
|
|
1
|
|
423
|
use Net::SSLeay qw/ERROR_WANT_READ ERROR_WANT_WRITE ERROR_NONE/; |
|
|
1
|
|
|
|
|
5561
|
|
|
|
1
|
|
|
|
|
343
|
|
|
17
|
|
|
|
|
|
|
|
|
18
|
1
|
|
|
1
|
|
306
|
use Cassandra::Client::Util; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
60
|
|
|
19
|
1
|
|
|
|
|
379
|
use Cassandra::Client::Protocol qw/ |
|
20
|
|
|
|
|
|
|
:constants |
|
21
|
|
|
|
|
|
|
%consistency_lookup |
|
22
|
|
|
|
|
|
|
%batch_type_lookup |
|
23
|
|
|
|
|
|
|
pack_bytes |
|
24
|
|
|
|
|
|
|
pack_longstring |
|
25
|
|
|
|
|
|
|
pack_queryparameters |
|
26
|
|
|
|
|
|
|
pack_shortbytes |
|
27
|
|
|
|
|
|
|
pack_stringmap |
|
28
|
|
|
|
|
|
|
pack_stringlist |
|
29
|
|
|
|
|
|
|
unpack_errordata |
|
30
|
|
|
|
|
|
|
unpack_inet |
|
31
|
|
|
|
|
|
|
unpack_int |
|
32
|
|
|
|
|
|
|
unpack_metadata |
|
33
|
|
|
|
|
|
|
unpack_shortbytes |
|
34
|
|
|
|
|
|
|
unpack_string |
|
35
|
|
|
|
|
|
|
unpack_stringmultimap |
|
36
|
1
|
|
|
1
|
|
278
|
/; |
|
|
1
|
|
|
|
|
2
|
|
|
37
|
1
|
|
|
1
|
|
7
|
use Cassandra::Client::Error::Base; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
18
|
|
|
38
|
1
|
|
|
1
|
|
292
|
use Cassandra::Client::ResultSet; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
25
|
|
|
39
|
1
|
|
|
1
|
|
251
|
use Cassandra::Client::TLSHandling; |
|
|
1
|
|
|
|
|
4
|
|
|
|
1
|
|
|
|
|
48
|
|
|
40
|
|
|
|
|
|
|
|
|
41
|
1
|
|
|
1
|
|
7
|
use constant STREAM_ID_LIMIT => 32768; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
4535
|
|
|
42
|
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
# Populated at BEGIN{} time |
|
44
|
|
|
|
|
|
|
my @compression_preference; |
|
45
|
|
|
|
|
|
|
my %available_compression; |
|
46
|
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
# Constructor: builds a connection object in its initial, not-yet-connected
# state.
#
# Named arguments read from the argument list:
#   client, async_io, options, host, metadata
# `metadata` must respond to ->prepare_cache; whatever it returns is stored as
# the connection's own `prepare_cache`. `request_timeout` is copied from
# $args{options}{request_timeout}.
#
# Returns the blessed object. The `async_io` and `client` references it holds
# are weakened before it is returned.
sub new {
    my $class= shift;
    my %args= @_;

    # Each connection gets a fresh scalar; only a reference to it is stored.
    my $read_buffer= '';

    my %state= (
        client          => $args{client},
        async_io        => $args{async_io},
        pool_id         => undef,

        options         => $args{options},
        request_timeout => $args{options}{request_timeout},
        host            => $args{host},
        metadata        => $args{metadata},
        prepare_cache   => $args{metadata}->prepare_cache,
        last_stream_id  => 0,
        pending_streams => {},
        in_prepare      => {},

        decompress_func => undef,
        compress_func   => undef,
        connected       => 0,
        connecting      => undef,
        socket          => undef,
        fileno          => undef,
        pending_write   => undef,
        shutdown        => 0,
        read_buffer     => \$read_buffer,

        tls             => undef,
        tls_want_write  => undef,
    );

    my $self= bless \%state, $class;
    weaken($self->{$_}) for qw/async_io client/;

    return $self;
}
|
81
|
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
# Queries system.local through execute_prepared and hands $callback a hashref
# describing the rows that came back.
#
# The result is keyed by the row's broadcast_address (column 3 of the select);
# the same value is also stored under both `peer` and `preferred_ip`, so the
# entry has the same keys as the ones produced by get_peers_status.
#
# $callback is invoked via series() as ($error) or (undef, \%status).
# Returns nothing.
sub get_local_status {
    my ($self, $callback)= @_;

    my $run_query= sub {
        my ($next)= @_;
        $self->execute_prepared($next, \"select key, data_center, host_id, broadcast_address, rack, release_version, tokens, schema_version from system.local");
    };

    my $index_rows= sub {
        my ($next, $result)= @_;

        my %local_status;
        for my $row (@{$result->rows}) {
            my %entry= (
                peer            => $row->[3],
                data_center     => $row->[1],
                host_id         => $row->[2],
                preferred_ip    => $row->[3],
                rack            => $row->[4],
                release_version => $row->[5],
                tokens          => $row->[6],
                schema_version  => $row->[7],
            );
            $local_status{$row->[3]}= \%entry;
        }

        $next->(undef, \%local_status);
    };

    series([ $run_query, $index_rows ], $callback);

    return;
}
|
110
|
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
# Queries system.peers through execute_prepared and hands $callback a hashref
# with one entry per returned row, keyed by the row's `peer` column.
#
# Each entry carries: peer, data_center, host_id, preferred_ip, rack,
# release_version, tokens and schema_version, taken from the select's columns
# in that order.
#
# $callback is invoked via series() as ($error) or (undef, \%status).
# Returns nothing.
sub get_peers_status {
    my ($self, $callback)= @_;

    my $run_query= sub {
        my ($next)= @_;
        $self->execute_prepared($next, \"select peer, data_center, host_id, preferred_ip, rack, release_version, tokens, schema_version from system.peers");
    };

    my $index_rows= sub {
        my ($next, $result)= @_;

        my %network_status;
        for my $row (@{$result->rows}) {
            my %entry= (
                peer            => $row->[0],
                data_center     => $row->[1],
                host_id         => $row->[2],
                preferred_ip    => $row->[3],
                rack            => $row->[4],
                release_version => $row->[5],
                tokens          => $row->[6],
                schema_version  => $row->[7],
            );
            $network_status{$row->[0]}= \%entry;
        }

        $next->(undef, \%network_status);
    };

    series([ $run_query, $index_rows ], $callback);

    return;
}
|
139
|
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
# Runs get_peers_status and get_local_status through parallel() and merges the
# two hashrefs into one.
#
# On error $callback receives ($error). Otherwise it receives
# (undef, \%merged); where both results share a key, the entry from
# get_local_status is the one that is kept.
#
# Returns whatever parallel() returns.
sub get_network_status {
    my ($self, $callback)= @_;

    my $fetch_peers= sub {
        my ($next)= @_;
        $self->get_peers_status($next);
    };
    my $fetch_local= sub {
        my ($next)= @_;
        $self->get_local_status($next);
    };

    my $merge= sub {
        my ($error, $peers, $local)= @_;
        return $callback->($error) if $error;

        # Local entries are listed last so they override peer entries.
        my %combined= (%$peers, %$local);
        return $callback->(undef, \%combined);
    };

    return parallel([ $fetch_peers, $fetch_local ], $merge);
}
|
158
|
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
# Sends an OPCODE_REGISTER request whose body lists the TOPOLOGY_CHANGE and
# STATUS_CHANGE event types. $callback is passed straight to request().
# Returns nothing.
sub register_events {
    my ($self, $callback)= @_;

    my @event_types= ('TOPOLOGY_CHANGE', 'STATUS_CHANGE');
    $self->request($callback, OPCODE_REGISTER, pack_stringlist(\@event_types));

    return;
}
|
169
|
|
|
|
|
|
|
|
|
170
|
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
###### QUERY CODE |
|
172
|
|
|
|
|
|
|
# Executes a query using its entry in the prepare cache.
#
# Arguments:
#   $callback   - invoked with ($error) on failure, or with whatever
#                 decode_result passes on success
#   $queryref   - reference to the query string
#   $parameters - optional bind values, handed to the prepared encoder
#   $attr       - optional hashref; `consistency`, `page_size` and `page` are
#                 read from it
#   $exec_info  - passed through unchanged to prepare_and_try_execute_again
#
# Note: parameters is retained until the query is complete. It must not be
# changed; clone if needed. Same for attr. Note that external callers
# automatically have their arguments cloned.
#
# If the query is not in the prepare cache, or the server answers with error
# code 0x2500, the work is delegated to prepare_and_try_execute_again.
sub execute_prepared {
    my ($self, $callback, $queryref, $parameters, $attr, $exec_info)= @_;

    my $prepared= $self->{prepare_cache}{$$queryref};
    unless ($prepared) {
        return $self->prepare_and_try_execute_again($callback, $queryref, $parameters, $attr, $exec_info);
    }

    # Once a decoder is cached, the request is sent with the skip flag set.
    my $skip_metadata= !!$prepared->{decoder};

    my $row;
    if ($parameters) {
        my $encoded_ok= eval {
            $row= $prepared->{encoder}->encode($parameters);
            1;
        };
        if (!$encoded_ok) {
            my $error= $@ || "??";
            return $callback->("Failed to encode row to native protocol: $error");
        }
    }

    my $consistency= $consistency_lookup{$attr->{consistency} || 'one'};
    return $callback->("Invalid consistency level specified: $attr->{consistency}")
        unless defined $consistency;

    # A page size of zero (or none configured at all) is sent as undef.
    my $requested_page_size= $attr->{page_size} || $self->{options}{max_page_size} || 0;
    my $page_size= (0+$requested_page_size) || undef;
    my $paging_state= $attr->{page} || undef;

    my $execute_body= pack_shortbytes($prepared->{id})
        . pack_queryparameters($consistency, $skip_metadata, $page_size, $paging_state, undef, $row);

    my $on_completion= sub {
        # The body stays in $_[2] and is never copied: decode_result consumes
        # it in place, and this handler is assumed to own it.
        my ($err, $code)= @_;

        if ($err) {
            my $needs_prepare= is_blessed_ref($err) && $err->code == 0x2500;
            return $callback->($err) unless $needs_prepare;
            return $self->prepare_and_try_execute_again($callback, $queryref, $parameters, $attr, $exec_info);
        }

        unless ($code == OPCODE_RESULT) {
            # This shouldn't ever happen...
            my $failure= Cassandra::Client::Error::Base->new(
                message       => "Expected a RESULT frame but got something else; considering the query failed",
                request_error => 1,
            );
            return $callback->($failure);
        }

        $self->decode_result($callback, $prepared, $_[2]);
    };

    $self->request($on_completion, OPCODE_EXECUTE, $execute_body);

    return;
}
|
229
|
|
|
|
|
|
|
|
|
230
|
|
|
|
|
|
|
# Prepares the query behind $queryref and then calls execute_prepared again
# with the same arguments.
#
# $exec_info->{_prepared_and_tried_again} is incremented on every call; if it
# was already set, $callback receives an error instead of a second attempt.
# Returns nothing on the path that starts a prepare.
sub prepare_and_try_execute_again {
    my ($self, $callback, $queryref, $parameters, $attr, $exec_info)= @_;

    my $already_retried= $exec_info->{_prepared_and_tried_again}++;
    return $callback->("Query failed because it seems to be missing from the server's prepared statement cache")
        if $already_retried;

    my $prepare_step= sub {
        my ($next)= @_;
        $self->prepare($next, $$queryref);
    };

    my $after_prepare= sub {
        return $callback->($_[0]) if $_[0];

        # We're recursing, so let's make sure we avoid the infinite loop
        if (!$self->{prepare_cache}{$$queryref}) {
            return $callback->("Internal error: expected query to be prepared but it was not");
        }

        return $self->execute_prepared($callback, $queryref, $parameters, $attr, $exec_info);
    };

    series([ $prepare_step ], $after_prepare);
    return;
}
|
254
|
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
# Executes several prepared statements as one BATCH request.
#
# Arguments:
#   $callback  - invoked with ($error) on failure, or with whatever
#                decode_result passes on success
#   $queries   - arrayref of [ $query_string, \@parameters ] entries
#   $attribs   - optional hashref; `batch_type` and `consistency` are read
#   $exec_info - passed through unchanged to prepare_and_try_batch_again
#
# Like execute_prepared, assumes ownership of $queries and $attribs.
#
# If any query is missing from the prepare cache, or the server answers with
# error code 0x2500, the work is delegated to prepare_and_try_batch_again.
# Every validation or encoding problem is reported through $callback.
sub execute_batch {
    my ($self, $callback, $queries, $attribs, $exec_info)= @_;

    if (!is_plain_arrayref($queries)) {
        return $callback->("execute_batch: queries argument must be an array of arrays");
    }

    my @prepared;
    for my $query (@$queries) {
        if (!is_plain_arrayref($query)) {
            return $callback->("execute_batch: entries in query argument must be arrayrefs");
        }
        if (!$query->[0]) {
            return $callback->("Empty or no query given, cannot execute as part of a batch");
        }
        if ($query->[1] && !is_plain_arrayref($query->[1])) {
            return $callback->("Query parameters to batch() must be given as an arrayref");
        }

        if (my $prep= $self->{prepare_cache}{$query->[0]}) {
            push @prepared, [ $prep, $query->[1] ];
        } else {
            return $self->prepare_and_try_batch_again($callback, $queries, $attribs, $exec_info);
        }
    }

    my $batch_type= 0;
    if ($attribs->{batch_type}) {
        $batch_type= $batch_type_lookup{$attribs->{batch_type}};
        if (!defined $batch_type) {
            return $callback->("Unknown batch_type: <$attribs->{batch_type}>");
        }
    }

    my $consistency= $consistency_lookup{$attribs->{consistency} || 'one'};
    if (!defined $consistency) {
        return $callback->("Invalid consistency level specified: $attribs->{consistency}");
    }

    # Frame layout as built here: batch type (1 byte), statement count
    # (2 bytes), then for each statement a byte of 1, its id and encoded row.
    my $batch_frame= pack('Cn', $batch_type, (0+@prepared));
    for my $prep (@prepared) {
        # Encoding can die on bad parameters. Report that through $callback,
        # as execute_prepared does, rather than dying out of this call.
        my $encoded_row;
        eval {
            $encoded_row= $prep->[0]{encoder}->encode($prep->[1]);
            1;
        } or do {
            my $error= $@ || "??";
            return $callback->("Failed to encode row to native protocol: $error");
        };

        $batch_frame .= pack('C', 1).pack_shortbytes($prep->[0]{id}).$encoded_row;
    }
    $batch_frame .= pack('nC', $consistency, 0);

    my $on_completion= sub {
        # my ($body)= $_[2]; (not copying, because performance. assuming ownership)
        my ($err, $code)= @_;

        if ($err) {
            if (is_blessed_ref($err) && $err->code == 0x2500) {
                return $self->prepare_and_try_batch_again($callback, $queries, $attribs, $exec_info);
            }
            return $callback->($err);
        }

        if ($code != OPCODE_RESULT) {
            # This shouldn't ever happen...
            return $callback->(Cassandra::Client::Error::Base->new(
                message         => "Expected a RESULT frame but got something else; considering the batch failed",
                request_error   => 1,
            ));
        }

        $self->decode_result($callback, undef, $_[2]);
    };

    $self->request($on_completion, OPCODE_BATCH, $batch_frame);

    return;
}
|
328
|
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
# Prepares every distinct query string in $queries (through parallel()) and
# then calls execute_batch again with the same arguments.
#
# $exec_info->{_prepared_and_tried_again} is incremented on every call; if it
# was already set, $callback receives an error instead of a second attempt.
# Returns nothing on the path that starts the prepares.
sub prepare_and_try_batch_again {
    my ($self, $callback, $queries, $attribs, $exec_info)= @_;

    my $already_retried= $exec_info->{_prepared_and_tried_again}++;
    return $callback->("Batch failed because one or more queries seem to be missing from the server's prepared statement cache")
        if $already_retried;

    # Collapse duplicates so each query string is prepared only once.
    my %to_be_prepared;
    for my $entry (@$queries) {
        $to_be_prepared{$entry->[0]}= 1;
    }

    my @prepare_steps;
    for my $query (keys %to_be_prepared) {
        push @prepare_steps, sub {
            my ($next)= @_;
            $self->prepare($next, $query);
        };
    }

    my $after_prepare= sub {
        return $callback->($_[0]) if $_[0];

        return $self->execute_batch($callback, $queries, $attribs, $exec_info);
    };

    parallel(\@prepare_steps, $after_prepare);
    return;
}
|
351
|
|
|
|
|
|
|
|
|
352
|
|
|
|
|
|
|
# Prepares $query on the server and records the result via
# $self->{metadata}->add_prepared.
#
# Concurrent calls for the same query string are coalesced: while a prepare is
# in flight, later callbacks are queued in $self->{in_prepare}{$query}, and all
# of them are invoked with the same ($error) once it finishes.
#
# Returns nothing.
sub prepare {
    my ($self, $callback, $query)= @_;

    # Already being prepared: just join the waiting list.
    if (exists $self->{in_prepare}{$query}) {
        push @{$self->{in_prepare}{$query}}, $callback;
        return;
    }

    $self->{in_prepare}{$query}= [ $callback ];

    my $send_prepare= sub {
        my ($next)= @_;
        my $req= pack_longstring($query);
        $self->request($next, OPCODE_PREPARE, $req);
    };

    my $handle_response= sub {
        my ($next, $code, $body)= @_;

        return $next->("Got unexpected failure while trying to prepare")
            unless $code == OPCODE_RESULT;

        my $result_type= unpack_int($body);
        return $next->("Unexpected response from server while preparing")
            unless $result_type == RESULT_PREPARED;

        my $id= unpack_shortbytes($body);

        # unpack_metadata is called twice on the same $body: first for the
        # encoder, then for the decoder, so the order must not change.
        my ($encoder, $decoder);
        unless (eval { ($encoder)= unpack_metadata($body); 1 }) {
            return $next->("Unable to unpack query metadata: $@");
        }
        unless (eval { ($decoder)= unpack_metadata($body); 1 }) {
            return $next->("Unable to unpack query result metadata: $@");
        }

        $self->{metadata}->add_prepared($query, $id, $decoder, $encoder);
        return $next->();
    };

    my $notify_waiters= sub {
        my ($error)= @_;
        my $waiters= delete $self->{in_prepare}{$query};
        die "BUG" unless $waiters;
        for my $waiter (@$waiters) {
            $waiter->($error);
        }
    };

    series([ $send_prepare, $handle_response ], $notify_waiters);

    return;
}
|
402
|
|
|
|
|
|
|
|
|
403
|
|
|
|
|
|
|
# Decodes the body of a RESULT frame and reports the outcome to $callback.
#
# Arguments: ($self, $callback, $prepared, $body). The body is deliberately
# not copied into a lexical: it is read and consumed in place through $_[3],
# so the caller's buffer is modified.
#
# Outcome per result type:
#   RESULT_ROWS          - $callback->(undef, $resultset); the decoder cached
#                          in $prepared wins over the one found in the body
#   RESULT_VOID          - $callback->()
#   RESULT_SET_KEYSPACE  - $callback->()
#   RESULT_SCHEMA_CHANGE - $callback->() after wait_for_schema_agreement
#   anything else        - $callback->($error_string)
sub decode_result {
    my ($self, $callback, $prepared)= @_; # $_[3]=$body

    # The first four bytes are removed from the body and read as a
    # big-endian signed 32-bit integer.
    my $type_bytes= substr($_[3], 0, 4, '');
    my $result_type= unpack('l>', $type_bytes);

    if ($result_type == RESULT_ROWS) { # Rows
        my ($paging_state, $decoder);
        unless (eval { ($decoder, $paging_state)= unpack_metadata($_[3]); 1 }) {
            return $callback->("Unable to unpack query metadata: $@");
        }
        $decoder= $prepared->{decoder} || $decoder;

        my $resultset= Cassandra::Client::ResultSet->new(
            \$_[3],
            $decoder,
            $paging_state,
        );
        $callback->(undef, $resultset);
        return;
    }

    if ($result_type == RESULT_VOID) { # Void
        return $callback->();
    }

    if ($result_type == RESULT_SET_KEYSPACE) { # Set_keyspace
        # The keyspace name is unpacked but not used any further.
        my $new_keyspace= unpack_string($_[3]);
        return $callback->();
    }

    if ($result_type == RESULT_SCHEMA_CHANGE) { # Schema change
        my $after_agreement= sub {
            # We may be passed an error. Ignore it, our query succeeded
            $callback->();
        };
        return $self->wait_for_schema_agreement($after_agreement);
    }

    return $callback->("Query executed successfully but got an unexpected response type");
}
|
438
|
|
|
|
|
|
|
|
|
439
|
|
|
|
|
|
|
# Polls get_network_status until every node reports the same schema_version.
#
# Each round first waits $wait_delay (0.5) on the async_io timer, then fetches
# the network status. The loop ends successfully once at most one distinct
# schema_version is seen. If versions still differ after $max_wait (5) has
# accumulated, $callback receives a timeout error. Errors from the status
# query are passed to $callback as well.
#
# Returns nothing.
sub wait_for_schema_agreement {
    my ($self, $callback)= @_;

    my $wait_delay= 0.5;
    my $max_wait= 5;
    my $waited= 0;
    my $done;

    my $keep_going= sub { !$done };

    my $one_round= sub {
        my ($whilst_next)= @_;

        my $sleep= sub {
            my ($next)= @_;
            $self->{async_io}->timer($next, $wait_delay);
        };

        my $poll= sub {
            my ($next)= @_;
            $waited += $wait_delay;
            $self->get_network_status($next);
        };

        my $evaluate= sub {
            my ($error, $network_status)= @_;
            return $whilst_next->($error) if $error;

            # Collect the distinct schema versions across all nodes.
            my %versions;
            for my $node (values %$network_status) {
                $versions{$node->{schema_version}}= 1;
            }

            my $distinct= keys %versions;
            if ($distinct <= 1) {
                $done= 1;
            }
            elsif ($waited >= $max_wait) {
                return $whilst_next->("wait_for_schema_agreement timed out after $waited seconds");
            }

            return $whilst_next->();
        };

        series([ $sleep, $poll ], $evaluate);
    };

    whilst($keep_going, $one_round, $callback);

    return;
}
|
483
|
|
|
|
|
|
|
|
|
484
|
|
|
|
|
|
|
|
|
485
|
|
|
|
|
|
|
|
|
486
|
|
|
|
|
|
|
###### PROTOCOL CODE |
|
487
|
|
|
|
|
|
|
# Compares two dotted version strings component by component.
# Components that are both all digits compare numerically, so "3.10.0" ranks
# above "3.4.4"; anything else falls back to string comparison. A missing
# component ranks below any present one. Returns -1, 0 or 1 like <=> and cmp.
sub _compare_cql_versions {
    my ($left, $right)= @_;

    my @left_parts=  split /\./, $left;
    my @right_parts= split /\./, $right;

    while (@left_parts || @right_parts) {
        my $l= shift(@left_parts)  // '';
        my $r= shift(@right_parts) // '';

        my $order= ($l =~ /\A[0-9]+\z/ && $r =~ /\A[0-9]+\z/)
            ? ($l <=> $r)
            : ($l cmp $r);
        return $order if $order;
    }

    return 0;
}

# Performs the connection handshake as a series of steps:
#   1. send OPTIONS
#   2. read SUPPORTED, choose a CQL version and a compression method, send
#      STARTUP, then enable the chosen compression
#   3. handle READY or AUTHENTICATE (the latter delegates to authenticate)
#   4. issue `use "<keyspace>"` if options.keyspace is set
#   5. fetch the local status if the node's ipaddress is not yet known
#   6. store ipaddress and datacenter from that status
#
# $callback is invoked via series() with ($error) on failure.
# Returns nothing.
sub handshake {
    my ($self, $callback)= @_;
    series([
        sub { # Send the OPCODE_OPTIONS
            my ($next)= @_;
            $self->request($next, OPCODE_OPTIONS, '');
        },
        sub { # The server hopefully just told us what it supports, let's respond with a STARTUP message
            my ($next, $response_code, $body)= @_;
            if ($response_code != OPCODE_SUPPORTED) {
                return $next->("Server returned an unexpected handshake");
            }

            my $map= unpack_stringmultimap($body);

            unless ($map->{CQL_VERSION} && $map->{COMPRESSION}) {
                return $next->("Server did not return compression and cql version information");
            }

            my $selected_cql_version= $self->{options}{cql_version};
            if (!$selected_cql_version) {
                # Pick the highest advertised version. A plain string sort
                # would rank "3.4.9" above "3.4.10".
                ($selected_cql_version)= sort { _compare_cql_versions($b, $a) } @{$map->{CQL_VERSION}};
            }

            my %ss_compression= map { $_, 1 } @{$map->{COMPRESSION}};
            my $selected_compression= $self->{options}{compression};
            if (!$selected_compression) {
                # No explicit choice: take the first preferred method that
                # both the server and this client support.
                for (@compression_preference) {
                    if ($ss_compression{$_} && $available_compression{$_}) {
                        $selected_compression= $_;
                        last;
                    }
                }
            }
            $selected_compression= undef if $selected_compression && $selected_compression eq 'none';

            if ($selected_compression) {
                if (!$ss_compression{$selected_compression}) {
                    return $next->("Server did not support requested compression method <$selected_compression>");
                }
                if (!$available_compression{$selected_compression}) {
                    return $next->("Requested compression method <$selected_compression> is supported by the server but not by us");
                }
            }

            my $request_body= pack_stringmap({
                CQL_VERSION => $selected_cql_version,
                ($selected_compression ? (COMPRESSION => $selected_compression) : ()),
            });

            $self->request($next, OPCODE_STARTUP, $request_body);

            # This needs to happen after we send the STARTUP message
            $self->setup_compression($selected_compression);
        },
        sub { # By now we should know whether we need to authenticate
            my ($next, $response_code, $body)= @_;
            if ($response_code == OPCODE_READY) {
                return $next->(undef, $body); # Pass it along
            }

            if ($response_code == OPCODE_AUTHENTICATE) {
                return $self->authenticate($next, unpack_string($body));
            }

            return $next->("Unexpected response from the server");
        },
        sub { # Switch to the configured keyspace, if there is one
            my ($next)= @_;
            if ($self->{options}{keyspace}) {
                return $self->execute_prepared($next, \('use "'.$self->{options}{keyspace}.'"'));
            }
            return $next->();
        },
        sub { # Look up our own node details unless the address is already known
            my ($next)= @_;
            if (!$self->{ipaddress}) {
                return $self->get_local_status($next);
            }
            return $next->();
        },
        sub { # Record the node's address and datacenter
            my ($next, $status)= @_;
            if ($status) {
                # Only one entry of the status hash is used.
                my ($local)= values %$status;
                $self->{ipaddress}= $local->{peer};
                $self->{datacenter}= $local->{data_center};
            }
            if (!$self->{ipaddress}) {
                return $next->("Unable to determine node's IP address");
            }
            return $next->();
        }
    ], $callback);

    return;
}
|
584
|
|
|
|
|
|
|
|
|
585
|
|
|
|
|
|
|
# Answers the server's authentication challenge with the configured
# username and password.
#
# Arguments:
#   $callback      - invoked with no arguments on success, or ($error)
#   $authenticator - the authenticator name sent by the server; only used in
#                    the error message
#
# The credentials come from options.username / options.password. Both are
# copied, stringified and UTF-8 encoded before being sent as
# "\0<user>\0<pass>" in an AUTH_RESPONSE request. If either one is undefined
# or empty, $callback receives an error and nothing is sent.
#
# Returns nothing on the path that sends the request.
sub authenticate {
    my ($self, $callback, $authenticator)= @_;

    # Stringify copies so the options hash is never modified. Undefined
    # values become '' so that no "uninitialized" warning is emitted.
    my ($user, $pass)= map { defined $_ ? "$_" : '' }
        @{$self->{options}}{qw/username password/};
    utf8::encode($user) if utf8::is_utf8($user);
    utf8::encode($pass) if utf8::is_utf8($pass);

    # Test the length rather than truthiness so that "0" is a valid value.
    if (!length($user) || !length($pass)) {
        return $callback->("Server expected authentication using <$authenticator> but no credentials were set");
    }

    series([
        sub {
            my ($next)= @_;
            my $auth_body= pack_bytes("\0$user\0$pass");
            $self->request($next, OPCODE_AUTH_RESPONSE, $auth_body);
        },
        sub {
            my ($next, $code, $body)= @_;
            if ($code == OPCODE_AUTH_SUCCESS) {
                $next->();
            } else {
                $next->("Failed to authenticate: unknown error");
            }
        },
    ], $callback);

    return;
}
|
615
|
|
|
|
|
|
|
|
|
616
|
|
|
|
|
|
|
# Dispatches a server event to the client object.
#
# $eventdata starts with the event type string. For TOPOLOGY_CHANGE and
# STATUS_CHANGE a change string and an inet address follow; these are passed
# to the client's _handle_topology_change / _handle_status_change. Any other
# type only produces a warning.
sub handle_event {
    my ($self, $eventdata)= @_;

    my $type= unpack_string($eventdata);

    my $is_topology= $type eq 'TOPOLOGY_CHANGE';
    my $is_status= !$is_topology && $type eq 'STATUS_CHANGE';

    unless ($is_topology || $is_status) {
        return warn 'Received unknown event type: '.$type;
    }

    # Both event kinds have the same layout: change string, then address.
    my ($change, $ipaddress)= (unpack_string($eventdata), unpack_inet($eventdata));

    my $handler= $is_topology ? '_handle_topology_change' : '_handle_status_change';
    return $self->{client}->$handler($change, $ipaddress);
}
|
631
|
|
|
|
|
|
|
|
|
632
|
|
|
|
|
|
|
# Accessor: returns the connection's stored pool_id (undef until one is set).
sub get_pool_id {
    my ($self)= @_;
    return $self->{pool_id};
}
|
635
|
|
|
|
|
|
|
|
|
636
|
|
|
|
|
|
|
# Mutator: stores $pool_id on the connection and returns the stored value.
sub set_pool_id {
    my ($self, $pool_id)= @_;
    return $self->{pool_id}= $pool_id;
}
|
639
|
|
|
|
|
|
|
|
|
640
|
|
|
|
|
|
|
# Accessor: returns the value stored under the connection's `ipaddress` key
# (undef if none has been stored).
sub ip_address {
    my ($self)= @_;
    return $self->{ipaddress};
}
|
643
|
|
|
|
|
|
|
|
|
644
|
|
|
|
|
|
|
|
|
645
|
|
|
|
|
|
|
|
|
646
|
|
|
|
|
|
|
####### IO LOGIC |
|
647
|
|
|
|
|
|
|
sub connect { |
|
648
|
0
|
|
|
0
|
0
|
|
my ($self, $callback)= @_; |
|
649
|
0
|
0
|
|
|
|
|
return $callback->() if $self->{connected}; |
|
650
|
|
|
|
|
|
|
|
|
651
|
0
|
0
|
|
|
|
|
if ($self->{connecting}++) { |
|
652
|
0
|
|
|
|
|
|
warn "BUG: Calling connect twice?"; |
|
653
|
0
|
|
|
|
|
|
return $callback->("Internal bug: called connect twice."); |
|
654
|
0
|
|
|
|
|
|
return; |
|
655
|
|
|
|
|
|
|
} |
|
656
|
|
|
|
|
|
|
|
|
657
|
0
|
0
|
|
|
|
|
if ($self->{options}{tls}) { |
|
658
|
|
|
|
|
|
|
eval { |
|
659
|
0
|
|
|
|
|
|
$self->{tls}= $self->{client}{tls}->new_conn; |
|
660
|
0
|
|
|
|
|
|
1; |
|
661
|
0
|
0
|
|
|
|
|
} or do { |
|
662
|
0
|
|
0
|
|
|
|
my $error= $@ || "unknown TLS error"; |
|
663
|
0
|
|
|
|
|
|
return $callback->($error); |
|
664
|
|
|
|
|
|
|
}; |
|
665
|
|
|
|
|
|
|
} |
|
666
|
|
|
|
|
|
|
|
|
667
|
0
|
|
|
|
|
|
my $socket; { |
|
668
|
0
|
|
|
|
|
|
local $@; |
|
|
0
|
|
|
|
|
|
|
|
669
|
|
|
|
|
|
|
|
|
670
|
0
|
0
|
|
|
|
|
if ($self->{host} =~ /:/) { |
|
671
|
|
|
|
|
|
|
# IPv6 |
|
672
|
|
|
|
|
|
|
$socket= IO::Socket::INET6->new( |
|
673
|
|
|
|
|
|
|
PeerAddr => $self->{host}, |
|
674
|
|
|
|
|
|
|
PeerPort => $self->{options}{port}, |
|
675
|
0
|
|
|
|
|
|
Proto => 'tcp', |
|
676
|
|
|
|
|
|
|
Blocking => 0, |
|
677
|
|
|
|
|
|
|
); |
|
678
|
|
|
|
|
|
|
} else { |
|
679
|
|
|
|
|
|
|
# IPv4 |
|
680
|
|
|
|
|
|
|
$socket= IO::Socket::INET->new( |
|
681
|
|
|
|
|
|
|
PeerAddr => $self->{host}, |
|
682
|
|
|
|
|
|
|
PeerPort => $self->{options}{port}, |
|
683
|
0
|
|
|
|
|
|
Proto => 'tcp', |
|
684
|
|
|
|
|
|
|
Blocking => 0, |
|
685
|
|
|
|
|
|
|
); |
|
686
|
|
|
|
|
|
|
} |
|
687
|
|
|
|
|
|
|
|
|
688
|
0
|
0
|
|
|
|
|
unless ($socket) { |
|
689
|
0
|
|
|
|
|
|
my $error= "Could not connect: $@"; |
|
690
|
0
|
|
|
|
|
|
return $callback->($error); |
|
691
|
|
|
|
|
|
|
} |
|
692
|
|
|
|
|
|
|
|
|
693
|
0
|
|
|
|
|
|
$socket->setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1); |
|
694
|
0
|
|
|
|
|
|
$socket->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1); |
|
695
|
|
|
|
|
|
|
} |
|
696
|
|
|
|
|
|
|
|
|
697
|
0
|
|
|
|
|
|
$self->{socket}= $socket; |
|
698
|
0
|
|
|
|
|
|
$self->{fileno}= $socket->fileno; |
|
699
|
0
|
|
|
|
|
|
$self->{async_io}->register($self->{fileno}, $self); |
|
700
|
0
|
|
|
|
|
|
$self->{async_io}->register_read($self->{fileno}); |
|
701
|
|
|
|
|
|
|
|
|
702
|
|
|
|
|
|
|
# We create a fake buffer, to ensure we wait until we can actually write |
|
703
|
0
|
|
|
|
|
|
$self->{pending_write}= ''; |
|
704
|
0
|
|
|
|
|
|
$self->{async_io}->register_write($self->{fileno}); |
|
705
|
|
|
|
|
|
|
|
|
706
|
0
|
0
|
|
|
|
|
if ($self->{options}{tls}) { |
|
707
|
0
|
|
|
|
|
|
Net::SSLeay::set_fd(${$self->{tls}}, $self->{fileno}); |
|
|
0
|
|
|
|
|
|
|
|
708
|
0
|
|
|
|
|
|
Net::SSLeay::set_connect_state(${$self->{tls}}); |
|
|
0
|
|
|
|
|
|
|
|
709
|
|
|
|
|
|
|
} |
|
710
|
|
|
|
|
|
|
|
|
711
|
|
|
|
|
|
|
$self->handshake(sub { |
|
712
|
0
|
|
|
0
|
|
|
my $error= shift; |
|
713
|
0
|
|
|
|
|
|
$self->{connected}= 1; |
|
714
|
0
|
0
|
|
|
|
|
if ($error) { |
|
715
|
0
|
|
|
|
|
|
$self->shutdown("Failed to connect: $error"); |
|
716
|
|
|
|
|
|
|
} |
|
717
|
0
|
|
|
|
|
|
return $callback->($error); |
|
718
|
0
|
|
|
|
|
|
}); |
|
719
|
|
|
|
|
|
|
|
|
720
|
0
|
|
|
|
|
|
return; |
|
721
|
|
|
|
|
|
|
} |
|
722
|
|
|
|
|
|
|
|
|
723
|
|
|
|
|
|
|
sub request { |
|
724
|
|
|
|
|
|
|
# my $body= $_[3] (let's avoid copying that blob). Yes, this code assumes ownership of the body. |
|
725
|
0
|
|
|
0
|
0
|
|
my ($self, $cb, $opcode)= @_; |
|
726
|
|
|
|
|
|
|
return $cb->(Cassandra::Client::Error::Base->new( |
|
727
|
|
|
|
|
|
|
message => "Connection shutting down", |
|
728
|
|
|
|
|
|
|
request_error => 1, |
|
729
|
0
|
0
|
|
|
|
|
)) if $self->{shutdown}; |
|
730
|
|
|
|
|
|
|
|
|
731
|
0
|
|
|
|
|
|
my $pending= $self->{pending_streams}; |
|
732
|
|
|
|
|
|
|
|
|
733
|
0
|
|
|
|
|
|
my $stream_id= $self->{last_stream_id} + 1; |
|
734
|
0
|
|
|
|
|
|
my $attempts= 0; |
|
735
|
0
|
|
0
|
|
|
|
while (exists($pending->{$stream_id}) || $stream_id >= STREAM_ID_LIMIT) { |
|
736
|
0
|
|
|
|
|
|
$stream_id= (++$stream_id) % STREAM_ID_LIMIT; |
|
737
|
0
|
0
|
|
|
|
|
return $cb->(Cassandra::Client::Error::Base->new( |
|
738
|
|
|
|
|
|
|
message => "Cannot find a stream ID to post query with", |
|
739
|
|
|
|
|
|
|
request_error => 1, |
|
740
|
|
|
|
|
|
|
)) if ++$attempts >= STREAM_ID_LIMIT; |
|
741
|
|
|
|
|
|
|
} |
|
742
|
0
|
|
|
|
|
|
$self->{last_stream_id}= $stream_id; |
|
743
|
0
|
|
|
|
|
|
$pending->{$stream_id}= [$cb, $self->{async_io}->deadline($self->{fileno}, $stream_id, $self->{request_timeout})]; |
|
744
|
|
|
|
|
|
|
|
|
745
|
|
|
|
|
|
|
WRITE: { |
|
746
|
0
|
|
|
|
|
|
my $flags= 0; |
|
|
0
|
|
|
|
|
|
|
|
747
|
|
|
|
|
|
|
|
|
748
|
0
|
0
|
0
|
|
|
|
if (length($_[3]) > 500 && (my $compress_func= $self->{compress_func})) { |
|
749
|
0
|
|
|
|
|
|
$flags |= 1; |
|
750
|
0
|
|
|
|
|
|
$compress_func->($_[3]); |
|
751
|
|
|
|
|
|
|
} |
|
752
|
|
|
|
|
|
|
|
|
753
|
0
|
|
|
|
|
|
my $data= pack('CCsCN/a', 3, $flags, $stream_id, $opcode, $_[3]); |
|
754
|
|
|
|
|
|
|
|
|
755
|
0
|
0
|
|
|
|
|
if (defined $self->{pending_write}) { |
|
756
|
0
|
|
|
|
|
|
$self->{pending_write} .= $data; |
|
757
|
0
|
|
|
|
|
|
last WRITE; |
|
758
|
|
|
|
|
|
|
} |
|
759
|
|
|
|
|
|
|
|
|
760
|
0
|
0
|
|
|
|
|
if ($self->{tls}) { |
|
761
|
0
|
|
|
|
|
|
my $length= length $data; |
|
762
|
0
|
|
|
|
|
|
my $rv= Net::SSLeay::write(${$self->{tls}}, $data); |
|
|
0
|
|
|
|
|
|
|
|
763
|
0
|
0
|
|
|
|
|
if ($rv == $length) { |
|
|
|
0
|
|
|
|
|
|
|
764
|
|
|
|
|
|
|
# All good |
|
765
|
|
|
|
|
|
|
} elsif ($rv > 0) { |
|
766
|
|
|
|
|
|
|
# Partial write |
|
767
|
0
|
|
|
|
|
|
substr($data, 0, $rv, ''); |
|
768
|
0
|
|
|
|
|
|
$self->{pending_write}= $data; |
|
769
|
0
|
|
|
|
|
|
$self->{async_io}->register_write($self->{fileno}); |
|
770
|
|
|
|
|
|
|
} else { |
|
771
|
0
|
|
|
|
|
|
$rv= Net::SSLeay::get_error(${$self->{tls}}, $rv); |
|
|
0
|
|
|
|
|
|
|
|
772
|
0
|
0
|
0
|
|
|
|
if ($rv == ERROR_WANT_WRITE || $rv == ERROR_WANT_READ || $rv == ERROR_NONE) { |
|
|
|
|
0
|
|
|
|
|
|
773
|
|
|
|
|
|
|
# Ok... |
|
774
|
0
|
|
|
|
|
|
$self->{pending_write}= $data; |
|
775
|
0
|
0
|
|
|
|
|
if ($rv == ERROR_WANT_READ) { |
|
776
|
0
|
|
|
|
|
|
$self->{tls_want_write}= 1; |
|
777
|
|
|
|
|
|
|
} else { |
|
778
|
0
|
|
|
|
|
|
$self->{async_io}->register_write($self->{fileno}); |
|
779
|
|
|
|
|
|
|
} |
|
780
|
|
|
|
|
|
|
} else { |
|
781
|
|
|
|
|
|
|
# We failed to send the request. |
|
782
|
0
|
|
|
|
|
|
my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error()); |
|
783
|
|
|
|
|
|
|
|
|
784
|
|
|
|
|
|
|
# We never actually sent our request, so take it out again |
|
785
|
0
|
|
|
|
|
|
my $my_stream= delete $pending->{$stream_id}; |
|
786
|
|
|
|
|
|
|
|
|
787
|
|
|
|
|
|
|
# Disable our stream's deadline |
|
788
|
0
|
|
|
|
|
|
${$my_stream->[1]}= 1; |
|
|
0
|
|
|
|
|
|
|
|
789
|
|
|
|
|
|
|
|
|
790
|
0
|
|
|
|
|
|
$self->shutdown($error); |
|
791
|
|
|
|
|
|
|
|
|
792
|
|
|
|
|
|
|
# Now fail our stream properly, but include the retry notice |
|
793
|
0
|
|
|
|
|
|
$my_stream->[0]->(Cassandra::Client::Error::Base->new( |
|
794
|
|
|
|
|
|
|
message => "Disconnected: $error", |
|
795
|
|
|
|
|
|
|
do_retry => 1, |
|
796
|
|
|
|
|
|
|
request_error => 1, |
|
797
|
|
|
|
|
|
|
)); |
|
798
|
|
|
|
|
|
|
} |
|
799
|
|
|
|
|
|
|
} |
|
800
|
|
|
|
|
|
|
|
|
801
|
|
|
|
|
|
|
} else { |
|
802
|
0
|
|
|
|
|
|
my $length= length $data; |
|
803
|
0
|
|
|
|
|
|
my $result= syswrite($self->{socket}, $data, $length); |
|
804
|
0
|
0
|
0
|
|
|
|
if ($result && $result == $length) { |
|
|
|
0
|
0
|
|
|
|
|
|
805
|
|
|
|
|
|
|
# All good |
|
806
|
|
|
|
|
|
|
} elsif (defined $result || $! == EAGAIN) { |
|
807
|
0
|
0
|
|
|
|
|
substr($data, 0, $result, '') if $result; |
|
808
|
0
|
|
|
|
|
|
$self->{pending_write}= $data; |
|
809
|
0
|
|
|
|
|
|
$self->{async_io}->register_write($self->{fileno}); |
|
810
|
|
|
|
|
|
|
} else { |
|
811
|
|
|
|
|
|
|
# Oh, we failed to send out the request. That's bad. Let's first find out what happened. |
|
812
|
0
|
|
|
|
|
|
my $error= $!; |
|
813
|
|
|
|
|
|
|
|
|
814
|
|
|
|
|
|
|
# We never actually sent our request, so take it out again |
|
815
|
0
|
|
|
|
|
|
my $my_stream= delete $pending->{$stream_id}; |
|
816
|
|
|
|
|
|
|
|
|
817
|
|
|
|
|
|
|
# Disable our stream's deadline |
|
818
|
0
|
|
|
|
|
|
${$my_stream->[1]}= 1; |
|
|
0
|
|
|
|
|
|
|
|
819
|
|
|
|
|
|
|
|
|
820
|
0
|
|
|
|
|
|
$self->shutdown($error); |
|
821
|
|
|
|
|
|
|
|
|
822
|
|
|
|
|
|
|
# Now fail our stream properly, but include the retry notice |
|
823
|
0
|
|
|
|
|
|
$my_stream->[0]->(Cassandra::Client::Error::Base->new( |
|
824
|
|
|
|
|
|
|
message => "Disconnected: $error", |
|
825
|
|
|
|
|
|
|
do_retry => 1, |
|
826
|
|
|
|
|
|
|
request_error => 1, |
|
827
|
|
|
|
|
|
|
)); |
|
828
|
|
|
|
|
|
|
} |
|
829
|
|
|
|
|
|
|
} |
|
830
|
|
|
|
|
|
|
} |
|
831
|
|
|
|
|
|
|
|
|
832
|
0
|
|
|
|
|
|
return; |
|
833
|
|
|
|
|
|
|
} |
|
834
|
|
|
|
|
|
|
|
|
835
|
|
|
|
|
|
|
sub can_read { |
|
836
|
0
|
|
|
0
|
0
|
|
my ($self)= @_; |
|
837
|
0
|
|
|
|
|
|
my $shutdown_when_done; |
|
838
|
0
|
|
|
|
|
|
local *BUFFER= $self->{read_buffer}; |
|
839
|
0
|
|
|
|
|
|
my $bufsize= length $BUFFER; |
|
840
|
|
|
|
|
|
|
|
|
841
|
|
|
|
|
|
|
READ: |
|
842
|
0
|
|
|
|
|
|
while (!$self->{shutdown}) { |
|
843
|
0
|
|
|
|
|
|
my $should_read_more; |
|
844
|
|
|
|
|
|
|
|
|
845
|
0
|
0
|
|
|
|
|
if ($self->{tls}) { |
|
846
|
0
|
|
|
|
|
|
my ($bytes, $rv)= Net::SSLeay::read(${$self->{tls}}); |
|
|
0
|
|
|
|
|
|
|
|
847
|
0
|
0
|
|
|
|
|
if (length $bytes) { |
|
848
|
0
|
|
|
|
|
|
$BUFFER .= $bytes; |
|
849
|
0
|
|
|
|
|
|
$bufsize += $rv; |
|
850
|
0
|
|
|
|
|
|
$should_read_more= 1; |
|
851
|
|
|
|
|
|
|
} |
|
852
|
|
|
|
|
|
|
|
|
853
|
0
|
0
|
|
|
|
|
if ($rv <= 0) { |
|
854
|
0
|
|
|
|
|
|
$rv= Net::SSLeay::get_error(${$self->{tls}}, $rv); |
|
|
0
|
|
|
|
|
|
|
|
855
|
0
|
0
|
|
|
|
|
if ($rv == ERROR_WANT_WRITE) { |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
856
|
0
|
|
|
|
|
|
$self->{async_io}->register_write($self->{fileno}); |
|
857
|
|
|
|
|
|
|
} elsif ($rv == ERROR_WANT_READ) { |
|
858
|
|
|
|
|
|
|
# Can do! Wait for the next event. |
|
859
|
|
|
|
|
|
|
|
|
860
|
|
|
|
|
|
|
# Resume our write if needed. |
|
861
|
0
|
0
|
|
|
|
|
if (delete $self->{tls_want_write}) { |
|
862
|
|
|
|
|
|
|
# Try our write again! |
|
863
|
0
|
|
|
|
|
|
$self->{async_io}->register_write($self->{fileno}); |
|
864
|
|
|
|
|
|
|
} |
|
865
|
|
|
|
|
|
|
} elsif ($rv == ERROR_NONE) { |
|
866
|
|
|
|
|
|
|
# Huh? |
|
867
|
|
|
|
|
|
|
} else { |
|
868
|
0
|
|
|
|
|
|
my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error()); |
|
869
|
0
|
|
|
|
|
|
$shutdown_when_done= "TLS error: $error"; |
|
870
|
|
|
|
|
|
|
} |
|
871
|
|
|
|
|
|
|
} |
|
872
|
|
|
|
|
|
|
|
|
873
|
|
|
|
|
|
|
} else { |
|
874
|
0
|
|
|
|
|
|
my $read_cnt= sysread($self->{socket}, $BUFFER, 16384, $bufsize); |
|
875
|
0
|
0
|
|
|
|
|
if ($read_cnt) { |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
876
|
0
|
|
|
|
|
|
$bufsize += $read_cnt; |
|
877
|
0
|
0
|
|
|
|
|
$should_read_more= 1 if $read_cnt >= 16384; |
|
878
|
|
|
|
|
|
|
|
|
879
|
|
|
|
|
|
|
} elsif (!defined $read_cnt) { |
|
880
|
0
|
0
|
|
|
|
|
if ($! != EAGAIN) { |
|
881
|
0
|
|
|
|
|
|
my $error= "$!"; |
|
882
|
0
|
|
|
|
|
|
$shutdown_when_done= $error; |
|
883
|
|
|
|
|
|
|
} |
|
884
|
|
|
|
|
|
|
} elsif ($read_cnt == 0) { # EOF |
|
885
|
0
|
|
|
|
|
|
$shutdown_when_done= "Disconnected from server"; |
|
886
|
|
|
|
|
|
|
} |
|
887
|
|
|
|
|
|
|
} |
|
888
|
|
|
|
|
|
|
|
|
889
|
0
|
0
|
|
|
|
|
READ_NEXT: |
|
890
|
|
|
|
|
|
|
goto READ_MORE if $bufsize < 9; |
|
891
|
0
|
|
|
|
|
|
my ($version, $flags, $stream_id, $opcode, $bodylen)= unpack('CCsCN', substr($BUFFER, 0, 9)); |
|
892
|
0
|
0
|
|
|
|
|
if ($bufsize < $bodylen+9) { |
|
893
|
0
|
|
|
|
|
|
goto READ_MORE; |
|
894
|
|
|
|
|
|
|
} |
|
895
|
|
|
|
|
|
|
|
|
896
|
0
|
|
|
|
|
|
substr($BUFFER, 0, 9, ''); |
|
897
|
0
|
|
|
|
|
|
my $body= substr($BUFFER, 0, $bodylen, ''); |
|
898
|
0
|
|
|
|
|
|
$bufsize -= 9 + $bodylen; |
|
899
|
|
|
|
|
|
|
|
|
900
|
|
|
|
|
|
|
# Decompress if needed |
|
901
|
0
|
0
|
0
|
|
|
|
if (($flags & 1) && $body) { |
|
902
|
0
|
|
|
|
|
|
$self->{decompress_func}->($body); |
|
903
|
|
|
|
|
|
|
} |
|
904
|
|
|
|
|
|
|
|
|
905
|
0
|
0
|
|
|
|
|
if ($stream_id != -1) { |
|
906
|
0
|
|
|
|
|
|
my $stream_cb= delete $self->{pending_streams}{$stream_id}; |
|
907
|
0
|
0
|
|
|
|
|
if (!$stream_cb) { |
|
|
|
0
|
|
|
|
|
|
|
908
|
0
|
|
|
|
|
|
warn 'BUG: received response for unknown stream'; |
|
909
|
|
|
|
|
|
|
|
|
910
|
|
|
|
|
|
|
} elsif ($opcode == OPCODE_ERROR) { |
|
911
|
0
|
|
|
|
|
|
my ($cb, $dl)= @$stream_cb; |
|
912
|
0
|
|
|
|
|
|
$$dl= 1; |
|
913
|
|
|
|
|
|
|
|
|
914
|
0
|
|
|
|
|
|
my $error= unpack_errordata($body); |
|
915
|
0
|
|
|
|
|
|
$cb->($error); |
|
916
|
|
|
|
|
|
|
|
|
917
|
|
|
|
|
|
|
} else { |
|
918
|
0
|
|
|
|
|
|
my ($cb, $dl)= @$stream_cb; |
|
919
|
0
|
|
|
|
|
|
$$dl= 1; |
|
920
|
0
|
|
|
|
|
|
$cb->(undef, $opcode, $body); |
|
921
|
|
|
|
|
|
|
} |
|
922
|
|
|
|
|
|
|
|
|
923
|
|
|
|
|
|
|
} else { |
|
924
|
0
|
|
|
|
|
|
$self->handle_event($body); |
|
925
|
|
|
|
|
|
|
} |
|
926
|
|
|
|
|
|
|
|
|
927
|
0
|
|
|
|
|
|
goto READ_NEXT; |
|
928
|
|
|
|
|
|
|
|
|
929
|
0
|
0
|
|
|
|
|
READ_MORE: |
|
930
|
|
|
|
|
|
|
last READ unless $should_read_more; |
|
931
|
|
|
|
|
|
|
} |
|
932
|
|
|
|
|
|
|
|
|
933
|
0
|
0
|
|
|
|
|
if ($shutdown_when_done) { |
|
934
|
0
|
|
|
|
|
|
$self->shutdown($shutdown_when_done); |
|
935
|
|
|
|
|
|
|
} |
|
936
|
|
|
|
|
|
|
|
|
937
|
0
|
|
|
|
|
|
return; |
|
938
|
|
|
|
|
|
|
} |
|
939
|
|
|
|
|
|
|
|
|
940
|
|
|
|
|
|
|
sub can_write { |
|
941
|
0
|
|
|
0
|
0
|
|
my ($self)= @_; |
|
942
|
|
|
|
|
|
|
|
|
943
|
0
|
0
|
|
|
|
|
if ($self->{tls}) { |
|
944
|
0
|
|
|
|
|
|
my $rv= Net::SSLeay::write(${$self->{tls}}, $self->{pending_write}); |
|
|
0
|
|
|
|
|
|
|
|
945
|
0
|
0
|
|
|
|
|
if ($rv > 0) { |
|
946
|
0
|
|
|
|
|
|
substr($self->{pending_write}, 0, $rv, ''); |
|
947
|
0
|
0
|
|
|
|
|
if (!length $self->{pending_write}) { |
|
948
|
0
|
|
|
|
|
|
$self->{async_io}->unregister_write($self->{fileno}); |
|
949
|
0
|
|
|
|
|
|
delete $self->{pending_write}; |
|
950
|
|
|
|
|
|
|
} |
|
951
|
0
|
|
|
|
|
|
return; |
|
952
|
|
|
|
|
|
|
|
|
953
|
|
|
|
|
|
|
} else { |
|
954
|
0
|
|
|
|
|
|
$rv= Net::SSLeay::get_error(${$self->{tls}}, $rv); |
|
|
0
|
|
|
|
|
|
|
|
955
|
0
|
0
|
|
|
|
|
if ($rv == ERROR_WANT_WRITE) { |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
956
|
|
|
|
|
|
|
# Wait until the next callback. |
|
957
|
0
|
|
|
|
|
|
return; |
|
958
|
|
|
|
|
|
|
} elsif ($rv == ERROR_WANT_READ) { |
|
959
|
|
|
|
|
|
|
# Unschedule ourselves |
|
960
|
0
|
|
|
|
|
|
$self->{async_io}->unregister_write($self->{fileno}); |
|
961
|
0
|
|
|
|
|
|
$self->{tls_want_write}= 1; |
|
962
|
0
|
|
|
|
|
|
return; |
|
963
|
|
|
|
|
|
|
} elsif ($rv == ERROR_NONE) { |
|
964
|
|
|
|
|
|
|
# Huh? |
|
965
|
0
|
|
|
|
|
|
return; |
|
966
|
|
|
|
|
|
|
} else { |
|
967
|
0
|
|
|
|
|
|
my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error()); |
|
968
|
0
|
|
|
|
|
|
return $self->shutdown("TLS error: $error"); |
|
969
|
|
|
|
|
|
|
} |
|
970
|
|
|
|
|
|
|
} |
|
971
|
|
|
|
|
|
|
|
|
972
|
|
|
|
|
|
|
} else { |
|
973
|
0
|
|
|
|
|
|
my $result= syswrite($self->{socket}, $self->{pending_write}); |
|
974
|
0
|
0
|
|
|
|
|
if (!defined($result)) { |
|
975
|
0
|
0
|
|
|
|
|
if ($! == EAGAIN) { |
|
976
|
0
|
|
|
|
|
|
return; # Huh. Oh well, whatever |
|
977
|
|
|
|
|
|
|
} |
|
978
|
|
|
|
|
|
|
|
|
979
|
0
|
|
|
|
|
|
my $error= "$!"; |
|
980
|
0
|
|
|
|
|
|
return $self->shutdown($error); |
|
981
|
|
|
|
|
|
|
} |
|
982
|
0
|
0
|
|
|
|
|
if ($result == 0) { return; } # No idea whether that happens, but guard anyway. |
|
|
0
|
|
|
|
|
|
|
|
983
|
0
|
|
|
|
|
|
substr($self->{pending_write}, 0, $result, ''); |
|
984
|
|
|
|
|
|
|
|
|
985
|
0
|
0
|
|
|
|
|
if (!length $self->{pending_write}) { |
|
986
|
0
|
|
|
|
|
|
$self->{async_io}->unregister_write($self->{fileno}); |
|
987
|
0
|
|
|
|
|
|
delete $self->{pending_write}; |
|
988
|
|
|
|
|
|
|
} |
|
989
|
|
|
|
|
|
|
} |
|
990
|
|
|
|
|
|
|
|
|
991
|
0
|
|
|
|
|
|
return; |
|
992
|
|
|
|
|
|
|
} |
|
993
|
|
|
|
|
|
|
|
|
994
|
|
|
|
|
|
|
sub can_timeout { |
|
995
|
0
|
|
|
0
|
0
|
|
my ($self, $id)= @_; |
|
996
|
0
|
|
|
|
|
|
my $stream= delete $self->{pending_streams}{$id}; |
|
997
|
0
|
|
|
0
|
|
|
$self->{pending_streams}{$id}= [ sub{}, \(my $zero= 0) ]; # fake it |
|
998
|
0
|
|
|
|
|
|
$stream->[0]->(Cassandra::Client::Error::Base->new( |
|
999
|
|
|
|
|
|
|
message => "Request timed out", |
|
1000
|
|
|
|
|
|
|
is_timeout => 1, |
|
1001
|
|
|
|
|
|
|
request_error => 1, |
|
1002
|
|
|
|
|
|
|
)); |
|
1003
|
0
|
|
|
|
|
|
return; |
|
1004
|
|
|
|
|
|
|
} |
|
1005
|
|
|
|
|
|
|
|
|
1006
|
|
|
|
|
|
|
sub shutdown { |
|
1007
|
0
|
|
|
0
|
0
|
|
my ($self, $shutdown_reason)= @_; |
|
1008
|
|
|
|
|
|
|
|
|
1009
|
0
|
0
|
|
|
|
|
return if $self->{shutdown}; |
|
1010
|
0
|
|
|
|
|
|
$self->{shutdown}= 1; |
|
1011
|
|
|
|
|
|
|
|
|
1012
|
0
|
|
|
|
|
|
my $pending= $self->{pending_streams}; |
|
1013
|
0
|
|
|
|
|
|
$self->{pending_streams}= {}; |
|
1014
|
|
|
|
|
|
|
|
|
1015
|
|
|
|
|
|
|
# Disable our deadlines |
|
1016
|
0
|
|
|
|
|
|
${$_->[1]}= 1 for values %$pending; |
|
|
0
|
|
|
|
|
|
|
|
1017
|
|
|
|
|
|
|
|
|
1018
|
0
|
|
|
|
|
|
$self->{async_io}->unregister_read($self->{fileno}); |
|
1019
|
0
|
0
|
|
|
|
|
if (defined(delete $self->{pending_write})) { |
|
1020
|
0
|
|
|
|
|
|
$self->{async_io}->unregister_write($self->{fileno}); |
|
1021
|
|
|
|
|
|
|
} |
|
1022
|
0
|
|
|
|
|
|
$self->{async_io}->unregister($self->{fileno}, $self); |
|
1023
|
0
|
|
|
|
|
|
$self->{client}->_disconnected($self->get_pool_id); |
|
1024
|
0
|
|
|
|
|
|
$self->{socket}->close; |
|
1025
|
|
|
|
|
|
|
|
|
1026
|
0
|
|
|
|
|
|
for (values %$pending) { |
|
1027
|
0
|
|
|
|
|
|
$_->[0]->(Cassandra::Client::Error::Base->new( |
|
1028
|
|
|
|
|
|
|
message => "Disconnected: $shutdown_reason", |
|
1029
|
|
|
|
|
|
|
request_error => 1, |
|
1030
|
|
|
|
|
|
|
)); |
|
1031
|
|
|
|
|
|
|
} |
|
1032
|
|
|
|
|
|
|
|
|
1033
|
0
|
|
|
|
|
|
return; |
|
1034
|
|
|
|
|
|
|
} |
|
1035
|
|
|
|
|
|
|
|
|
1036
|
|
|
|
|
|
|
|
|
1037
|
|
|
|
|
|
|
|
|
1038
|
|
|
|
|
|
|
###### COMPRESSION |
|
1039
|
|
|
|
|
|
|
BEGIN { |
|
1040
|
1
|
|
|
1
|
|
5
|
@compression_preference= qw/lz4 snappy/; |
|
1041
|
|
|
|
|
|
|
|
|
1042
|
1
|
|
|
1
|
|
58
|
%available_compression= ( |
|
|
1
|
|
|
1
|
|
315
|
|
|
|
1
|
|
|
|
|
447
|
|
|
|
1
|
|
|
|
|
18
|
|
|
|
1
|
|
|
|
|
229
|
|
|
|
1
|
|
|
|
|
500
|
|
|
|
1
|
|
|
|
|
9
|
|
|
1043
|
|
|
|
|
|
|
snappy => scalar eval "use Compress::Snappy (); 1;", |
|
1044
|
|
|
|
|
|
|
lz4 => scalar eval "use Compress::LZ4 (); 1;", |
|
1045
|
|
|
|
|
|
|
); |
|
1046
|
|
|
|
|
|
|
} |
|
1047
|
|
|
|
|
|
|
|
|
1048
|
|
|
|
|
|
|
sub setup_compression { |
|
1049
|
0
|
|
|
0
|
0
|
|
my ($self, $type)= @_; |
|
1050
|
|
|
|
|
|
|
|
|
1051
|
0
|
0
|
|
|
|
|
return unless $type; |
|
1052
|
0
|
0
|
|
|
|
|
if ($type eq 'snappy') { |
|
|
|
0
|
|
|
|
|
|
|
1053
|
0
|
|
|
|
|
|
$self->{compress_func}= \&compress_snappy; |
|
1054
|
0
|
|
|
|
|
|
$self->{decompress_func}= \&decompress_snappy; |
|
1055
|
|
|
|
|
|
|
} elsif ($type eq 'lz4') { |
|
1056
|
0
|
|
|
|
|
|
$self->{compress_func}= \&compress_lz4; |
|
1057
|
0
|
|
|
|
|
|
$self->{decompress_func}= \&decompress_lz4; |
|
1058
|
|
|
|
|
|
|
} else { |
|
1059
|
0
|
|
|
|
|
|
warn 'Internal error: failed to set compression'; |
|
1060
|
|
|
|
|
|
|
} |
|
1061
|
|
|
|
|
|
|
|
|
1062
|
0
|
|
|
|
|
|
return; |
|
1063
|
|
|
|
|
|
|
} |
|
1064
|
|
|
|
|
|
|
|
|
1065
|
|
|
|
|
|
|
sub compress_snappy { |
|
1066
|
0
|
|
|
0
|
0
|
|
$_[0]= Compress::Snappy::compress(\$_[0]); |
|
1067
|
0
|
|
|
|
|
|
return; |
|
1068
|
|
|
|
|
|
|
} |
|
1069
|
|
|
|
|
|
|
|
|
1070
|
|
|
|
|
|
|
sub decompress_snappy { |
|
1071
|
0
|
0
|
|
0
|
0
|
|
if ($_[0] ne "\0") { |
|
1072
|
0
|
|
|
|
|
|
$_[0]= Compress::Snappy::decompress(\$_[0]); |
|
1073
|
|
|
|
|
|
|
} else { |
|
1074
|
0
|
|
|
|
|
|
$_[0]= ''; |
|
1075
|
|
|
|
|
|
|
} |
|
1076
|
0
|
|
|
|
|
|
return; |
|
1077
|
|
|
|
|
|
|
} |
|
1078
|
|
|
|
|
|
|
|
|
1079
|
|
|
|
|
|
|
sub compress_lz4 { |
|
1080
|
0
|
|
|
0
|
0
|
|
$_[0]= pack('N', length($_[0])) . Compress::LZ4::lz4_compress(\$_[0]); |
|
1081
|
0
|
|
|
|
|
|
return; |
|
1082
|
|
|
|
|
|
|
} |
|
1083
|
|
|
|
|
|
|
|
|
1084
|
|
|
|
|
|
|
sub decompress_lz4 { |
|
1085
|
0
|
|
|
0
|
0
|
|
my $len= unpack('N', substr $_[0], 0, 4, ''); |
|
1086
|
0
|
0
|
|
|
|
|
if ($len) { |
|
1087
|
0
|
|
|
|
|
|
$_[0]= Compress::LZ4::lz4_decompress(\$_[0], $len); |
|
1088
|
|
|
|
|
|
|
} else { |
|
1089
|
0
|
|
|
|
|
|
$_[0]= ''; |
|
1090
|
|
|
|
|
|
|
} |
|
1091
|
0
|
|
|
|
|
|
return; |
|
1092
|
|
|
|
|
|
|
} |
|
1093
|
|
|
|
|
|
|
|
|
1094
|
|
|
|
|
|
|
1; |
|
1095
|
|
|
|
|
|
|
|
|
1096
|
|
|
|
|
|
|
__END__ |