File Coverage

blib/lib/Protocol/CassandraCQL/Client.pm
Criterion Covered Total %
statement 70 90 77.7
branch 15 36 41.6
condition 2 7 28.5
subroutine 15 17 88.2
pod 4 5 80.0
total 106 155 68.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk
5              
6             package Protocol::CassandraCQL::Client;
7              
8 2     2   30850 use strict;
  2         4  
  2         69  
9 2     2   10 use warnings;
  2         4  
  2         86  
10              
11             our $VERSION = '0.11';
12              
13 2     2   13 use base qw( IO::Socket::IP );
  2         4  
  2         8072  
14              
15 2     2   76723 use Carp;
  2         4  
  2         219  
16              
17 2         614 use Protocol::CassandraCQL qw(
18             :opcodes :results
19             send_frame recv_frame FLAG_COMPRESS
20 2     2   543 );
  2         4  
21 2     2   1295 use Protocol::CassandraCQL::Frame;
  2         6  
  2         83  
22 2         256 use Protocol::CassandraCQL::Frames qw(
23             build_startup_frame
24             build_credentials_frame
25             build_query_frame
26              
27             parse_error_frame
28             parse_authenticate_frame
29             parse_result_frame
30 2     2   1059 );
  2         9  
31 2     2   17 use Protocol::CassandraCQL::Result;
  2         4  
  2         62  
32              
33 2     2   1897 use Compress::Snappy qw( compress decompress );
  2         1332  
  2         121  
34              
35 2     2   11 use constant DEFAULT_CQL_PORT => 9042;
  2         4  
  2         118  
36              
37 2     2   10 use constant MAX_SUPPORTED_VERSION => 2;
  2         6  
  2         1777  
38              
39             =head1 NAME
40              
41             C - a minimal Cassandra CQL client
42              
43             =head1 SYNOPSIS
44              
45             use Protocol::CassandraCQL::Client;
46             use Protocol::CassandraCQL qw( CONSISTENCY_QUORUM );
47              
48             my $cass = Protocol::CassandraCQL::Client->new(
49             PeerHost => "localhost",
50             Keyspace => "my-keyspace",
51             );
52              
53             my ( undef, $result ) = $cass->query( "SELECT v FROM numbers" );
54              
55             foreach my $row ( $result->rows_hash ) {
56             say "We have a number $row->{v}";
57             }
58              
59             =head1 DESCRIPTION
60              
61             This subclass of L implements a client that can execute
62             queries on a Cassandra CQL database. It is not intended as a complete client,
63             is simply provides enough functionallity to test that the protocol handling is
64             working, and is used to implement the bundled F utility.
65              
66             For a more complete client, see instead L.
67              
68             =cut
69              
70             =head1 CONSTRUCTOR
71              
72             =cut
73              
74             =head2 $cass = Protocol::CassandraCQL::Client->new( %args )
75              
76             Takes the following arguments in addition to those accepted by
77             L:
78              
79             =over 8
80              
81             =item Username => STRING
82              
83             =item Password => STRING
84              
85             Authentication credentials if required by the server.
86              
87             =item Keyspace => STRING
88              
89             If defined, selects the keyspace to C after connection.
90              
91             =item CQLVersion => INT
92              
93             If defined, sets the CQL protocol version that will be negotiated. If omitted
94             will default to 1.
95              
96             =back
97              
98             =cut
99              
100             sub new
101             {
102 1     1 1 1968 my $class = shift;
103 1 50       74 my %args = @_ == 1 ? ( PeerHost => $_[0] ) : @_;
104              
105 1   50     21 $args{PeerService} ||= DEFAULT_CQL_PORT;
106              
107 1 50       74 my $self = $class->SUPER::new( %args ) or return;
108              
109 1   50     1840 ${*$self}{Cassandra_version} = $args{CQLVersion} // 1; # default 1
  1         4  
110 1 50       16 $self->_version <= MAX_SUPPORTED_VERSION or
111             croak "CQLVersion too high - maximum supported is " . MAX_SUPPORTED_VERSION;
112              
113 1         33 $self->startup( %args );
114 1 50       13 $self->use_keyspace( $args{Keyspace} ) if defined $args{Keyspace};
115              
116 1         27 return $self;
117             }
118              
119             sub _version
120             {
121 8     8   16 my $self = shift;
122 8         12 return ${*$self}{Cassandra_version};
  8         98  
123             }
124              
125             =head1 METHODS
126              
127             =cut
128              
129             =head2 ( $result_op, $result_frame ) = $cass->send_message( $opcode, $frame )
130              
131             Sends a message with the given opcode and L for
132             the message body. Waits for a response to be received, and returns it.
133              
134             If the response opcode is C then the error message string is
135             thrown directly as an exception; this method will only return in non-error
136             cases.
137              
138             =cut
139              
140             sub send_message
141             {
142 2     2 1 5 my $self = shift;
143 2         11 my ( $opcode, $frame ) = @_;
144              
145             {
146 2         4 my $flags = 0;
  2         5  
147 2         10 my $body = $frame->bytes;
148              
149 2         67 my $body_compressed = compress( $body );
150 2 50       10 if( length $body_compressed < length $body ) {
151 0         0 $body = $body_compressed;
152 0         0 $flags |= FLAG_COMPRESS;
153             }
154              
155 2         7 send_frame( $self, $self->_version, $flags, 0, $opcode, $body );
156             }
157              
158 2 50       132 my ( $version, $flags, $streamid, $result_op, $body ) = recv_frame( $self ) or croak "Unable to ->recv: $!";
159              
160 2 50       10 $version & 0x80 or croak "Expected response frame to have RESPONSE bit set";
161 2         5 $version &= 0x7f;
162              
163 2 50       12 $version <= $self->_version or
164             croak sprintf "Received message version too high to parse (%d)", $version;
165              
166 2 50       10 if( $flags & FLAG_COMPRESS ) {
167 0         0 $body = decompress( $body );
168 0         0 $flags &= ~FLAG_COMPRESS;
169             }
170 2 50       27 $flags == 0 or
171             croak sprintf "Unexpected flags 0x%02x", $flags;
172              
173 2 50       7 $streamid == 0 or
174             croak "Unexpected stream ID $streamid";
175              
176 2         18 my $response = Protocol::CassandraCQL::Frame->new( $body );
177              
178 2 50       15 if( $result_op == OPCODE_ERROR ) {
179 0         0 my ( undef, $message ) = parse_error_frame( $version, $response );
180 0         0 croak "OPCODE_ERROR: $message";
181             }
182              
183             # Version check after OPCODE_ERROR in case of "insupported version" error
184 2 50       8 $version == $self->_version or
185             croak sprintf "Unexpected message version %#02x", $version;
186              
187 2         10 return ( $result_op, $response );
188             }
189              
190             sub startup
191             {
192 1     1 0 2 my $self = shift;
193 1         4 my %args = @_;
194              
195 1         2 my ( $op, $response ) = $self->send_message( OPCODE_STARTUP,
196             build_startup_frame( $self->_version, options => {
197             CQL_VERSION => "3.0.5",
198             COMPRESSION => "Snappy",
199             } ),
200             );
201              
202 1 50       115 if( $op == OPCODE_AUTHENTICATE ) {
203 0         0 my ( $authenticator ) = parse_authenticate_frame( $self->_version, $response );
204 0 0       0 if( $authenticator eq "org.apache.cassandra.auth.PasswordAuthenticator" ) {
205 0 0 0     0 defined $args{Username} and defined $args{Password} or
206             croak "Cannot authenticate without a username/password";
207              
208 0         0 ( $op, $response ) = $self->send_message( OPCODE_CREDENTIALS,
209             build_credentials_frame( $self->_version, credentials => {
210             username => $args{Username},
211             password => $args{Password},
212             } )
213             );
214             }
215             else {
216 0         0 croak "Unrecognised authenticator $authenticator";
217             }
218             }
219              
220 1 50       10 $op == OPCODE_READY or croak "Expected OPCODE_READY";
221             }
222              
223             =head2 ( $type, $result ) = $cass->query( $cql, $consistency )
224              
225             Performs a CQL query and returns the result, as decoded by
226             L.
227              
228             For C queries, the type is C and C<$result> is a
229             string giving the name of the new keyspace.
230              
231             For C, C and C queries, the type is
232             C and C<$result> is a 3-element ARRAY reference
233             containing the type of change, the keyspace and the table name.
234              
235             For C
236             instance of L containing the returned row
237             data.
238              
239             For other queries, such as C, C and C, the method
240             returns C and C<$result> is C.
241              
242             =cut
243              
244             sub query
245             {
246 0     0 1   my $self = shift;
247 0           my ( $cql, $consistency ) = @_;
248              
249 0           my ( $op, $response ) = $self->send_message( OPCODE_QUERY,
250             build_query_frame( $self->_version, cql => $cql, consistency => $consistency )
251             );
252              
253 0 0         $op == OPCODE_RESULT or croak "Expected OPCODE_RESULT";
254 0           return parse_result_frame( $self->_version, $response );
255             }
256              
257             =head2 ( $type, $result ) = $cass->use_keyspace( $keyspace )
258              
259             A convenient shortcut to the C query which escapes the keyspace
260             name.
261              
262             =cut
263              
264             sub use_keyspace
265             {
266 0     0 1   my $self = shift;
267 0           my ( $keyspace ) = @_;
268              
269             # CQL's "quoting" handles any character except quote marks, which have to
270             # be doubled
271 0           $keyspace =~ s/"/""/g;
272              
273 0           $self->query( qq(USE "$keyspace"), 0 );
274             }
275              
276             =head1 TODO
277              
278             =over 8
279              
280             =item *
281              
282             Consider how the server's maximum supported CQL version can be detected on
283             startup. This is made hard by the fact that the server closes the connection
284             if the version is too high, so we'll have to reconnect it.
285              
286             =back
287              
288             =cut
289              
290             =head1 SPONSORS
291              
292             This code was paid for by
293              
294             =over 2
295              
296             =item *
297              
298             Perceptyx L
299              
300             =item *
301              
302             Shadowcat Systems L
303              
304             =back
305              
306             =head1 AUTHOR
307              
308             Paul Evans
309              
310             =cut
311              
312             0x55AA;