File Coverage

blib/lib/Thrift/API/HiveClient2.pm
Criterion Covered Total %
statement 35 180 19.4
branch 0 66 0.0
condition 0 18 0.0
subroutine 12 31 38.7
pod 6 8 75.0
total 53 303 17.4


line stmt bran cond sub pod time code
1             package Thrift::API::HiveClient2;
2             $Thrift::API::HiveClient2::VERSION = '0.024';
3             {
4             $Thrift::API::HiveClient2::DIST = 'Thrift-API-HiveClient2';
5             }
6              
7             # ABSTRACT: Perl to HiveServer2 Thrift API wrapper
8              
9 2     2   68270 use 5.010;
  2         18  
10 2     2   11 use strict;
  2         3  
  2         41  
11 2     2   10 use warnings;
  2         3  
  2         70  
12 2     2   1051 use Moo;
  2         22300  
  2         10  
13 2     2   2923 use Carp;
  2         5  
  2         115  
14 2     2   13 use Scalar::Util qw( reftype blessed );
  2         4  
  2         96  
15 2     2   1110 use List::MoreUtils 'zip';
  2         24526  
  2         13  
16              
17 2     2   2992 use Thrift;
  2         4470  
  2         61  
18 2     2   889 use Thrift::Socket;
  2         52603  
  2         83  
19 2     2   859 use Thrift::BufferedTransport;
  2         1613  
  2         59  
20              
21             # Protocol loading is done dynamically later.
22              
23 2     2   1527 use Thrift::API::HiveClient2::TCLIService;
  2         9  
  2         5127  
24              
25             # See https://msdn.microsoft.com/en-us/library/ms711683(v=vs.85).aspx
26             my @odbc_coldesc_fields = qw(
27             TABLE_CAT
28             TABLE_SCHEM
29             TABLE_NAME
30             COLUMN_NAME
31             DATA_TYPE
32             TYPE_NAME
33             COLUMN_SIZE
34             BUFFER_LENGTH
35             DECIMAL_DIGITS
36             NUM_PREC_RADIX
37             NULLABLE
38             REMARKS
39             COLUMN_DEF
40             SQL_DATA_TYPE
41             SQL_DATETIME_SUB
42             CHAR_OCTET_LENGTH
43             ORDINAL_POSITION
44             IS_NULLABLE
45             );
46              
47             my @tabledesc_fields = qw(
48             TABLE_CAT
49             TABLE_SCHEM
50             TABLE_NAME
51             TABLE_TYPE
52             REMARKS
53             );
54              
55             # Don't use XS for now, fails initializing properly with BufferedTransport. See
56             # Thrift::XS documentation.
57             has use_xs => (
58             is => 'rwp',
59             default => sub {0},
60             lazy => 1,
61             );
62              
63             has host => (
64             is => 'ro',
65             default => sub {'localhost'},
66             );
67             has port => (
68             is => 'ro',
69             default => sub {10_000},
70             );
71             has sasl => (
72             is => 'ro',
73             default => 0,
74             );
75              
76             # Kerberos principal
77             # Usually in the format 'hive/{hostname}@REALM.COM';
78             has principal => (
79             is => 'rw',
80             );
81              
82             # 1 hour default recv socket timeout. Increase for longer-running queries
83             # called "timeout" for simplicity's sake, as this is how a user will experience
84             # it: a time after which the Thrift stack will throw an exception if not
85             # getting an answer from the server
86              
87             has timeout => (
88             is => 'rw',
89             default => sub {3_600},
90             );
91              
92             # These exist to make testing with various other Thrift Implementation classes
93             # easier, eventually.
94              
95             has _socket => ( is => 'rwp', lazy => 1 );
96             has _transport => ( is => 'rwp', lazy => 1 );
97             has _protocol => ( is => 'rwp', lazy => 1 );
98             has _client => ( is => 'rwp', lazy => 1 );
99             has _sasl => ( is => 'rwp', lazy => 1 );
100              
101             # setters implied by the 'rwp' mode on the attrs above.
102              
103 0     0     sub _set_socket { $_[0]->{_socket} = $_[1] }
104 0     0     sub _set_transport { $_[0]->{_transport} = $_[1] }
105 0     0     sub _set_protocol { $_[0]->{_protocol} = $_[1] }
106 0     0     sub _set_client { $_[0]->{_client} = $_[1] }
107              
108             sub _set_sasl {
109 0     0     my ( $self, $sasl ) = @_;
110 0 0         return if !$sasl;
111              
112             # This normally selects XS first (hopefully)
113 0           require Authen::SASL;
114 0           Authen::SASL->import;
115              
116 0           require Thrift::SASL::Transport;
117 0           Thrift::SASL::Transport->import;
118              
119 0 0         if ( $sasl == 1 ) {
    0          
120 0           return $self->{_sasl} = Authen::SASL->new( mechanism => 'GSSAPI' );
121             }
122             elsif ( reftype $sasl eq "HASH" ) {
123 0           return $self->{_sasl} = Authen::SASL->new(%$sasl); #, debug => 8 );
124             }
125 0           die "Incorrect parameter passed to _set_sasl";
126             }
127              
128             # after constructon is complete, initialize any attributes that
129             # weren't set in the constructor.
130             sub BUILD {
131 0     0 0   my $self = shift;
132              
133 0 0         $self->_set_socket( Thrift::Socket->new( $self->host, $self->port ) )
134             unless $self->_socket;
135 0           $self->_socket->setRecvTimeout( $self->timeout * 1000 );
136              
137 0 0 0       $self->_set_sasl( $self->sasl ) if ( $self->sasl && !$self->_sasl );
138              
139 0 0         if ( !$self->_transport ) {
140 0           my $transport = Thrift::BufferedTransport->new( $self->_socket );
141 0 0         if ( $self->_sasl ) {
142 0           my $debug = 0;
143 0           $self->_set_transport( Thrift::SASL::Transport->new( $transport, $self->_sasl, $debug, $self->principal ) );
144             }
145             else {
146 0           $self->_set_transport($transport);
147             }
148             }
149              
150 0 0         $self->_set_protocol( $self->_init_protocol( $self->_transport ) )
151             unless $self->_protocol;
152              
153 0 0         $self->_set_client( Thrift::API::HiveClient2::TCLIServiceClient->new( $self->_protocol ) )
154             unless $self->_client;
155             }
156              
157             sub _init_protocol {
158 0     0     my $self = shift;
159 0           my $err;
160             my $protocol = eval {
161 0 0         $self->use_xs
162             && require Thrift::XS::BinaryProtocol;
163 0           Thrift::XS::BinaryProtocol->new( $self->_transport );
164 0 0         } or do { $err = $@; 0 };
  0            
  0            
165             $protocol
166 0   0       ||= do { require Thrift::BinaryProtocol; Thrift::BinaryProtocol->new( $self->_transport ) };
  0            
  0            
167 0 0         $self->_set_use_xs(0) if ref($protocol) !~ /XS/;
168              
169             # TODO Add warning when XS was asked but failed to load
170 0           return $protocol;
171             }
172              
173             sub connect {
174 0     0 1   my ($self) = @_;
175 0           $self->_transport->open;
176             }
177              
178             has _session => (
179             is => 'rwp',
180             isa => sub {
181             my($val) = @_;
182             if ( !blessed( $val )
183             || !$val->isa('Thrift::API::HiveClient2::TOpenSessionResp')
184             ) {
185             die sprintf "Session `%s` isn't a Thrift::API::HiveClient2::TOpenSessionResp",
186             $val // '[undefined]'
187             ;
188             }
189             },
190             lazy => 1,
191             builder => '_build_session',
192             );
193              
194             has username => (
195             is => 'rwp',
196             lazy => 1,
197             default => sub { $ENV{USER} },
198             );
199              
200             has password => (
201             is => 'rwp',
202             lazy => 1,
203             default => sub {''},
204             );
205              
206             sub _build_session {
207 0     0     my $self = shift;
208 0 0         $self->_transport->open if !$self->_transport->isOpen;
209 0           return $self->_client->OpenSession(
210             Thrift::API::HiveClient2::TOpenSessionReq->new(
211             { username => $self->username,
212             password => $self->password,
213             }
214             )
215             );
216             }
217              
218             has _session_handle => (
219             is => 'rwp',
220             isa => sub {
221             my($val) = @_;
222             if ( !blessed( $val )
223             || !$val->isa('Thrift::API::HiveClient2::TSessionHandle')
224             ) {
225             die sprintf "Session handle `%s` isn't a Thrift::API::HiveClient2::TSessionHandle",
226             $val // '[undefined]'
227             ;
228             }
229             },
230             lazy => 1,
231             builder => '_build_session_handle',
232             predicate => '_has_session_handle',
233             );
234              
235             sub _build_session_handle {
236 0     0     my $self = shift;
237 0           return $self->_session->{sessionHandle};
238             }
239              
240             has _operation => (
241             is => "rwp",
242             isa => sub {
243             my($val) = @_;
244             if ( defined $val
245             && (
246             !blessed( $val )
247             || ( !$val->isa('Thrift::API::HiveClient2::TExecuteStatementResp')
248             && !$val->isa('Thrift::API::HiveClient2::TGetColumnsResp')
249             && !$val->isa('Thrift::API::HiveClient2::TGetTablesResp')
250             )
251             )
252             ) {
253             die "Operation `%s` isn't a Thrift::API::HiveClient2::T*Resp",
254             $val // '[undefined]'
255             ;
256             }
257             },
258             lazy => 1,
259             );
260              
261             has _operation_handle => (
262             is => 'rwp',
263             isa => sub {
264             my($val) = @_;
265             if (
266             defined $val
267             && ( !blessed( $val )
268             || !$val->isa('Thrift::API::HiveClient2::TOperationHandle')
269             )
270             ) {
271             die sprintf "Operation handle isn't a Thrift::API::HiveClient2::TOperationHandle",
272             $val // '[undefined]'
273             ;
274             }
275             },
276             lazy => 1,
277             );
278              
279             sub _cleanup_previous_operation {
280 0     0     my $self = shift;
281              
282             # We seeem to have some memory leaks in the Hive server, let's try freeing the
283             # operation handle explicitely
284 0 0         if ( $self->_operation_handle ) {
285 0           $self->_client->CloseOperation(
286             Thrift::API::HiveClient2::TCloseOperationReq->new(
287             { operationHandle => $self->_operation_handle, }
288             )
289             );
290 0           $self->_set__operation(undef);
291 0           $self->_set__operation_handle(undef);
292             }
293             }
294              
295             sub execute {
296 0     0 1   my $self = shift;
297 0           my ($query) = @_; # make this a bit more flexible
298              
299 0           $self->_cleanup_previous_operation;
300              
301 0           my $rh = $self->_client->ExecuteStatement(
302             Thrift::API::HiveClient2::TExecuteStatementReq->new(
303             { sessionHandle => $self->_session_handle, statement => $query, confOverlay => {} }
304             )
305             );
306 0 0         if ( $rh->{status}{errorCode} ) {
307 0           die __PACKAGE__ . "::execute: $rh->{status}{errorMessage}; HQL was: \"$query\"";
308             }
309 0           $self->_set__operation($rh);
310 0           $self->_set__operation_handle( $rh->{operationHandle} );
311 0           return $rh;
312             }
313              
314             {
315             # cache the column names we need to extract from the bloated data structure
316             # (keyed on query)
317             my ( $column_keys, $column_names );
318              
319             sub fetch_hashref {
320 0     0 1   my $self = shift;
321 0           my ( $rh, $rows_at_a_time ) = @_;
322 0           return $self->fetch( $rh, $rows_at_a_time, 1 );
323             }
324              
325             sub fetch {
326 0     0 1   my $self = shift;
327 0           my ( $rh, $rows_at_a_time, $use_hashref ) = @_;
328              
329             # if $rh looks like a number, use it instead of $rows_at_a_time
330             # it means we're using the new form for this call, which takes only the
331             # number of wanted rows, or even nothing (and relies on the defaults,
332             # and a cached copy of the query $rh)
333 0 0 0       $rows_at_a_time = $rh if ( $rh && !$rows_at_a_time && $rh =~ /^[1-9][0-9]*$/ );
      0        
334 0   0       $rows_at_a_time ||= 10_000;
335              
336 0           my $result = [];
337 0           my $has_more_rows;
338              
339             # NOTE we don't use the provided $rh any more, maybe we should leave
340             # that possibility open for parallel queries, but that would need a lot
341             # more testing. Patches welcome.
342 0           my $cached_rh = $self->_operation_handle;
343 0           $rh = $self->_client->FetchResults(
344             Thrift::API::HiveClient2::TFetchResultsReq->new(
345             { operationHandle => $cached_rh,
346             maxRows => $rows_at_a_time,
347             }
348             )
349             );
350 0 0         if ( ref $rh eq 'Thrift::API::HiveClient2::TFetchResultsResp' ) {
351              
352             # NOTE that currently (july 2013) the hasMoreRows method is broken,
353             # see the explanation in the POD
354 0           $has_more_rows = $rh->hasMoreRows();
355              
356 0 0         for my $row ( @{ $rh->{results}{rows} || [] } ) {
  0            
357              
358             # Find which fields to extract from each row, only on the first iteration
359 0 0         if ( !@{ $column_keys->{$cached_rh} || [] } ) {
  0 0          
360              
361             # metadata for the query
362 0 0         if ($use_hashref) {
363 0           my $rh_meta = $self->_client->GetResultSetMetadata(
364             Thrift::API::HiveClient2::TGetResultSetMetadataReq->new(
365             { operationHandle => $cached_rh }
366             )
367             );
368 0           $column_names = [ map { $_->{columnName} }
369 0 0         @{ $rh_meta->{schema}{columns} || [] } ];
  0            
370             }
371              
372             # TODO redo all this using the TGetResultSetMetadataResp object we retrieved
373             # above
374 0 0         for my $column ( @{ $row->{colVals} || [] } ) {
  0            
375              
376 0           my $first_col = {%$column};
377              
378             # Only 1 element of each TColumnValue is populated
379             # (although 7 keys are present, 1 for each possible data
380             # type) with a T*Value, and the rest is undef. Find out
381             # which is defined, and put the key (i.e. the data type) in
382             # cache, to reuse it to fetch the next rows faster.
383             # NOTE this data structure smells of Java and friends from
384             # miles away. Dynamically typed languages don't really need
385             # the bloat.
386 0           push @{ $column_keys->{$cached_rh} },
387 0           grep { ref $first_col->{$_} } keys %$first_col;
  0            
388             }
389             }
390              
391             # TODO find something faster? (see comment above)
392              
393 0           my $idx = 0;
394             my $retval = [
395 0           map { $_->value }
396 0           grep { defined $_ }
397 0           map { $row->{colVals}[ $idx++ ]{$_} } @{ $column_keys->{$cached_rh} }
  0            
  0            
398             ];
399 0 0         if ($use_hashref) {
400 0           push @$result, { zip @$column_names, @$retval };
401             }
402             else {
403 0           push @$result, $retval;
404             }
405             }
406             }
407 0 0         return wantarray ? ( $result, $has_more_rows ) : ( @$result ? $result : undef );
    0          
408             }
409             }
410              
411             sub get_columns {
412 0     0 1   my $self = shift;
413 0           my ( $table, $schema ) = @_;
414              
415             # note that not specifying a table name would return all columns for all
416             # tables we probably don't want that, but feel free to change this
417             # behaviour. Same goes for the schema name: we probably want a default
418             # value for the schema, which is what we use here.
419 0 0         die "Unspecified table name" if !$table;
420 0   0       $schema //= "default";
421              
422 0           $self->_cleanup_previous_operation;
423              
424 0           my $rh = $self->_client->GetColumns(
425             Thrift::API::HiveClient2::TGetColumnsReq->new(
426             { sessionHandle => $self->_session_handle,
427             catalogName => undef,
428             schemaName => $schema,
429             tableName => $table,
430             columnName => undef,
431             confOverlay => {}
432             }
433             )
434             );
435 0 0         if ( $rh->{status}{errorCode} ) {
436 0           die __PACKAGE__ . "::execute: $rh->{status}{errorMessage}";
437             }
438 0           $self->_set__operation($rh);
439 0           $self->_set__operation_handle( $rh->{operationHandle} );
440 0           my $columns;
441 0           while ( my $res = $self->fetch($rh) ) {
442 0           for my $line (@$res) {
443 0           my $idx = 0;
444 0           push @$columns, { map { $_ => $line->[ $idx++ ] } @odbc_coldesc_fields };
  0            
445             }
446             }
447 0           return $columns;
448             }
449              
450             sub get_tables {
451 0     0 1   my $self = shift;
452 0           my ( $schema, $table_pattern ) = @_;
453              
454             # note that not specifying a table name would return all columns for all
455             # tables we probably don't want that, but feel free to change this
456             # behaviour. Same goes for the schema name: we probably want a default
457             # value for the schema, which is what we use here.
458 0   0       $schema //= "default";
459              
460 0           $self->_cleanup_previous_operation;
461              
462 0           my $rh = $self->_client->GetTables(
463             Thrift::API::HiveClient2::TGetTablesReq->new(
464             { sessionHandle => $self->_session_handle,
465             catalogName => undef,
466             schemaName => $schema,
467             tableName => $table_pattern,
468             confOverlay => {},
469             }
470             )
471             );
472 0 0         if ( $rh->{status}{errorCode} ) {
473 0           die __PACKAGE__ . "::execute: $rh->{status}{errorMessage}";
474             }
475 0           $self->_set__operation($rh);
476 0           $self->_set__operation_handle( $rh->{operationHandle} );
477 0           my $tables;
478 0           while ( my $res = $self->fetch($rh) ) {
479 0           for my $line (@$res) {
480 0           my $idx = 0;
481 0           push @$tables, { map { $_ => $line->[ $idx++ ] } @tabledesc_fields };
  0            
482             }
483             }
484 0           return $tables;
485             }
486              
487             sub DEMOLISH {
488 0     0 0   my $self = shift;
489              
490 0           $self->_cleanup_previous_operation;
491              
492 0 0         if ( $self->_has_session_handle ) {
493 0           $self->_client->CloseSession(
494             Thrift::API::HiveClient2::TCloseSessionReq->new(
495             { sessionHandle => $self->_session_handle, }
496             )
497             );
498             }
499            
500 0 0         if ( $self->_transport ) {
501 0           $self->_transport->close;
502             }
503             }
504              
505             # when the user calls a method on an object of this class, see if that method
506             # exists on the TCLIService object. If so, create a sub that calls that method
507             # on the client object. If not, die horribly.
508             sub AUTOLOAD {
509 0     0     my ($self) = @_;
510 0           ( my $meth = our $AUTOLOAD ) =~ s/.*:://;
511 0 0         return if $meth eq 'DESTROY';
512 0           print STDERR "$meth\n";
513 2     2   19 no strict 'refs';
  2         4  
  2         325  
514 0 0         if ( $self->_client->can($meth) ) {
515 0     0     *$AUTOLOAD = sub { shift->_client->$meth(@_) };
  0            
516 0           goto &$AUTOLOAD;
517             }
518 0           croak "No such method exists: $AUTOLOAD";
519             }
520              
521             1;
522              
523             __END__