File Coverage

blib/lib/Thrift/API/HiveClient2.pm
Criterion Covered Total %
statement 33 176 18.7
branch 0 64 0.0
condition 0 18 0.0
subroutine 11 30 36.6
pod 6 8 75.0
total 50 296 16.8


line stmt bran cond sub pod time code
1             package Thrift::API::HiveClient2;
2             $Thrift::API::HiveClient2::VERSION = '0.021';
3             {
4             $Thrift::API::HiveClient2::DIST = 'Thrift-API-HiveClient2';
5             }
6              
7             # ABSTRACT: Perl to HiveServer2 Thrift API wrapper
8              
9 2     2   22288 use strict;
  2         3  
  2         60  
10 2     2   11 use warnings;
  2         3  
  2         64  
11 2     2   2539 use Moo;
  2         44270  
  2         11  
12 2     2   3943 use Carp;
  2         5  
  2         151  
13 2     2   10 use Scalar::Util qw( reftype blessed );
  2         4  
  2         166  
14 2     2   1645 use List::MoreUtils 'zip';
  2         25196  
  2         16  
15              
16 2     2   4812 use Thrift;
  2         4962  
  2         57  
17 2     2   1456 use Thrift::Socket;
  2         64454  
  2         59  
18 2     2   1388 use Thrift::BufferedTransport;
  2         1385  
  2         82  
19              
20             # An unclean workaround to have the multiple "die new TException" statements
21             # display something useful instead of a useless stringified hash reference.
22             # Currently tied to version 0.7 to 0.9; Thrift people don't seem to read cpan
23             # bug reports, will need to poke them directly.
24             # see https://rt.cpan.org/Public/Bug/Display.html?id=86679
25              
26             # update: not indexable by cpan as it injects code in another class which is
27             # not ours. plus the version check doesn't work either because of a missing
28             # package declaration in the Thrift class file, see:
29             # https://issues.apache.org/jira/browse/THRIFT-3551
30             # https://issues.apache.org/jira/browse/THRIFT-3549
31              
32             #package Thrift::TException;
33             #use version;
34             #
35             #my $thrift_version = version->parse($Thrift::VERSION);
36             #if ( $thrift_version >= version->parse('0.7.0') && $thrift_version <= version->parse('0.9.3') )
37             #{
38             # eval <<'OVERLOAD'; #<<<
39             # use overload '""' => sub {
40             # return
41             # ref( $_[0] )
42             # . " error: "
43             # . ( $_[0]->{message} || 'empty message' )
44             # . " (code "
45             # . ( defined $_[0]->{code} ? $_[0]->{code} : 'undefined' ) . ")";
46             # };
47             #OVERLOAD
48             #}
49              
50             #>>>
51              
52             package Thrift::API::HiveClient2;
53              
54             # Protocol loading is done dynamically later.
55              
56 2     2   2045 use Thrift::API::HiveClient2::TCLIService;
  2         7  
  2         5061  
57              
58             # See https://msdn.microsoft.com/en-us/library/ms711683(v=vs.85).aspx
59             my @odbc_coldesc_fields = qw(
60             TABLE_CAT
61             TABLE_SCHEM
62             TABLE_NAME
63             COLUMN_NAME
64             DATA_TYPE
65             TYPE_NAME
66             COLUMN_SIZE
67             BUFFER_LENGTH
68             DECIMAL_DIGITS
69             NUM_PREC_RADIX
70             NULLABLE
71             REMARKS
72             COLUMN_DEF
73             SQL_DATA_TYPE
74             SQL_DATETIME_SUB
75             CHAR_OCTET_LENGTH
76             ORDINAL_POSITION
77             IS_NULLABLE
78             );
79              
80             my @tabledesc_fields = qw(
81             TABLE_CAT
82             TABLE_SCHEM
83             TABLE_NAME
84             TABLE_TYPE
85             REMARKS
86             );
87              
88             # Don't use XS for now, fails initializing properly with BufferedTransport. See
89             # Thrift::XS documentation.
90             has use_xs => (
91             is => 'rwp',
92             default => sub {0},
93             lazy => 1,
94             );
95              
96             has host => (
97             is => 'ro',
98             default => sub {'localhost'},
99             );
100             has port => (
101             is => 'ro',
102             default => sub {10_000},
103             );
104             has sasl => (
105             is => 'ro',
106             default => 0,
107             );
108              
109             # 1 hour default recv socket timeout; increase it for longer-running queries.
110             # It is called "timeout" for simplicity's sake, as this is how a user will
111             # experience it: a time after which the Thrift stack will throw an exception
112             # if it does not get an answer from the server.
113              
114             has timeout => (
115             is => 'rw',
116             default => sub {3_600},
117             );
118              
119             # These exist to make testing with various other Thrift Implementation classes
120             # easier, eventually.
121              
122             has _socket => ( is => 'rwp', lazy => 1 );
123             has _transport => ( is => 'rwp', lazy => 1 );
124             has _protocol => ( is => 'rwp', lazy => 1 );
125             has _client => ( is => 'rwp', lazy => 1 );
126             has _sasl => ( is => 'rwp', lazy => 1 );
127              
128             # setters implied by the 'rwp' mode on the attrs above.
129              
130 0     0     sub _set_socket { $_[0]->{_socket} = $_[1] }
131 0     0     sub _set_transport { $_[0]->{_transport} = $_[1] }
132 0     0     sub _set_protocol { $_[0]->{_protocol} = $_[1] }
133 0     0     sub _set_client { $_[0]->{_client} = $_[1] }
134              
135             sub _set_sasl {
136 0     0     my ( $self, $sasl ) = @_;
137 0 0         return if !$sasl;
138              
139             # This normally selects XS first (hopefully)
140 0           require Authen::SASL;
141 0           Authen::SASL->import;
142              
143 0           require Thrift::SASL::Transport;
144 0           Thrift::SASL::Transport->import;
145              
146 0 0         if ( $sasl == 1 ) {
    0          
147 0           return $self->{_sasl} = Authen::SASL->new( mechanism => 'GSSAPI' );
148             }
149             elsif ( reftype $sasl eq "HASH" ) {
150 0           return $self->{_sasl} = Authen::SASL->new(%$sasl); #, debug => 8 );
151             }
152 0           die "Incorrect parameter passed to _set_sasl";
153             }
154              
155             # after construction is complete, initialize any attributes that
156             # weren't set in the constructor.
157             sub BUILD {
158 0     0 0   my $self = shift;
159              
160 0 0         $self->_set_socket( Thrift::Socket->new( $self->host, $self->port ) )
161             unless $self->_socket;
162 0           $self->_socket->setRecvTimeout( $self->timeout * 1000 );
163              
164 0 0 0       $self->_set_sasl( $self->sasl ) if ( $self->sasl && !$self->_sasl );
165              
166 0 0         if ( !$self->_transport ) {
167 0           my $transport = Thrift::BufferedTransport->new( $self->_socket );
168 0 0         if ( $self->_sasl ) {
169 0           $self->_set_transport( Thrift::SASL::Transport->new( $transport, $self->_sasl ) );
170             }
171             else {
172 0           $self->_set_transport($transport);
173             }
174             }
175              
176 0 0         $self->_set_protocol( $self->_init_protocol( $self->_transport ) )
177             unless $self->_protocol;
178              
179 0 0         $self->_set_client( Thrift::API::HiveClient2::TCLIServiceClient->new( $self->_protocol ) )
180             unless $self->_client;
181             }
182              
183             sub _init_protocol {
184 0     0     my $self = shift;
185 0           my $err;
186             my $protocol = eval {
187 0 0         $self->use_xs
188             && require Thrift::XS::BinaryProtocol;
189 0           Thrift::XS::BinaryProtocol->new( $self->_transport );
190 0 0         } or do { $err = $@; 0 };
  0            
  0            
191             $protocol
192 0   0       ||= do { require Thrift::BinaryProtocol; Thrift::BinaryProtocol->new( $self->_transport ) };
  0            
  0            
193 0 0         $self->_set_use_xs(0) if ref($protocol) !~ /XS/;
194              
195             # TODO Add warning when XS was asked but failed to load
196 0           return $protocol;
197             }
198              
199             sub connect {
200 0     0 1   my ($self) = @_;
201 0           $self->_transport->open;
202             }
203              
204             has _session => (
205             is => 'rwp',
206             isa => sub {
207             die "Session isn't a Thrift::API::HiveClient2::TOpenSessionResp"
208             if !blessed( $_[0] ) || !$_[0]->isa('Thrift::API::HiveClient2::TOpenSessionResp');
209             },
210             lazy => 1,
211             builder => '_build_session',
212             );
213              
214             has username => (
215             is => 'rwp',
216             lazy => 1,
217             default => sub { $ENV{USER} },
218             );
219              
220             has password => (
221             is => 'rwp',
222             lazy => 1,
223             default => sub {''},
224             );
225              
226             sub _build_session {
227 0     0     my $self = shift;
228 0 0         $self->_transport->open if !$self->_transport->isOpen;
229 0           return $self->_client->OpenSession(
230             Thrift::API::HiveClient2::TOpenSessionReq->new(
231             { username => $self->username,
232             password => $self->password,
233             }
234             )
235             );
236             }
237              
238             has _session_handle => (
239             is => 'rwp',
240             isa => sub {
241             die "Session handle isn't a Thrift::API::HiveClient2::TSessionHandle"
242             if !blessed( $_[0] ) || !$_[0]->isa('Thrift::API::HiveClient2::TSessionHandle');
243             },
244             lazy => 1,
245             builder => '_build_session_handle',
246             );
247              
248             sub _build_session_handle {
249 0     0     my $self = shift;
250 0           return $self->_session->{sessionHandle};
251             }
252              
253             has _operation => (
254             is => "rwp",
255             isa => sub {
256             die "Operation isn't a Thrift::API::HiveClient2::T*Resp"
257             if defined $_[0]
258             && (
259             !blessed( $_[0] )
260             || ( !$_[0]->isa('Thrift::API::HiveClient2::TExecuteStatementResp')
261             && !$_[0]->isa('Thrift::API::HiveClient2::TGetColumnsResp')
262             && !$_[0]->isa('Thrift::API::HiveClient2::TGetTablesResp') )
263             );
264             },
265             lazy => 1,
266             );
267              
268             has _operation_handle => (
269             is => 'rwp',
270             isa => sub {
271             die "Operation handle isn't a Thrift::API::HiveClient2::TOperationHandle"
272             if defined $_[0]
273             && ( !blessed( $_[0] )
274             || !$_[0]->isa('Thrift::API::HiveClient2::TOperationHandle') );
275             },
276             lazy => 1,
277             );
278              
279             sub _cleanup_previous_operation {
280 0     0     my $self = shift;
281              
282             # We seem to have some memory leaks in the Hive server, so let's try freeing
283             # the operation handle explicitly
284 0 0         if ( $self->_operation_handle ) {
285 0           $self->_client->CloseOperation(
286             Thrift::API::HiveClient2::TCloseOperationReq->new(
287             { operationHandle => $self->_operation_handle, }
288             )
289             );
290 0           $self->_set__operation(undef);
291 0           $self->_set__operation_handle(undef);
292             }
293             }
294              
295             sub execute {
296 0     0 1   my $self = shift;
297 0           my ($query) = @_; # make this a bit more flexible
298              
299 0           $self->_cleanup_previous_operation;
300              
301 0           my $rh = $self->_client->ExecuteStatement(
302             Thrift::API::HiveClient2::TExecuteStatementReq->new(
303             { sessionHandle => $self->_session_handle, statement => $query, confOverlay => {} }
304             )
305             );
306 0 0         if ( $rh->{status}{errorCode} ) {
307 0           die __PACKAGE__ . "::execute: $rh->{status}{errorMessage}; HQL was: \"$query\"";
308             }
309 0           $self->_set__operation($rh);
310 0           $self->_set__operation_handle( $rh->{operationHandle} );
311 0           return $rh;
312             }
313              
314             {
315             # cache the column names we need to extract from the bloated data structure
316             # (keyed on query)
317             my ( $column_keys, $column_names );
318              
319             sub fetch_hashref {
320 0     0 1   my $self = shift;
321 0           my ( $rh, $rows_at_a_time ) = @_;
322 0           return $self->fetch( $rh, $rows_at_a_time, 1 );
323             }
324              
325             sub fetch {
326 0     0 1   my $self = shift;
327 0           my ( $rh, $rows_at_a_time, $use_hashref ) = @_;
328              
329             # if $rh looks like a number, use it instead of $rows_at_a_time
330             # it means we're using the new form for this call, which takes only the
331             # number of wanted rows, or even nothing (and relies on the defaults,
332             # and a cached copy of the query $rh)
333 0 0 0       $rows_at_a_time = $rh if ( $rh && !$rows_at_a_time && $rh =~ /^[1-9][0-9]*$/ );
      0        
334 0   0       $rows_at_a_time ||= 10_000;
335              
336 0           my $result = [];
337 0           my $has_more_rows;
338              
339             # NOTE we don't use the provided $rh any more, maybe we should leave
340             # that possibility open for parallel queries, but that would need a lot
341             # more testing. Patches welcome.
342 0           my $cached_rh = $self->_operation_handle;
343 0           $rh = $self->_client->FetchResults(
344             Thrift::API::HiveClient2::TFetchResultsReq->new(
345             { operationHandle => $cached_rh,
346             maxRows => $rows_at_a_time,
347             }
348             )
349             );
350 0 0         if ( ref $rh eq 'Thrift::API::HiveClient2::TFetchResultsResp' ) {
351              
352             # NOTE that currently (july 2013) the hasMoreRows method is broken,
353             # see the explanation in the POD
354 0           $has_more_rows = $rh->hasMoreRows();
355              
356 0 0         for my $row ( @{ $rh->{results}{rows} || [] } ) {
  0            
357              
358             # Find which fields to extract from each row, only on the first iteration
359 0 0         if ( !@{ $column_keys->{$cached_rh} || [] } ) {
  0 0          
360              
361             # metadata for the query
362 0 0         if ($use_hashref) {
363 0           my $rh_meta = $self->_client->GetResultSetMetadata(
364             Thrift::API::HiveClient2::TGetResultSetMetadataReq->new(
365             { operationHandle => $cached_rh }
366             )
367             );
368 0           $column_names = [ map { $_->{columnName} }
369 0 0         @{ $rh_meta->{schema}{columns} || [] } ];
  0            
370             }
371              
372             # TODO redo all this using the TGetResultSetMetadataResp object we retrieved
373             # above
374 0 0         for my $column ( @{ $row->{colVals} || [] } ) {
  0            
375              
376 0           my $first_col = {%$column};
377              
378             # Only 1 element of each TColumnValue is populated
379             # (although 7 keys are present, 1 for each possible data
380             # type) with a T*Value, and the rest is undef. Find out
381             # which is defined, and put the key (i.e. the data type) in
382             # cache, to reuse it to fetch the next rows faster.
383             # NOTE this data structure smells of Java and friends from
384             # miles away. Dynamically typed languages don't really need
385             # the bloat.
386 0           push @{ $column_keys->{$cached_rh} },
387 0           grep { ref $first_col->{$_} } keys %$first_col;
  0            
388             }
389             }
390              
391             # TODO find something faster? (see comment above)
392              
393 0           my $idx = 0;
394             my $retval = [
395 0           map { $_->value }
396 0           grep { defined $_ }
397 0           map { $row->{colVals}[ $idx++ ]{$_} } @{ $column_keys->{$cached_rh} }
  0            
  0            
398             ];
399 0 0         if ($use_hashref) {
400 0           push @$result, { zip @$column_names, @$retval };
401             }
402             else {
403 0           push @$result, $retval;
404             }
405             }
406             }
407 0 0         return wantarray ? ( $result, $has_more_rows ) : ( @$result ? $result : undef );
    0          
408             }
409             }
410              
411             sub get_columns {
412 0     0 1   my $self = shift;
413 0           my ( $table, $schema ) = @_;
414              
415             # note that not specifying a table name would return all columns for all
416             # tables; we probably don't want that, but feel free to change this
417             # behaviour. Same goes for the schema name: we probably want a default
418             # value for the schema, which is what we use here.
419 0 0         die "Unspecified table name" if !$table;
420 0   0       $schema //= "default";
421              
422 0           $self->_cleanup_previous_operation;
423              
424 0           my $rh = $self->_client->GetColumns(
425             Thrift::API::HiveClient2::TGetColumnsReq->new(
426             { sessionHandle => $self->_session_handle,
427             catalogName => undef,
428             schemaName => $schema,
429             tableName => $table,
430             columnName => undef,
431             confOverlay => {}
432             }
433             )
434             );
435 0 0         if ( $rh->{status}{errorCode} ) {
436 0           die __PACKAGE__ . "::execute: $rh->{status}{errorMessage}";
437             }
438 0           $self->_set__operation($rh);
439 0           $self->_set__operation_handle( $rh->{operationHandle} );
440 0           my $columns;
441 0           while ( my $res = $self->fetch($rh) ) {
442 0           for my $line (@$res) {
443 0           my $idx = 0;
444 0           push @$columns, { map { $_ => $line->[ $idx++ ] } @odbc_coldesc_fields };
  0            
445             }
446             }
447 0           return $columns;
448             }
449              
450             sub get_tables {
451 0     0 1   my $self = shift;
452 0           my ( $schema, $table_pattern ) = @_;
453              
454             # note that, unlike get_columns, no table name is required here: the
455             # table pattern is passed on to the GetTables request as given, even
456             # when undefined. The schema name, however, gets a default value when
457             # it is not specified.
458 0   0       $schema //= "default";
459              
460 0           $self->_cleanup_previous_operation;
461              
462 0           my $rh = $self->_client->GetTables(
463             Thrift::API::HiveClient2::TGetTablesReq->new(
464             { sessionHandle => $self->_session_handle,
465             catalogName => undef,
466             schemaName => $schema,
467             tableName => $table_pattern,
468             confOverlay => {},
469             }
470             )
471             );
472 0 0         if ( $rh->{status}{errorCode} ) {
473 0           die __PACKAGE__ . "::execute: $rh->{status}{errorMessage}";
474             }
475 0           $self->_set__operation($rh);
476 0           $self->_set__operation_handle( $rh->{operationHandle} );
477 0           my $tables;
478 0           while ( my $res = $self->fetch($rh) ) {
479 0           for my $line (@$res) {
480 0           my $idx = 0;
481 0           push @$tables, { map { $_ => $line->[ $idx++ ] } @tabledesc_fields };
  0            
482             }
483             }
484 0           return $tables;
485             }
486              
487             sub DEMOLISH {
488 0     0 0   my $self = shift;
489              
490 0           $self->_cleanup_previous_operation;
491              
492 0 0         if ( $self->_session_handle ) {
493 0           $self->_client->CloseSession(
494             Thrift::API::HiveClient2::TCloseSessionReq->new(
495             { sessionHandle => $self->_session_handle, }
496             )
497             );
498             }
499 0           $self->_transport->close;
500             }
501              
502             # when the user calls a method on an object of this class, see if that method
503             # exists on the TCLIService object. If so, create a sub that calls that method
504             # on the client object. If not, die horribly.
505             sub AUTOLOAD {
506 0     0     my ($self) = @_;
507 0           ( my $meth = our $AUTOLOAD ) =~ s/.*:://;
508 0 0         return if $meth eq 'DESTROY';
509 0           print STDERR "$meth\n";
510 2     2   14 no strict 'refs';
  2         3  
  2         330  
511 0 0         if ( $self->_client->can($meth) ) {
512 0     0     *$AUTOLOAD = sub { shift->_client->$meth(@_) };
  0            
513 0           goto &$AUTOLOAD;
514             }
515 0           croak "No such method exists: $AUTOLOAD";
516             }
517              
518             1;
519              
520             __END__