File Coverage

blib/lib/Net/Hadoop/Oozie.pm
Criterion Covered Total %
statement 33 35 94.2
branch n/a
condition n/a
subroutine 12 12 100.0
pod n/a
total 45 47 95.7


line stmt bran cond sub pod time code
1             package Net::Hadoop::Oozie;
2             $Net::Hadoop::Oozie::VERSION = '0.112';
3 9     9   534366 use 5.010;
  9         125  
4 9     9   49 use strict;
  9         16  
  9         204  
5 9     9   48 use warnings;
  9         18  
  9         312  
6              
7 9     9   2330 use parent qw( Clone );
  9         2349  
  9         60  
8              
9 9     9   22904 use URI;
  9         50278  
  9         327  
10 9     9   74 use Carp qw( confess );
  9         21  
  9         614  
11 9     9   3223 use Moo;
  9         77736  
  9         52  
12 9         567 use Ref::Util qw(
13             is_arrayref
14             is_hashref
15 9     9   5470 );
  9         11672  
16 9     9   2756 use Hash::Flatten qw( :all );
  9         18861  
  9         1293  
17 9     9   2489 use Date::Parse qw( str2time );
  9         48322  
  9         618  
18 9     9   4791 use XML::Simple qw( xml_in );
  9         61212  
  9         75  
19 9     9   14175 use XML::Twig;
  0            
  0            
20              
21             use Constant::FromGlobal DEBUG => { int => 1, default => 0, env => 1 };
22              
23             use Net::Hadoop::Oozie::Constants qw(:all);
24              
25             with qw(
26             Net::Hadoop::Oozie::Role::Common
27             Net::Hadoop::Oozie::Role::LWP
28             );
29              
30             has api_version => (
31             is => 'rw',
32             isa => sub {
33             my $param = shift;
34             if ( ! $RE_VALID_ENDPOINT->{ $param } ) {
35             confess sprintf '%s is not a valid version', $param;
36             }
37             },
38             default => 'v1',
39             lazy => 1,
40             );
41              
42             has 'offset' => (
43             is => 'rw',
44             isa => sub {
45             confess "$_[0] is not an positive Int" if defined $_[0] && ($_[0] !~ /^[0-9]+$/ || $_[0] < 1);
46             },
47             default => sub { 1 },
48             lazy => 1,
49             );
50              
51             has 'len' => (
52             is => 'rw',
53             isa => sub {
54             confess "$_[0] is not an positive Int" if defined $_[0] && ($_[0] !~ /^[0-9]+$/ || $_[0] < 1);
55             },
56             default => sub { 50 },
57             lazy => 1,
58             );
59              
60             has 'order' => (
61             is => 'rw',
62             isa => sub {
63             confess "$_[0] should be asc or desc" if defined $_[0] && $_[0] !~ /^(desc|asc)$/;
64             },
65             default => sub { "asc" },
66             lazy => 1,
67             );
68              
69             has doas => (
70             is => 'rw',
71             isa => sub {
72             my $param = shift;
73             confess "$param is not a valid username" if $param !~ /^[a-z]+$/;
74             },
75             lazy => 1,
76             );
77              
78             has 'show' => (
79             is => 'rw',
80             isa => sub {
81             if ( $_[0] && ! $IS_VALID_SHOW{ $_[0] || q{} } ) {
82             confess "$_[0] is not a recognized show type";
83             }
84             },
85             default => sub { q{} },
86             lazy => 1,
87             );
88              
89             has 'action' => (
90             is => 'rw',
91             isa => sub {
92             if ( $_[0] && ! $IS_VALID_ACTION{ $_[0] || q{} } ) {
93             confess "$_[0] is not a recognized action type";
94             }
95             },
96             default => sub { q{} },
97             lazy => 1,
98             );
99              
100             has 'jobtype' => (
101             is => 'rw',
102             isa => sub {
103             confess "$_[0] is not a recognized jobtype"
104             if $_[0] && $_[0] !~ /^(|workflows|coordinators|bundles)$/;
105             },
106             coerce => sub { ($_[0] || '') eq 'workflows' ? '' : $_[0] },
107             default => '', # this seems to be the default, equivalent to 'workflows'
108             lazy => 1,
109             );
110              
111             has 'filter' => (
112             is => 'rw',
113             isa => \&_process_filters,
114             default => sub { return {} },
115             lazy => 1,
116             );
117              
118             has expand_xml_conf => (
119             is => 'rw',
120             default => sub { 0 },
121             );
122              
123             has shortcircuit_via_callback => (
124             is => 'rw',
125             default => sub { 0 },
126             );
127              
128             #------------------------------------------------------------------------------#
129              
130             # API
131              
132             sub admin {
133             my $self = shift;
134             my $endpoint = shift || confess "No endpoint specified for admin";
135             my $valid = $RE_VALID_ENDPOINT->{ $self->api_version };
136             my $ep = "admin/$endpoint";
137              
138             if ( $ep !~ $valid ) {
139             confess sprintf '%s is not a valid admin endpoint!', $endpoint;
140             }
141              
142             return $self->agent_request( $self->_make_full_uri( $ep ) );
143             }
144              
145             sub kerberos_enabled {
146             # All relevant config keys:
147             #
148             # oozie.authentication.kerberos.keytab
149             # oozie.authentication.kerberos.name.rules
150             # oozie.authentication.kerberos.principal
151             # oozie.authentication.type
152             # oozie.server.authentication.type
153             # oozie.service.HadoopAccessorService.kerberos.enabled
154             # oozie.service.HadoopAccessorService.kerberos.principal
155             # oozie.service.HadoopAccessorService.keytab.file
156             #
157              
158             state $krb_key = 'oozie.service.HadoopAccessorService.kerberos.enabled';
159             my $self = shift;
160             my $conf = $self->admin('configuration')
161             || confess "Failed to collect admin/configuration";
162             my $krb_val = $conf->{ $krb_key } || return;
163             return $krb_val eq 'true';
164             }
165              
166             sub build_version {
167             my $self = shift;
168             my $version = $self->admin("build-version")->{buildVersion};
169             return $version;
170             }
171              
172             sub oozie_version {
173             my $self = shift;
174             my $build = $self->build_version;
175             my($v) = split m{ [-] }xms, $build, 2;
176             return $v;
177             }
178              
179             sub max_node_name_len {
180             my $self = shift;
181             my $version = $self->oozie_version;
182              
183             # A simple grep in oozie.git shows that it was always set to "50"
184             # up until v4.3.0. So, no need to check any older version for even
185             # lower limits.
186              
187             return $version ge '4.3.0' ? 128 : 50;
188             }
189              
190             # Takes a hash[ref] for the options
191              
192             sub jobs {
193             my $self = shift->clone; # as we are clobbering lots of attributes
194              
195             my $options = @_ > 1 ? {@_} : ($_[0] || {});
196              
197             # TODO: this is a broken logic!
198             #
199             for (qw(len offset jobtype)) {
200             $self->$_($options->{$_}) if defined $options->{$_};
201             }
202              
203             # TODO: rework this, logic makes no sense. Filter should have a default and
204             # be overridable in a flexible manner
205             $self->filter(
206             $options->{filter}
207             || $self->filter
208             || { status => "RUNNING" }
209             ); # maybe merge instead?
210              
211             my $jobs = $self->agent_request( $self->_make_full_uri('jobs') );
212              
213             $self->_expand_meta_data($jobs); # make this optional given the horrible implementation?
214              
215             return $jobs;
216             }
217              
218             # IMPORTANT ! FIXME ?
219             #
220             # when querying a coordinator, the actions field will contain action details,
221             # in execution order. Since the defaults are offset 1 and len 50, for most
222             # coordinators this information will be useless. the proper way of querying
223             # would then be (to obtain the last 50 actions):
224             #
225             # my $details = Net::Hadoop::Oozie->new({ len => 1 })->job( $coordJobId );
226             # my $total_actions = $details->{total};
227             # my $offset = $details->{total} - 49;
228             # $offset = 1 if $offset < 1;
229             # $details = Net::Hadoop::Oozie->new({ len => 50, offset => $offset })->job( $coordJobId );
230             #
231             # NOTE: this should be fixed in oozie 4, which has an 'order' (asc by default, can be desc) parameter
232              
233             sub job {
234             my $self = shift->clone; # as we are clobbering lots of attributes
235             my $id = shift || confess "No job id specified";
236             my $options;
237             if ( ref $_[0] eq 'HASH') {
238             $options = shift;
239             }
240             else {
241             $options = {@_};
242             }
243              
244             for ( JOB_OPTIONS ) {
245             $self->$_($options->{$_}) if defined $options->{$_};
246             }
247              
248             $self->show( 'info' ) if !$self->show;
249              
250             my $job = $self->agent_request( $self->_make_full_uri('job/' . $id ) );
251             $self->_expand_meta_data($job); # make this optional given the horrible implementation?
252              
253             return $job;
254             }
255              
256             # Take hashes for options
257              
258             sub coordinators {
259             my $self = shift;
260             return $self->jobs( jobtype => 'coordinators', @_ );
261             }
262              
263             sub workflows {
264             my $self = shift;
265             return $self->jobs( jobtype => '', @_ );
266             }
267              
268             #------------------------------------------------------------------------------#
269              
270             # EXTENSIONS
271              
272             # This will return the job data if the job exists to prevent a second call
273             #
274             sub job_exists {
275             my $self = shift;
276             my $id = shift || confess "No job id specified";
277             my $ok;
278              
279             eval {
280             $ok = $self->job( $id, @_ );
281             1;
282             } or do {
283             my $eval_error = $@ || 'Zombie error';
284             confess $eval_error if $eval_error !~ $RE_BAD_REQUEST;
285             };
286              
287             return $ok;
288             }
289              
290             sub submit_job {
291             # TODO: verify the existence of the workflow on HDFS
292              
293             my $self = shift;
294             my ($config) = @_ == 1 ? $_[0] : { @_ };
295            
296             $config = {
297             'user.name' => 'mapred',
298             %{ $config },
299             };
300              
301             for (qw(
302             appName
303             oozie.wf.application.path
304             )) {
305             if ( ! $config->{$_} ) {
306             die "No $_ provided in submit_job()";
307             }
308             }
309              
310             my $xml_config = XML::Twig->new();
311             $xml_config->set_encoding("UTF-8");
312             $xml_config->set_root(my $root = XML::Twig::Elt->new('configuration'));
313             while (my ($k, $v) = each %$config) {
314             $xml_config->root->insert_new_elt(
315             'last_child', 'property', {},
316             XML::Twig::Elt->new( 'name', {}, $k ),
317             XML::Twig::Elt->new( 'value', {}, $v ),
318             );
319             }
320             $xml_config->trim->set_pretty_print('indented');
321             my $content = $xml_config->sprint;
322              
323             if ($config->{debug}) {
324             warn sprintf "XML payload (job config): %s\n", $content;
325             }
326              
327             # remove some params, add one, to get a valid endpoint url
328             # really not happy about how I did this initially, it needs to be cleaned
329             # up at some stage (state is way too permanent, should be reinitialized
330             # between calls)
331             my $saved_offset = $self->offset();
332             my $saved_len = $self->len();
333             my $saved_action = $self->action();
334              
335             $self->offset(undef);
336             $self->len(undef);
337             $self->action('start');
338              
339             my $uri = $self->_make_full_uri('jobs');
340             my $res = $self->agent_request( $uri, 'post', $content );
341              
342             if ($config->{debug} || !$res->{id}) {
343             local $Data::Dumper::Terse = 1;
344             print "JSON response: ", Data::Dumper::Dumper $res;
345             }
346            
347             $self->offset($saved_offset);
348             $self->len($saved_len);
349             $self->action($saved_action);
350            
351             return $res->{id};
352             }
353              
354             sub _collect_suspended {
355             my $self = shift;
356             my $opt = shift || {};
357              
358             die "Options need to be a HASH" if ! is_hashref $opt;
359              
360             my $is_coord = $opt->{is_coord};
361             my $key = $is_coord ? 'coordinatorjobs' : 'workflows';
362              
363             $self->filter( { status => [qw( SUSPENDED )] } );
364              
365             my(@wanted);
366              
367             $self->_jobs_iterator(
368             jobtype => $is_coord ? 'coordinators' : '',
369             {
370             ( $is_coord ? (
371             is_coordinator => 1,
372             ):()),
373             callback => sub {
374             my $job = shift;
375             return 1 if ! $job->{ $key };
376             push @wanted, @{ $job->{ $key } };
377             return 1;
378             },
379             }
380             );
381              
382             return \@wanted;
383             }
384              
385             sub suspended_workflows {
386             shift->_collect_suspended;
387             }
388              
389             sub suspended_coordinators {
390             shift->_collect_suspended({ is_coord => 1 });
391             }
392              
393             sub active_coordinators {
394             my $self = shift;
395             my $opt = ref $_[0] eq 'HASH' ? shift @_ : {};
396             $opt->{status} ||= [qw(
397             RUNNING
398             PREP
399             )];
400              
401             $self->filter( { status => $opt->{status} } );
402              
403             my(@wanted, $default_cb);
404             $opt->{callback} ||= do {
405             $default_cb = 1;
406             sub {
407             my $job = shift;
408             push @wanted, @{ $job->{coordinatorjobs} };
409             return 1;
410             }
411             };
412              
413             $self->_jobs_iterator(
414             jobtype => 'coordinators',
415             {
416             callback => delete $opt->{callback},
417             is_coordinator => 1,
418             }
419             );
420              
421             return $default_cb ? \@wanted : ();
422             }
423              
424             sub standalone_active_workflows {
425             my $self = shift;
426             my $opt = ref $_[0] eq 'HASH' ? shift @_ : {};
427             $opt->{status} ||= [qw(
428             RUNNING
429             PREP
430             )];
431              
432             $self->filter( { status => $opt->{status} } );
433              
434             my(@wanted, $default_cb);
435             $opt->{callback} ||= do {
436             $default_cb = 1;
437             sub {
438             my $job = shift;
439             push @wanted,
440             map {
441             # - /jobs endpoint might be lying to you about certain fields:
442             # https://issues.apache.org/jira/browse/OOZIE-2418
443             # Also check the status of the above ticket and remove
444             # the aggressive logic down below if it's fixed.
445             defined $_->{appPath}
446             ? $_
447             : $self->job( $_->{id} )
448             }
449             grep { ! $_->{parentId} }
450             @{ $job->{workflows} };
451             return 1;
452             }
453             };
454              
455             $self->_jobs_iterator(
456             jobtype => '',
457             {
458             callback => $opt->{callback},
459             }
460             );
461              
462             return $default_cb ? \@wanted : ();
463             }
464              
465             sub active_job_paths {
466             state $is_type = {
467             map { $_ => 1 } qw(
468             all
469             coordinator
470             wf
471             )
472             };
473              
474             my $self = shift;
475             my $type = shift;
476             my $oozie_base_path = shift || '';
477             my $re_hdfs_base;
478             if ( $oozie_base_path ) {
479             $re_hdfs_base = qr{ \A \Q$oozie_base_path\E }xms;
480             }
481            
482              
483             if ( ! $type || ! $is_type->{ $type } ) {
484             die sprintf "Unknown type `%s` was specified. Valid options are: '%s'.",
485             $type // '[undefined]',
486             join(q{', '}, sort keys %{ $is_type }),
487             ;
488             }
489              
490             my %path;
491              
492             my $collect = sub {
493             my($all_jobs, $id_name, $path_name, $wanted_fields) = @_;
494              
495             foreach my $this_job ( @{ $all_jobs } ) {
496             my $hdfs_path = $this_job->{ $path_name };
497             push @{ $path{ $hdfs_path } ||= [] },
498             {
499             $this_job->{ $id_name } => {
500             (
501             map { $_ => $this_job->{ $_ } }
502             @{ $wanted_fields }
503             ),
504             ( $re_hdfs_base && $hdfs_path !~ $re_hdfs_base ? (
505             # shouldn't happen, but you can never know
506             alien => 1,
507             ): ()),
508             },
509             }
510             ;
511             }
512              
513             return 1;
514             };
515              
516             my @status = qw/
517             PREP
518             RUNNING
519             SUSPENDED
520             /;
521              
522             if ( $type eq 'coordinator' || $type eq 'all' ) {
523             $self->active_coordinators({
524             status => \@status,
525             callback => sub {
526             my $job = shift;
527             $collect->(
528             $job->{coordinatorjobs},
529             'coordJobId',
530             'coordJobPath',
531             [qw( coordJobName status )],
532             );
533             return 1;
534             },
535             });
536             }
537              
538             if ( $type eq 'wf' || $type eq 'all' ) {
539             $collect->(
540             $self->standalone_active_workflows({ status => \@status }),
541             'id',
542             'appPath',
543             [qw( appName status )],
544             );
545             }
546              
547             return \%path;
548             }
549              
550             # better be verbose than a cryptic shortname
551             #
552             sub coordinators_with_the_same_appname_on_the_same_path {
553             my $self = shift;
554             my $apath = $self->active_job_paths('coordinator');
555              
556             my $multi = {
557             map { $_ => $apath->{$_} }
558             grep { @{ $apath->{$_} } > 1 }
559             keys %{ $apath }
560             };
561              
562             my $dupe = {};
563             for my $path ( keys %{ $multi } ) {
564             for my $coord ( @{ $multi->{ $path } }) {
565             foreach my $cid ( keys %{ $coord } ) {
566             my $meta = $coord->{ $cid };
567             # filter status=RUNNING?
568             push @{ $dupe->{ $meta->{ coordJobName } } ||= [] }, $cid;
569             }
570             }
571             }
572             return map { $_ => $dupe->{$_} }
573             grep { @{ $dupe->{$_} } > 1 }
574             keys %{ $dupe };
575             }
576              
577             sub coordinators_on_the_same_path {
578             my $self = shift;
579             my $apath = $self->active_job_paths('coordinator');
580              
581             my $multi = {
582             map { $_ => $apath->{$_} }
583             grep { @{ $apath->{$_} } > 1 }
584             keys %{ $apath }
585             };
586              
587             my %rv;
588             for my $path ( keys %{ $multi } ) {
589             for my $coord ( @{ $multi->{ $path } }) {
590             foreach my $cid ( keys %{ $coord } ) {
591             my $meta = $coord->{ $cid };
592             # filter status=RUNNING?
593             $rv{ $path }{ $cid } = $meta->{ coordJobName };
594             }
595             }
596             }
597              
598             return %rv;
599             }
600              
601             # param 1 : fractional hours
602             # param 2 : pattern for appname filtering
603              
604             sub failed_workflows_last_n_hours {
605             my $self = shift;
606             my $n_hours = shift || 1;
607             my $pattern = shift;
608             my $opt = shift || {
609             parent_info => 1,
610             };
611              
612             confess "Options need to be a hash" if ! is_hashref $opt;
613              
614             # can be slow to collect if there are too many coordinators
615             # as there will be a single api request per coordinator id
616             # might be good to investigate a bulk request for that.
617             #
618             my $want_parent_info = $opt->{parent_info};
619              
620             $self->filter( { status => [qw(FAILED SUSPENDED KILLED)] } );
621             my $jobs = $self->jobs(jobtype => 'workflows');
622              
623             my @failed;
624             my $console_url_base; # not available in coordinators, we'll use a trick
625             for my $workflow ( @{ $jobs->{workflows} } ) {
626              
627             next if ($pattern && $workflow->{appName} !~ /$pattern/);
628              
629             if (( !$workflow->{endTime_epoch}
630             && $workflow->{startTime_epoch} >= time - $n_hours * 3600
631             )
632             || $workflow->{endTime_epoch}
633             && $workflow->{endTime_epoch} >= time - $n_hours * 3600
634             )
635             {
636             if ( !$console_url_base ) {
637             ( $console_url_base = $workflow->{consoleUrl} ) =~ s/job=.*/job=/;
638             }
639             my $details = $self->job( $workflow->{id} );
640              
641             my ($error) = map { $_->{errorMessage} ? $_->{errorMessage} : () } @{$details->{actions}||[]};
642              
643             # Extract some data from the workflow xml config to:
644             # - check wether the workflow should be skipped from this list: if
645             # it has parameters.timeoutSkipErrorMail set (emk workflows,
646             # for instance, where timeout is a normal condition)
647             # - gather the parameters.errorEmailTo addresses, for automated
648             # sending
649             my $conf = eval { xml_in($details->{conf}) } || {};
650             for (qw(timeoutSkipErrorMail errorEmailTo)) {
651             $workflow->{$_} = $conf->{property}{$_}{value};
652             }
653              
654             my $parent_id = $workflow->{parentId} = $details->{parentId}
655             // "";
656              
657             # This workflow was triggered by a coordinator, let's get some info
658             if ($parent_id && $want_parent_info ) {
659             $parent_id =~ s/\@[0-9]+$//;
660             my $parent = $self->job($parent_id);
661             $workflow->{parentConsoleUrl}
662             = $parent->{coordJobId}
663             ? $console_url_base . $parent->{coordJobId}
664             : 'not found';
665             $workflow->{parentStatus} = $parent->{status};
666             $workflow->{parentAppname} = $parent->{coordJobName};
667             $workflow->{parentId} = $parent->{coordJobId};
668             $workflow->{scheduled}++;
669             }
670             $workflow->{errorMessage} = $error || '-';
671             push @failed, $workflow;
672             }
673             }
674             return \@failed;
675             }
676              
677             sub failed_workflows_last_n_hours_pretty {
678             my $self = shift;
679             my $failed_workflows = $self->failed_workflows_last_n_hours(shift);
680              
681             return if ! is_arrayref( $failed_workflows ) || ! @{ $failed_workflows };
682              
683             my ($out, $previous_is_scheduled);
684             for my $wf (
685             sort {
686             ( $b->{scheduled} || 0 ) <=> ( $a->{scheduled} || 0 )
687             || $b->{lastModTime_epoch} <=> $a->{lastModTime_epoch}
688             } @$failed_workflows
689             )
690             {
691             # insert a separation between scheduled and standalone wfs
692             if ($previous_is_scheduled && !$wf->{scheduled}) {
693             $out .= "\n" . "-"x50 . "\n" if $out;
694             $previous_is_scheduled = 0;
695             }
696             $previous_is_scheduled++ if $wf->{scheduled};
697              
698             $out .= "\n" if $out;
699              
700             $out .= sprintf
701             "* %s (%s):\n Id: %s\n ConsoleURL: %s\n Status: %s\n Error: %s\n",
702             $wf->{appName}, ( $wf->{scheduled} ? "SCHEDULED" : "standalone" ),
703             @{$wf}{qw(id consoleUrl status errorMessage)};
704              
705             if ( $wf->{parentId} ) {
706             $out
707             .= sprintf
708             " Coordinator info:\n Appname: %s\n Id: %s\n ConsoleURL: %s\n Status: %s\n",
709             @{$wf}{qw(parentAppname parentId parentConsoleUrl parentStatus)};
710             }
711             }
712             return $out;
713             }
714              
715             sub coord_rerun {
716             my $self = shift;
717              
718             # coord ID is like 0390096-150728120555443-oozie-oozi-C
719             # actions can be like '1', '10-12', '1,2,4-6', etc.
720             my ( $coord_id, $actions, $debug ) = @_;
721             $actions =~ s/\s+//g;
722             my $saved_action = $self->action();
723             $self->action('coord-rerun');
724              
725             my $uri = $self->_make_full_uri( 'job/' . $coord_id );
726             $uri->query_form(
727             $uri->query_form,
728             type => 'action',
729             scope => $actions,
730             refresh => 'true',
731             nocleanup => 'false',
732             );
733             my $error;
734             my $res = eval { $self->agent_request( $uri, 'put' ) } or do {
735             $error = $@;
736             warn "oozie server returned an error:\n$error";
737             };
738              
739             $self->action($saved_action);
740             return if $error;
741              
742             if ( $debug || !@{ $res->{actions} || [] } ) {
743             local $Data::Dumper::Terse = 1;
744             warn "JSON response: ", Data::Dumper::Dumper $res;
745             }
746              
747             # return some of the response
748             my $ret;
749             for ( @{ $res->{actions} || [] } ) {
750             push @$ret, [ $_->{id}, $_->{status} ];
751             }
752             return $ret;
753             }
754              
755             sub kill {
756             my $self = shift;
757             my ( $id, $debug ) = @_;
758             my $saved_action = $self->action();
759             $self->action('kill');
760              
761             my $error;
762             my $uri = $self->_make_full_uri( 'job/' . $id );
763             my $res = eval { $self->agent_request( $uri, 'put' ) } or do {
764             $error = $@;
765             warn "oozie server returned an error:\n$error";
766             };
767             $self->action($saved_action);
768             return if $error;
769             return 1;
770             }
771              
772             #------------------------------------------------------------------------------#
773              
774             sub _process_filters {
775             my $filter = shift;
776             return if ! is_hashref $filter;
777             my @unknown = grep { $_ !~ /^(name|user|group|status)$/ } keys %$filter;
778             local $" = ", ";
779             confess "unknown filter name(s): @unknown" if @unknown;
780             for my $name ( keys %$filter ) {
781             confess "filter is not a string or an array of strings"
782             if ( ref $filter->{$name} && ! is_arrayref $filter->{$name} );
783              
784             # lazy, so let's turn a single string to an array of one
785             $filter->{$name} = [ $filter->{$name} ] if !ref $filter->{$name};
786              
787             for my $filter_value ( @{ $filter->{$name} } ) {
788              
789             confess "empty value specified for filter $name"
790             if !length $filter_value;
791              
792             confess "'$filter_value' is not a valid status"
793             if $name eq "status"
794             && $filter_value !~ $RE_VALID_STATUS;
795             }
796             }
797             return $filter;
798             }
799              
800             sub _make_full_uri {
801             my $self = shift;
802             my $endpoint= shift;
803              
804             if ( $endpoint !~ $RE_VALID_ENDPOINT->{$self->api_version} ) {
805             confess "endpoint '$endpoint' is not supported";
806             }
807              
808             my $uri = URI->new( $self->oozie_uri );
809             my %filter = %{ $self->filter };
810              
811             my ( @accepted_params, @base_params, $do_filter_string, $filter_string );
812              
813             # very few params accepted for 'job', more for other reqs
814             # only 1 param for some job actions (the rest bypasses this old mechanism
815             # by injecting in URI directly, urgh)
816             if ( $endpoint =~ /^job\// && $self->action =~ /^(coord-rerun|kill)$/ ) {
817             @accepted_params = qw( action );
818             }
819             elsif ( $endpoint =~ /^job\// ) {
820             @accepted_params = qw( len offset show doas order );
821             }
822             else {
823             $do_filter_string++;
824             @accepted_params = qw( len offset jobtype show action doas order );
825             }
826              
827             @base_params = map {
828             my $value = $self->$_;
829             defined $value && length $value > 0 ? ( $_ => $value ) : ()
830             } @accepted_params;
831              
832             # the filter parameter requires URL encoding, so we treat it differently.
833             # It will be encoded by query_form once we have assembled it
834             if ($do_filter_string) {
835             my @filter_string;
836             while ( my ( $name, $values ) = ( each %filter ) ) {
837             push @filter_string, join ';', map {"$name=$_"} @$values;
838             }
839             $filter_string = join ';', @filter_string;
840             }
841              
842             $uri->query_form( [ @base_params, ($filter_string ? (filter => $filter_string) : ()) ] );
843             $uri->path( sprintf "%s/%s/%s", $uri->path,$self->api_version, $endpoint );
844              
845             printf STDERR "URI: %s\n", $uri if DEBUG;
846              
847             return $uri;
848             }
849              
850             # [dmorel] Add *_epoch to all nested data structures appearing to contain (GMT) dates. I
851             # suspect someone will harm me for doing it this way.
852              
853             sub _expand_meta_data {
854             my $self = shift;
855             my ($jobs) = @_;
856              
857             my $expand_xml_conf = $self->expand_xml_conf;
858             my $uri = URI->new( $self->oozie_uri );
859              
860             # Jobs is supposed to be a 2-level JSON hash
861             my $flat_jobs = flatten($jobs);
862             for my $k (keys %$flat_jobs) {
863             my $v = $flat_jobs->{$k};
864              
865             # add epochs
866             if ( ( $v || '' ) =~ m/ GMT$/ ) {
867             my $epoch = str2time($v);
868             if ($epoch) {
869             $flat_jobs->{"${k}_epoch"} = $epoch;
870             }
871             }
872             # add consoleURL for coordinators
873             if ($k =~ /(^.+)\.coordJobId$/ && $v) {
874             $uri->query_form(job => $v);
875             $flat_jobs->{"$1.consoleUrl"} = "$uri";
876             }
877             }
878              
879             %{ $jobs } = %{ unflatten $flat_jobs };
880              
881             if ( $expand_xml_conf ) {
882             my $expand = sub {
883             my $data = shift;
884             eval {
885             my $cs = $data->{conf_struct} = xml_in( $data->{conf}, KeepRoot => 1 );
886             1;
887             } or do {
888             my $eval_error = $@ || 'Zombie error';
889             warn "Failed to map the Oozie job configuration to a data structure: $eval_error";
890             };
891             };
892              
893             if ( my $conf = $jobs->{conf} ) {
894             if ( ! ref $conf && $conf =~ m{ \A \Q\E \s+ \Q\E}xms ) {
895             $expand->( $jobs );
896             }
897             }
898              
899             foreach my $action ( @{ $jobs->{actions} } ) {
900             my $conf = $action->{conf} || next;
901             if ( ! ref $conf && $conf =~ m{ \A [<] }xms ) {
902             $expand->( $action );
903             }
904             }
905             }
906              
907             return;
908             }
909              
910             sub _jobs_iterator {
911             my $self = shift;
912             my @param = @_;
913             my $opt = @param && ref $param[-1] eq 'HASH' ? pop @param : {};
914             my $cb = delete $opt->{callback};
915              
916             if ( ref $cb ne 'CODE' ) {
917             die "callback either not specified or is not a CODE";
918             }
919              
920             my($len, $offset, $total, $total_jobs);
921             my $key = $opt->{is_coordinator} ? 'coordinatorjobs' : 'workflows';
922             my $shortcircuit = $self->shortcircuit_via_callback;
923             my $eof;
924              
925             do {
926             my $jobs = $self->jobs(
927             @param,
928             ( $offset ? (
929             offset => $offset,
930             len => $len,
931             ) : ())
932             );
933             ($len, $offset, $total) = @{ $jobs }{qw/ len offset total /};
934             $total_jobs += $jobs->{ $key } ? @{$jobs->{ $key }} : 0; # len overflow
935             $offset += $len;
936              
937             my $ok = $cb->( $jobs );
938              
939             if ( $shortcircuit ) {
940             # If the option above is enabled, then the callback always need to
941             # return true to be able to continue.
942             #
943             if ( ! $ok ) {
944             if ( DEBUG ) {
945             printf STDERR "_jobs_iterator(short-circuit): callback returned false.\n";
946             }
947             $eof = 1;
948             }
949             }
950              
951             } while ! $eof && $offset < $total;
952              
953             if ( !$shortcircuit && $total_jobs != $total ) {
954             warn "Something is wrong, the collected total workflows and the computed total mismatch ($total_jobs != $total)";
955             }
956              
957             return;
958             }
959              
960             1;
961              
962             __END__