File Coverage

blib/lib/Protocol/CassandraCQL.pm
Criterion Covered Total %
statement 27 29 93.1
branch 5 8 62.5
condition 1 2 50.0
subroutine 8 9 88.8
pod 6 6 100.0
total 47 54 87.0


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk
5              
6             package Protocol::CassandraCQL;
7              
8 7     7   41784 use strict;
  7         13  
  7         529  
9 7     7   44 use warnings;
  7         11  
  7         387  
10              
11             our $VERSION = '0.12';
12              
13 7     7   55 use Exporter 'import';
  7         13  
  7         7081  
14             our @EXPORT_OK = qw(
15             parse_frame recv_frame
16             build_frame send_frame
17             lookup_consistency
18             );
19              
20             =head1 NAME
21              
22             C - wire protocol support functions for Cassandra CQL
23              
24             =head1 DESCRIPTION
25              
26             This module provides the basic constants and other support functions required
27             to communicate with a Cassandra database using C. It is not in itself a
28             CQL client; it simply provides the necessary support functions to allow one to
29             be written. It supports the additions added by C version 2.
30              
31             For a complete client, see instead L.
32              
33             =cut
34              
35             =head1 CONSTANTS
36              
37             The following families of constants are defined, along with export tags:
38              
39             =head2 FLAG_* (:flags)
40              
41             Bitmask of flags used in message frames.
42              
43             =head2 OPCODE_* (:opcodes)
44              
45             Opcodes used in message frames.
46              
47             =head2 QUERY_* (:queryflags)
48              
49             Flag constants used in C frames.
50              
51             =head2 BATCH_* (:batches)
52              
53             Type constants used in C frames.
54              
55             =head2 RESULT_* (:results)
56              
57             Result codes used in C frames.
58              
59             =head2 ROWS_* (:rowflags)
60              
61             Flag constants used in C frames.
62              
63             =head2 TYPE_* (:types)
64              
65             Type codes used in C and C column metadata.
66              
67             =head2 CONSISTENCY_* (:consistencies)
68              
69             Consistency levels used in C and C frames.
70              
71             =cut
72              
73             # See also
74             # https://github.com/apache/cassandra/blob/cassandra-1.2/doc/native_protocol.spec
75              
76             my %CONSTANTS = (
77             FLAG_COMPRESS => 0x01,
78             FLAG_TRACE => 0x02,
79              
80             OPCODE_ERROR => 0x00,
81             OPCODE_STARTUP => 0x01,
82             OPCODE_READY => 0x02,
83             OPCODE_AUTHENTICATE => 0x03,
84             OPCODE_CREDENTIALS => 0x04,
85             OPCODE_OPTIONS => 0x05,
86             OPCODE_SUPPORTED => 0x06,
87             OPCODE_QUERY => 0x07,
88             OPCODE_RESULT => 0x08,
89             OPCODE_PREPARE => 0x09,
90             OPCODE_EXECUTE => 0x0A,
91             OPCODE_REGISTER => 0x0B,
92             OPCODE_EVENT => 0x0C,
93             OPCODE_BATCH => 0x0D,
94             OPCODE_AUTH_CHALLENGE => 0x0E,
95             OPCODE_AUTH_RESPONSE => 0x0F,
96             OPCODE_AUTH_SUCCESS => 0x10,
97              
98             QUERY_VALUES => 0x0001,
99             QUERY_SKIP_METADATA => 0x0002,
100             QUERY_PAGE_SIZE => 0x0004,
101             QUERY_WITH_PAGING_STATE => 0x0008,
102             QUERY_WITH_SERIAL_CONSISTENCY => 0x0010,
103              
104             BATCH_LOGGED => 0,
105             BATCH_UNLOGGED => 1,
106             BATCH_COUNTER => 2,
107              
108             RESULT_VOID => 0x0001,
109             RESULT_ROWS => 0x0002,
110             RESULT_SET_KEYSPACE => 0x0003,
111             RESULT_PREPARED => 0x0004,
112             RESULT_SCHEMA_CHANGE => 0x0005,
113              
114             ROWS_HAS_GLOBALTABLESPEC => 0x0001,
115             ROWS_HAS_MORE_PAGES => 0x0002,
116             ROWS_NO_METADATA => 0x0004,
117              
118             TYPE_CUSTOM => 0x0000,
119             TYPE_ASCII => 0x0001,
120             TYPE_BIGINT => 0x0002,
121             TYPE_BLOB => 0x0003,
122             TYPE_BOOLEAN => 0x0004,
123             TYPE_COUNTER => 0x0005,
124             TYPE_DECIMAL => 0x0006,
125             TYPE_DOUBLE => 0x0007,
126             TYPE_FLOAT => 0x0008,
127             TYPE_INT => 0x0009,
128             TYPE_TEXT => 0x000A,
129             TYPE_TIMESTAMP => 0x000B,
130             TYPE_UUID => 0x000C,
131             TYPE_VARCHAR => 0x000D,
132             TYPE_VARINT => 0x000E,
133             TYPE_TIMEUUID => 0x000F,
134             TYPE_INET => 0x0010,
135             TYPE_LIST => 0x0020,
136             TYPE_MAP => 0x0021,
137             TYPE_SET => 0x0022,
138              
139             CONSISTENCY_ANY => 0x0000,
140             CONSISTENCY_ONE => 0x0001,
141             CONSISTENCY_TWO => 0x0002,
142             CONSISTENCY_THREE => 0x0003,
143             CONSISTENCY_QUORUM => 0x0004,
144             CONSISTENCY_ALL => 0x0005,
145             CONSISTENCY_LOCAL_QUORUM => 0x0006,
146             CONSISTENCY_EACH_QUORUM => 0x0007,
147             CONSISTENCY_SERIAL => 0x0008,
148             CONSISTENCY_LOCAL_SERIAL => 0x0009,
149             CONSISTENCY_LOCAL_ONE => 0x000A,
150             );
151              
152             require constant;
153             constant->import( $_, $CONSTANTS{$_} ) for keys %CONSTANTS;
154             push @EXPORT_OK, keys %CONSTANTS;
155              
156             our %EXPORT_TAGS = (
157             'flags' => [ grep { m/^FLAG_/ } keys %CONSTANTS ],
158             'opcodes' => [ grep { m/^OPCODE_/ } keys %CONSTANTS ],
159             'queryflags' => [ grep { m/^QUERY_/ } keys %CONSTANTS ],
160             'batches' => [ grep { m/^BATCH_/ } keys %CONSTANTS ],
161             'results' => [ grep { m/^RESULT_/ } keys %CONSTANTS ],
162             'rowflags' => [ grep { m/^ROWS_/ } keys %CONSTANTS ],
163             'types' => [ grep { m/^TYPE_/ } keys %CONSTANTS ],
164             'consistencies' => [ grep { m/^CONSISTENCY_/ } keys %CONSTANTS ],
165             );
166              
167             =head1 FUNCTIONS
168              
169             =cut
170              
171             =head2 ( $version, $flags, $streamid, $opcode, $body ) = parse_frame( $bytes )
172              
173             Attempts to parse a complete message frame from the given byte string. If it
174             succeeds, it returns the header fields and the body as an opaque byte string.
175             If it fails, it returns an empty list.
176              
177             If successful, it will remove the bytes of the message from the C<$bytes>
178             scalar, which must therefore be mutable.
179              
180             =cut
181              
182             sub parse_frame
183             {
184 1 50   1 1 778 return unless length $_[0] >= 8; # header length
185              
186 1         5 my $bodylen = unpack( "x4 N", $_[0] );
187 1 50       11 return unless length $_[0] >= 8 + $bodylen;
188              
189             # Now committed to extracting a frame
190 1         5 my ( $version, $flags, $streamid, $opcode ) = unpack( "C C C C x4", substr $_[0], 0, 8, "" );
191 1         3 my $body = substr $_[0], 0, $bodylen, "";
192              
193 1         3 return ( $version, $flags, $streamid, $opcode, $body );
194             }
195              
196             =head2 ( $version, $flags, $streamid, $opcode, $body ) = recv_frame( $fh )
197              
198             Attempts to read a complete frame from the given filehandle, blocking until it
199             is available. If an IO error happens, returns an empty list. The results are
200             undefined if this method is called on a non-blocking filehandle.
201              
202             =cut
203              
204             sub recv_frame
205             {
206 3     3 1 385 my ( $fh ) = @_;
207              
208 3 50       17 $fh->read( my $header, 8 ) or return;
209 3         416 my ( $version, $flags, $streamid, $opcode, $bodylen ) = unpack( "C C C C N", $header );
210              
211 3         7 my $body = "";
212 3 100 50     12 $fh->read( $body, $bodylen ) or return if $bodylen;
213              
214 3         20 return ( $version, $flags, $streamid, $opcode, $body );
215             }
216              
217             =head2 $bytes = build_frame( $version, $flags, $streamid, $opcode, $body )
218              
219             Returns a byte string containing a complete message with the given fields as
220             the header and body.
221              
222             =cut
223              
224             sub build_frame
225             {
226 4     4 1 11 my ( $version, $flags, $streamid, $opcode, $body ) = @_;
227              
228 4         48 return pack "C C C C N a*", $version, $flags, $streamid, $opcode, length $body, $body;
229             }
230              
231             =head2 send_frame( $fh, $version, $flags, $streamid, $opcode, $body )
232              
233             Sends a complete frame to the given filehandle.
234              
235             =cut
236              
237             sub send_frame
238             {
239 3     3 1 422 my $fh = shift;
240 3         40 $fh->print( build_frame( @_ ) );
241             }
242              
243             =head2 $consistency = lookup_consistency( $name )
244              
245             Returns the C value for the given name (without the initial
246             C prefix).
247              
248             =cut
249              
250             my %consvals = map { substr($_, 12) => __PACKAGE__->$_ } grep { m/^CONSISTENCY_/ } keys %CONSTANTS;
251              
252             sub lookup_consistency
253             {
254 0     0 1 0 my ( $name ) = @_;
255 0         0 return $consvals{$name};
256             }
257              
258             =head2 $name = typename( $type )
259              
260             Returns the name of the given C value, without the initial C
261             prefix.
262              
263             =cut
264              
265             my %typevals = map { substr($_, 5) => __PACKAGE__->$_ } grep { m/^TYPE_/ } keys %CONSTANTS;
266             my %typenames = reverse %typevals;
267              
268             sub typename
269             {
270 26     26 1 30 my ( $type ) = @_;
271 26         94 return $typenames{$type};
272             }
273              
274             =head1 TODO
275              
276             =over 8
277              
278             =item *
279              
280             Reimplement L in XS code for better
281             performance.
282              
283             =back
284              
285             =cut
286              
287             =head1 SPONSORS
288              
289             This code was paid for by
290              
291             =over 2
292              
293             =item *
294              
295             Perceptyx L
296              
297             =item *
298              
299             Shadowcat Systems L
300              
301             =back
302              
303             =head1 AUTHOR
304              
305             Paul Evans
306              
307             =cut
308              
309             0x55AA;