File Coverage

blib/lib/Thrift/API/HiveClient2.pm
Criterion Covered Total %
statement 35 180 19.4
branch 0 66 0.0
condition 0 18 0.0
subroutine 12 31 38.7
pod 6 8 75.0
total 53 303 17.4


line stmt bran cond sub pod time code
1             package Thrift::API::HiveClient2;
2             $Thrift::API::HiveClient2::VERSION = '0.023';
3             {
4             $Thrift::API::HiveClient2::DIST = 'Thrift-API-HiveClient2';
5             }
6              
7             # ABSTRACT: Perl to HiveServer2 Thrift API wrapper
8              
9 2     2   65673 use 5.010;
  2         15  
10 2     2   8 use strict;
  2         4  
  2         35  
11 2     2   9 use warnings;
  2         4  
  2         69  
12 2     2   925 use Moo;
  2         19025  
  2         10  
13 2     2   2470 use Carp;
  2         4  
  2         99  
14 2     2   11 use Scalar::Util qw( reftype blessed );
  2         4  
  2         88  
15 2     2   1020 use List::MoreUtils 'zip';
  2         20838  
  2         11  
16              
17 2     2   2577 use Thrift;
  2         4085  
  2         50  
18 2     2   793 use Thrift::Socket;
  2         44514  
  2         68  
19 2     2   739 use Thrift::BufferedTransport;
  2         1141  
  2         46  
20              
21             # Protocol loading is done dynamically later.
22              
23 2     2   1302 use Thrift::API::HiveClient2::TCLIService;
  2         9  
  2         4410  
24              
# Field names of the rows returned by the server for GetColumns, in result
# order; get_columns() uses them as hash keys. They follow the ODBC column
# description layout documented at
# https://msdn.microsoft.com/en-us/library/ms711683(v=vs.85).aspx
my @odbc_coldesc_fields = qw(
    TABLE_CAT        TABLE_SCHEM      TABLE_NAME        COLUMN_NAME
    DATA_TYPE        TYPE_NAME        COLUMN_SIZE       BUFFER_LENGTH
    DECIMAL_DIGITS   NUM_PREC_RADIX   NULLABLE          REMARKS
    COLUMN_DEF       SQL_DATA_TYPE    SQL_DATETIME_SUB  CHAR_OCTET_LENGTH
    ORDINAL_POSITION IS_NULLABLE
);

# Field names of the rows returned by the server for GetTables, in result
# order; get_tables() uses them as hash keys.
my @tabledesc_fields = qw(
    TABLE_CAT TABLE_SCHEM TABLE_NAME TABLE_TYPE REMARKS
);
54              
# Thrift::XS is off by default: it fails to initialize properly with
# Thrift::BufferedTransport (see the Thrift::XS documentation).
has use_xs => ( is => 'rwp', lazy => 1, default => sub {0} );

# HiveServer2 endpoint.
has host => ( is => 'ro', default => sub {'localhost'} );
has port => ( is => 'ro', default => sub {10_000} );

# SASL configuration handed to _set_sasl by BUILD: false disables SASL,
# 1 selects GSSAPI, a hashref is passed on to Authen::SASL->new.
has sasl => ( is => 'ro', default => 0 );

# Kerberos principal, usually of the form 'hive/{hostname}@REALM.COM'.
has principal => ( is => 'rw' );

# Receive timeout of the socket, in seconds (BUILD multiplies it by 1000
# before handing it to setRecvTimeout). One hour by default; raise it for
# longer-running queries. It is called "timeout" because that is how users
# experience it: the Thrift stack throws an exception when the server has
# not answered within that time.
has timeout => ( is => 'rw', default => sub {3_600} );

# Thrift plumbing. BUILD creates whatever was not passed to the
# constructor; being injectable should eventually make testing with other
# Thrift implementation classes easier.
has $_ => ( is => 'rwp', lazy => 1 )
    for qw( _socket _transport _protocol _client _sasl );
100              
# Hand-written writers for the Thrift plumbing slots, used by BUILD.
#
# These are NOT the writers generated by Moo's 'rwp' mode: Moo names those
# after the attribute with a "_set_" prefix, i.e. _set__socket,
# _set__transport, ... (double underscore, as in the _set__operation calls
# elsewhere in this file). These single-underscore versions store the value
# straight into the object hash, bypassing Moo, and return it.
sub _set_socket {
    my ( $self, $socket ) = @_;
    return $self->{_socket} = $socket;
}

sub _set_transport {
    my ( $self, $transport ) = @_;
    return $self->{_transport} = $transport;
}

sub _set_protocol {
    my ( $self, $protocol ) = @_;
    return $self->{_protocol} = $protocol;
}

sub _set_client {
    my ( $self, $client ) = @_;
    return $self->{_client} = $client;
}
107              
# Create the Authen::SASL object used by the SASL transport and store it in
# the _sasl slot (directly in the object hash, bypassing Moo).
#
#   $sasl  false  - nothing to do; returns an empty list / undef
#          1      - GSSAPI mechanism (Kerberos)
#          HASH   - passed verbatim as the argument list of Authen::SASL->new
#
# Any other value croaks. The argument is validated before the optional
# SASL modules are loaded, so a bad value is reported as such even where
# Authen::SASL is not installed. Checking the reference type first also
# avoids "isn't numeric" / "uninitialized" warnings for non-numeric strings.
#
# Returns the new Authen::SASL object.
sub _set_sasl {
    my ( $self, $sasl ) = @_;
    return if !$sasl;

    my %sasl_args;
    if ( ( reftype($sasl) // '' ) eq 'HASH' ) {
        %sasl_args = %$sasl;
    }
    elsif ( !ref $sasl && Scalar::Util::looks_like_number($sasl) && $sasl == 1 ) {
        %sasl_args = ( mechanism => 'GSSAPI' );
    }
    else {
        croak "Incorrect parameter passed to _set_sasl";
    }

    # This normally selects XS first (hopefully)
    require Authen::SASL;
    Authen::SASL->import;

    require Thrift::SASL::Transport;
    Thrift::SASL::Transport->import;

    return $self->{_sasl} = Authen::SASL->new(%sasl_args);
}
127              
# Moo hook, run once the constructor has stored its arguments. Every piece
# of Thrift plumbing that was not injected through the constructor is built
# here, in dependency order: socket, SASL object, transport, protocol,
# client.
sub BUILD {
    my ($self) = @_;

    if ( !$self->_socket ) {
        $self->_set_socket( Thrift::Socket->new( $self->host, $self->port ) );
    }

    # timeout is in seconds; setRecvTimeout is given it multiplied by 1000.
    $self->_socket->setRecvTimeout( 1000 * $self->timeout );

    if ( $self->sasl && !$self->_sasl ) {
        $self->_set_sasl( $self->sasl );
    }

    unless ( $self->_transport ) {
        my $buffered = Thrift::BufferedTransport->new( $self->_socket );
        my $sasl     = $self->_sasl;

        # With SASL the buffered transport gets wrapped; the third argument
        # of Thrift::SASL::Transport->new is a debug flag, kept off.
        $self->_set_transport(
              $sasl
            ? Thrift::SASL::Transport->new( $buffered, $sasl, 0, $self->principal )
            : $buffered
        );
    }

    if ( !$self->_protocol ) {
        $self->_set_protocol( $self->_init_protocol( $self->_transport ) );
    }

    if ( !$self->_client ) {
        $self->_set_client(
            Thrift::API::HiveClient2::TCLIServiceClient->new( $self->_protocol ) );
    }
}
156              
# Build the Thrift protocol object on top of $self->_transport.
#
# Thrift::XS::BinaryProtocol is only tried when use_xs is true. If it cannot
# be loaded or constructed, a warning is emitted and the pure-Perl
# Thrift::BinaryProtocol is used instead. use_xs is reset to 0 whenever the
# resulting protocol is not the XS one, so it always reflects reality.
#
# Arguments besides $self are ignored; the transport is read from the
# object. Returns the protocol object.
sub _init_protocol {
    my $self = shift;
    my $protocol;

    if ( $self->use_xs ) {
        # The constructor call must stay inside this guard. Otherwise XS
        # would be picked up whenever some other code had already loaded
        # Thrift::XS::BinaryProtocol, even with use_xs off.
        $protocol = eval {
            require Thrift::XS::BinaryProtocol;
            Thrift::XS::BinaryProtocol->new( $self->_transport );
        };
        if ( !$protocol ) {
            my $err = $@ || 'constructor returned a false value';
            carp "use_xs requested but Thrift::XS::BinaryProtocol is unusable, "
                . "falling back to Thrift::BinaryProtocol: $err";
        }
    }

    $protocol ||= do {
        require Thrift::BinaryProtocol;
        Thrift::BinaryProtocol->new( $self->_transport );
    };

    $self->_set_use_xs(0) if ref($protocol) !~ /XS/;
    return $protocol;
}
172              
# Open the underlying Thrift transport and return whatever its open()
# method returns.
sub connect {
    my $self      = shift;
    my $transport = $self->_transport;
    return $transport->open;
}
177              
# OpenSession response, built on first use by _build_session.
has _session => (
    is      => 'rwp',
    lazy    => 1,
    builder => '_build_session',
    isa     => sub {
        my $candidate = shift;
        return
            if blessed($candidate)
            && $candidate->isa('Thrift::API::HiveClient2::TOpenSessionResp');
        die sprintf "Session `%s` isn't a Thrift::API::HiveClient2::TOpenSessionResp",
            $candidate // '[undefined]';
    },
);

# Credentials sent with OpenSession: the user name defaults to $ENV{USER},
# the password to the empty string.
has username => ( is => 'rwp', lazy => 1, default => sub { $ENV{USER} } );
has password => ( is => 'rwp', lazy => 1, default => sub {''} );
205              
# Builder for _session: open the transport if it is not open yet, then send
# OpenSession with the configured credentials and return the response.
sub _build_session {
    my ($self) = @_;

    my $transport = $self->_transport;
    $transport->open unless $transport->isOpen;

    my $request = Thrift::API::HiveClient2::TOpenSessionReq->new(
        {   username => $self->username,
            password => $self->password,
        }
    );
    return $self->_client->OpenSession($request);
}
217              
# Handle of the open session, taken from the OpenSession response the
# first time it is needed (builder: _build_session_handle).
has _session_handle => (
    is      => 'rwp',
    lazy    => 1,
    builder => '_build_session_handle',
    isa     => sub {
        my $candidate = shift;
        my $valid     = blessed($candidate)
            && $candidate->isa('Thrift::API::HiveClient2::TSessionHandle');
        die sprintf "Session handle `%s` isn't a Thrift::API::HiveClient2::TSessionHandle",
            $candidate // '[undefined]'
            unless $valid;
    },
);
233              
# Builder for _session_handle: the sessionHandle field of the OpenSession
# response. Reading _session opens the session if needed.
sub _build_session_handle {
    my ($self) = @_;
    my $session = $self->_session;
    return $session->{sessionHandle};
}
238              
# Response of the last ExecuteStatement / GetColumns / GetTables call.
# It has no default: execute, get_columns and get_tables set it, and
# _cleanup_previous_operation resets it to undef, so undef is accepted.
has _operation => (
    is   => 'rwp',
    lazy => 1,
    isa  => sub {
        my ($val) = @_;
        return if !defined $val;
        return
            if blessed($val)
            && (   $val->isa('Thrift::API::HiveClient2::TExecuteStatementResp')
                || $val->isa('Thrift::API::HiveClient2::TGetColumnsResp')
                || $val->isa('Thrift::API::HiveClient2::TGetTablesResp') );

        # Format the offending value into the message. A bare `die LIST`
        # would print the '%s' placeholder literally.
        die sprintf "Operation `%s` isn't a Thrift::API::HiveClient2::T*Resp", $val;
    },
);
259              
# Handle of the last operation, used by fetch and closed by
# _cleanup_previous_operation, which resets it to undef (so undef is
# accepted).
has _operation_handle => (
    is   => 'rwp',
    lazy => 1,
    isa  => sub {
        my ($val) = @_;
        return if !defined $val;
        return if blessed($val) && $val->isa('Thrift::API::HiveClient2::TOperationHandle');

        # Show the offending value. The old format had no placeholder, which
        # dropped it and raised a "Redundant argument in sprintf" warning.
        die sprintf "Operation handle `%s` isn't a Thrift::API::HiveClient2::TOperationHandle",
            $val;
    },
);
277              
# Close the server-side operation left over from the previous execute /
# get_columns / get_tables call, if any, and forget it locally.
#
# The Hive server seems to leak memory when operation handles are not
# freed, so they are closed explicitly. Local state is cleared *before*
# CloseOperation is called. If that call throws (e.g. a broken connection),
# the stale handle must not be retried, and fail again, on every later call.
sub _cleanup_previous_operation {
    my $self = shift;

    my $handle = $self->_operation_handle or return;

    $self->_set__operation(undef);
    $self->_set__operation_handle(undef);

    $self->_client->CloseOperation(
        Thrift::API::HiveClient2::TCloseOperationReq->new( { operationHandle => $handle, } )
    );
    return;
}
293              
# Run an HQL statement. Any previous operation is closed first. On success
# the response is remembered (fetch reads its operation handle) and
# returned. If the server reports an error code, this dies with the server
# message and the HQL.
sub execute {
    my ( $self, $query ) = @_;

    $self->_cleanup_previous_operation;

    my $request = Thrift::API::HiveClient2::TExecuteStatementReq->new(
        {   sessionHandle => $self->_session_handle,
            statement     => $query,
            confOverlay   => {},
        }
    );
    my $response = $self->_client->ExecuteStatement($request);

    die __PACKAGE__ . "::execute: $response->{status}{errorMessage}; HQL was: \"$query\""
        if $response->{status}{errorCode};

    $self->_set__operation($response);
    $self->_set__operation_handle( $response->{operationHandle} );
    return $response;
}
312              
# Like fetch(), but every row is a hashref keyed on the column names that
# GetResultSetMetadata reports for the current operation.
sub fetch_hashref {
    my $self = shift;
    my ( $rh, $rows_at_a_time ) = @_;
    return $self->fetch( $rh, $rows_at_a_time, 1 );
}

# Fetch up to $rows_at_a_time rows (default 10_000) of the current
# operation.
#
# Call forms: fetch($rh, $n), fetch($n) or fetch(). NOTE the $rh argument
# does not select the operation. Rows always come from the operation handle
# of the last execute / get_columns / get_tables call. Leaving room for
# parallel queries would need a lot more testing; patches welcome.
#
# In list context returns ($rows, $has_more_rows). In scalar context
# returns $rows, or undef when no row came back (handy in while loops).
# Each row is an arrayref of values, or a hashref when $use_hashref is true.
# NOTE the server's hasMoreRows flag was reported broken (july 2013, see
# the POD).
sub fetch {
    my $self = shift;
    my ( $rh, $rows_at_a_time, $use_hashref ) = @_;

    # A positive integer as first argument means the newer calling form,
    # which takes only the number of wanted rows.
    $rows_at_a_time = $rh if ( $rh && !$rows_at_a_time && $rh =~ /^[1-9][0-9]*$/ );
    $rows_at_a_time ||= 10_000;

    my $result = [];
    my $has_more_rows;

    my $op_handle = $self->_operation_handle;
    my $decoder   = $self->_row_decoder_for($op_handle);

    $rh = $self->_client->FetchResults(
        Thrift::API::HiveClient2::TFetchResultsReq->new(
            {   operationHandle => $op_handle,
                maxRows         => $rows_at_a_time,
            }
        )
    );
    if ( ref $rh eq 'Thrift::API::HiveClient2::TFetchResultsResp' ) {
        $has_more_rows = $rh->hasMoreRows();

        for my $row ( @{ $rh->{results}{rows} || [] } ) {

            # Each TColumnValue has one key per possible data type, and only
            # one of them holds a T*Value object. Record which key that is
            # for every column, from the first row seen, to decode later
            # rows quickly.
            if ( !@{ $decoder->{keys} } ) {
                for my $column ( @{ $row->{colVals} || [] } ) {
                    push @{ $decoder->{keys} }, grep { ref $column->{$_} } keys %$column;
                }
            }

            # Column names are needed for hashref rows. They are fetched once
            # per operation, however the earlier rows of it were fetched.
            if ( $use_hashref && !$decoder->{names} ) {
                my $rh_meta = $self->_client->GetResultSetMetadata(
                    Thrift::API::HiveClient2::TGetResultSetMetadataReq->new(
                        { operationHandle => $op_handle }
                    )
                );
                $decoder->{names}
                    = [ map { $_->{columnName} } @{ $rh_meta->{schema}{columns} || [] } ];
            }

            my $idx    = 0;
            my $values = [
                map  { $_->value }
                grep { defined $_ }
                map  { $row->{colVals}[ $idx++ ]{$_} } @{ $decoder->{keys} }
            ];
            if ($use_hashref) {
                my %record;
                @record{ @{ $decoder->{names} } } = @$values;
                push @$result, \%record;
            }
            else {
                push @$result, $values;
            }
        }
    }
    return wantarray ? ( $result, $has_more_rows ) : ( @$result ? $result : undef );
}

# Return the decoding cache for $op_handle: the type key of each column
# ("keys") and, once fetched, the column names ("names"). A fresh cache
# replaces the old one when the handle changes. The cache lives on the
# object and keeps a reference to the handle it describes. That handle
# therefore cannot be freed while cached, so a newer handle can never reuse
# its address and be mistaken for it.
sub _row_decoder_for {
    my ( $self, $op_handle ) = @_;
    my $id      = Scalar::Util::refaddr($op_handle) // 0;
    my $decoder = $self->{_row_decoder};
    if ( !$decoder || $decoder->{id} != $id ) {
        $decoder = $self->{_row_decoder}
            = { id => $id, handle => $op_handle, keys => [], names => undef };
    }
    return $decoder;
}
409              
# Describe the columns of $table in $schema (default: "default").
#
# Returns an arrayref with one hashref per column, keyed on the ODBC
# column-description field names in @odbc_coldesc_fields, or undef when the
# server returned no rows. Dies if $table is missing or if the server
# reports an error code.
sub get_columns {
    my $self = shift;
    my ( $table, $schema ) = @_;

    # Not specifying a table name would return all columns of all tables,
    # which we probably don't want (feel free to change this behaviour).
    # The schema name gets a default value instead.
    die "Unspecified table name" if !$table;
    $schema //= "default";

    $self->_cleanup_previous_operation;

    my $rh = $self->_client->GetColumns(
        Thrift::API::HiveClient2::TGetColumnsReq->new(
            {   sessionHandle => $self->_session_handle,
                catalogName   => undef,
                schemaName    => $schema,
                tableName     => $table,
                columnName    => undef,
                confOverlay   => {},
            }
        )
    );
    if ( $rh->{status}{errorCode} ) {
        die __PACKAGE__ . "::get_columns: $rh->{status}{errorMessage}";
    }
    $self->_set__operation($rh);
    $self->_set__operation_handle( $rh->{operationHandle} );

    my $columns;
    while ( my $res = $self->fetch($rh) ) {
        for my $line (@$res) {
            my %column;
            @column{@odbc_coldesc_fields} = @$line;
            push @$columns, \%column;
        }
    }
    return $columns;
}
448              
# List the tables of $schema (default: "default") matching $table_pattern.
#
# Returns an arrayref with one hashref per table, keyed on the field names
# in @tabledesc_fields, or undef when the server returned no rows. Dies if
# the server reports an error code.
sub get_tables {
    my $self = shift;
    my ( $schema, $table_pattern ) = @_;

    # $table_pattern is passed to the server as-is; leaving it undef
    # presumably lists every table of the schema (TODO confirm). The schema
    # name gets a default value.
    $schema //= "default";

    $self->_cleanup_previous_operation;

    my $rh = $self->_client->GetTables(
        Thrift::API::HiveClient2::TGetTablesReq->new(
            {   sessionHandle => $self->_session_handle,
                catalogName   => undef,
                schemaName    => $schema,
                tableName     => $table_pattern,
                confOverlay   => {},
            }
        )
    );
    if ( $rh->{status}{errorCode} ) {
        die __PACKAGE__ . "::get_tables: $rh->{status}{errorMessage}";
    }
    $self->_set__operation($rh);
    $self->_set__operation_handle( $rh->{operationHandle} );

    my $tables;
    while ( my $res = $self->fetch($rh) ) {
        for my $line (@$res) {
            my %table;
            @table{@tabledesc_fields} = @$line;
            push @$tables, \%table;
        }
    }
    return $tables;
}
485              
# Moo destructor: close the current operation, then the session if one was
# opened, then the transport.
#
# The session handle is read straight from the object hash instead of
# through the lazy _session_handle accessor. The accessor's builder would
# connect and open a brand new session just to close it, for an object
# that never ran a query or whose BUILD failed.
sub DEMOLISH {
    my $self = shift;

    $self->_cleanup_previous_operation;

    my $session_handle = $self->{_session_handle}
        // ( $self->{_session} && $self->{_session}{sessionHandle} );
    if ($session_handle) {
        $self->_client->CloseSession(
            Thrift::API::HiveClient2::TCloseSessionReq->new( { sessionHandle => $session_handle, } )
        );
    }

    if ( $self->_transport ) {
        $self->_transport->close;
    }
}
503              
# Delegate unknown methods to the underlying TCLIService client.
#
# When a method this class does not define is called on an instance and the
# client object can() handle it, a forwarding sub is installed under the
# looked-up name ($AUTOLOAD), so later calls skip AUTOLOAD. The current
# call is then re-dispatched to that sub. Otherwise this croaks. DESTROY is
# never delegated, and class-method calls croak because there is no client
# to delegate to.
sub AUTOLOAD {
    my ($self) = @_;
    ( my $meth = our $AUTOLOAD ) =~ s/.*:://;
    return if $meth eq 'DESTROY';

    croak "Can't call $AUTOLOAD as a class method" if !blessed($self);

    if ( $self->_client->can($meth) ) {
        no strict 'refs';
        *$AUTOLOAD = sub { shift->_client->$meth(@_) };
        goto &$AUTOLOAD;
    }
    croak "No such method exists: $AUTOLOAD";
}
519              
520             1;
521              
522             __END__