File Coverage

blib/lib/Net/Hadoop/Oozie.pm
Criterion Covered Total %
statement 33 35 94.2
branch n/a
condition n/a
subroutine 12 12 100.0
pod n/a
total 45 47 95.7


line stmt bran cond sub pod time code
1             package Net::Hadoop::Oozie;
2             $Net::Hadoop::Oozie::VERSION = '0.110';
3 9     9   235412 use 5.010;
  9         40  
4 9     9   50 use strict;
  9         19  
  9         204  
5 9     9   44 use warnings;
  9         19  
  9         302  
6              
7 9     9   3679 use parent qw( Clone );
  9         2238  
  9         50  
8              
9 9     9   26730 use URI;
  9         38200  
  9         234  
10 9     9   56 use Carp qw( confess );
  9         15  
  9         372  
11 9     9   3923 use Moo;
  9         88072  
  9         45  
12 9         564 use Ref::Util qw(
13             is_arrayref
14             is_hashref
15 9     9   6581 );
  9         12280  
16 9     9   3628 use Hash::Flatten qw( :all );
  9         17169  
  9         1048  
17 9     9   3549 use Date::Parse qw( str2time );
  9         49288  
  9         540  
18 9     9   5620 use XML::Simple qw( xml_in );
  9         60022  
  9         57  
19 9     9   14843 use XML::Twig;
  0            
  0            
20              
21             use Constant::FromGlobal DEBUG => { int => 1, default => 0, env => 1 };
22              
23             use Net::Hadoop::Oozie::Constants qw(:all);
24              
25             with qw(
26             Net::Hadoop::Oozie::Role::Common
27             Net::Hadoop::Oozie::Role::LWP
28             );
29              
30             has api_version => (
31             is => 'rw',
32             isa => sub {
33             my $param = shift;
34             if ( ! $RE_VALID_ENDPOINT->{ $param } ) {
35             confess sprintf '%s is not a valid version', $param;
36             }
37             },
38             default => 'v1',
39             lazy => 1,
40             );
41              
42             has 'offset' => (
43             is => 'rw',
44             isa => sub {
45             confess "$_[0] is not an positive Int" if defined $_[0] && ($_[0] !~ /^[0-9]+$/ || $_[0] < 1);
46             },
47             default => sub { 1 },
48             lazy => 1,
49             );
50              
51             has 'len' => (
52             is => 'rw',
53             isa => sub {
54             confess "$_[0] is not an positive Int" if defined $_[0] && ($_[0] !~ /^[0-9]+$/ || $_[0] < 1);
55             },
56             default => sub { 50 },
57             lazy => 1,
58             );
59              
60             has 'order' => (
61             is => 'rw',
62             isa => sub {
63             confess "$_[0] should be asc or desc" if defined $_[0] && $_[0] !~ /^(desc|asc)$/;
64             },
65             default => sub { "asc" },
66             lazy => 1,
67             );
68              
69             has doas => (
70             is => 'rw',
71             isa => sub {
72             my $param = shift;
73             confess "$param is not a valid username" if $param !~ /^[a-z]+$/;
74             },
75             lazy => 1,
76             );
77              
78             has 'show' => (
79             is => 'rw',
80             isa => sub {
81             if ( $_[0] && ! $IS_VALID_SHOW{ $_[0] || q{} } ) {
82             confess "$_[0] is not a recognized show type";
83             }
84             },
85             default => sub { q{} },
86             lazy => 1,
87             );
88              
89             has 'action' => (
90             is => 'rw',
91             isa => sub {
92             if ( $_[0] && ! $IS_VALID_ACTION{ $_[0] || q{} } ) {
93             confess "$_[0] is not a recognized action type";
94             }
95             },
96             default => sub { q{} },
97             lazy => 1,
98             );
99              
100             has 'jobtype' => (
101             is => 'rw',
102             isa => sub {
103             confess "$_[0] is not a recognized jobtype"
104             if $_[0] && $_[0] !~ /^(|workflows|coordinators|bundles)$/;
105             },
106             coerce => sub { ($_[0] || '') eq 'workflows' ? '' : $_[0] },
107             default => '', # this seems to be the default, equivalent to 'workflows'
108             lazy => 1,
109             );
110              
111             has 'filter' => (
112             is => 'rw',
113             isa => \&_process_filters,
114             default => sub { return {} },
115             lazy => 1,
116             );
117              
118             has expand_xml_conf => (
119             is => 'rw',
120             default => sub { 0 },
121             );
122              
123             has shortcircuit_via_callback => (
124             is => 'rw',
125             default => sub { 0 },
126             );
127              
128             #------------------------------------------------------------------------------#
129              
130             # API
131              
132             sub admin {
133             my $self = shift;
134             my $endpoint = shift || confess "No endpoint specified for admin";
135             my $valid = $RE_VALID_ENDPOINT->{ $self->api_version };
136             my $ep = "admin/$endpoint";
137              
138             if ( $ep !~ $valid ) {
139             confess sprintf '%s is not a valid admin endpoint!', $endpoint;
140             }
141              
142             return $self->agent_request( $self->_make_full_uri( $ep ) );
143             }
144              
145             sub build_version {
146             my $self = shift;
147             my $version = $self->admin("build-version")->{buildVersion};
148             return $version;
149             }
150              
151             sub oozie_version {
152             my $self = shift;
153             my $build = $self->build_version;
154             my($v) = split m{ [-] }xms, $build, 2;
155             return $v;
156             }
157              
158             sub max_node_name_len {
159             my $self = shift;
160             my $version = $self->oozie_version;
161              
162             # A simple grep in oozie.git shows that it was always set to "50"
163             # up until v4.3.0. So, no need to check any older version for even
164             # lower limits.
165              
166             return $version ge '4.3.0' ? 128 : 50;
167             }
168              
169             # Takes a hash[ref] for the options
170              
171             sub jobs {
172             my $self = shift->clone; # as we are clobbering lots of attributes
173              
174             my $options = @_ > 1 ? {@_} : ($_[0] || {});
175              
176             # TODO: this is a broken logic!
177             #
178             for (qw(len offset jobtype)) {
179             $self->$_($options->{$_}) if defined $options->{$_};
180             }
181              
182             # TODO: rework this, logic makes no sense. Filter should have a default and
183             # be overridable in a flexible manner
184             $self->filter(
185             $options->{filter}
186             || $self->filter
187             || { status => "RUNNING" }
188             ); # maybe merge instead?
189              
190             my $jobs = $self->agent_request( $self->_make_full_uri('jobs') );
191              
192             $self->_expand_meta_data($jobs); # make this optional given the horrible implementation?
193              
194             return $jobs;
195             }
196              
197             # IMPORTANT ! FIXME ?
198             #
199             # when querying a coordinator, the actions field will contain action details,
200             # in execution order. Since the defaults are offset 1 and len 50, for most
201             # coordinators this information will be useless. the proper way of querying
202             # would then be (to obtain the last 50 actions):
203             #
204             # my $details = Net::Hadoop::Oozie->new({ len => 1 })->job( $coordJobId );
205             # my $total_actions = $details->{total};
206             # my $offset = $details->{total} - 49;
207             # $offset = 1 if $offset < 1;
208             # $details = Net::Hadoop::Oozie->new({ len => 50, offset => $offset })->job( $coordJobId );
209             #
210             # NOTE: this should be fixed in oozie 4, which has an 'order' (asc by default, can be desc) parameter
211              
212             sub job {
213             my $self = shift->clone; # as we are clobbering lots of attributes
214             my $id = shift || confess "No job id specified";
215             my $options;
216             if ( ref $_[0] eq 'HASH') {
217             $options = shift;
218             }
219             else {
220             $options = {@_};
221             }
222              
223             for ( JOB_OPTIONS ) {
224             $self->$_($options->{$_}) if defined $options->{$_};
225             }
226              
227             $self->show( 'info' ) if !$self->show;
228              
229             my $job = $self->agent_request( $self->_make_full_uri('job/' . $id ) );
230             $self->_expand_meta_data($job); # make this optional given the horrible implementation?
231              
232             return $job;
233             }
234              
235             # Take hashes for options
236              
237             sub coordinators {
238             my $self = shift;
239             return $self->jobs( jobtype => 'coordinators', @_ );
240             }
241              
242             sub workflows {
243             my $self = shift;
244             return $self->jobs( jobtype => '', @_ );
245             }
246              
247             #------------------------------------------------------------------------------#
248              
249             # EXTENSIONS
250              
251             # This will return the job data if the job exists to prevent a second call
252             #
253             sub job_exists {
254             my $self = shift;
255             my $id = shift || confess "No job id specified";
256             my $ok;
257              
258             eval {
259             $ok = $self->job( $id, @_ );
260             1;
261             } or do {
262             my $eval_error = $@ || 'Zombie error';
263             confess $eval_error if $eval_error !~ $RE_BAD_REQUEST;
264             };
265              
266             return $ok;
267             }
268              
269             sub submit_job {
270             # TODO: verify the existence of the workflow on HDFS
271              
272             my $self = shift;
273             my ($config) = @_ == 1 ? $_[0] : { @_ };
274            
275             $config = {
276             'user.name' => 'mapred',
277             %{ $config },
278             };
279              
280             for (qw(
281             appName
282             oozie.wf.application.path
283             )) {
284             if ( ! $config->{$_} ) {
285             die "No $_ provided in submit_job()";
286             }
287             }
288              
289             my $xml_config = XML::Twig->new();
290             $xml_config->set_encoding("UTF-8");
291             $xml_config->set_root(my $root = XML::Twig::Elt->new('configuration'));
292             while (my ($k, $v) = each %$config) {
293             $xml_config->root->insert_new_elt(
294             'last_child', 'property', {},
295             XML::Twig::Elt->new( 'name', {}, $k ),
296             XML::Twig::Elt->new( 'value', {}, $v ),
297             );
298             }
299             $xml_config->trim->set_pretty_print('indented');
300             my $content = $xml_config->sprint;
301              
302             if ($config->{debug}) {
303             warn sprintf "XML payload (job config): %s\n", $content;
304             }
305              
306             # remove some params, add one, to get a valid endpoint url
307             # really not happy about how I did this initially, it needs to be cleaned
308             # up at some stage (state is way too permanent, should be reinitialized
309             # between calls)
310             my $saved_offset = $self->offset();
311             my $saved_len = $self->len();
312             my $saved_action = $self->action();
313              
314             $self->offset(undef);
315             $self->len(undef);
316             $self->action('start');
317              
318             my $uri = $self->_make_full_uri('jobs');
319             my $res = $self->agent_request( $uri, 'post', $content );
320              
321             if ($config->{debug} || !$res->{id}) {
322             local $Data::Dumper::Terse = 1;
323             print "JSON response: ", Data::Dumper::Dumper $res;
324             }
325            
326             $self->offset($saved_offset);
327             $self->len($saved_len);
328             $self->action($saved_action);
329            
330             return $res->{id};
331             }
332              
333             sub _collect_suspended {
334             my $self = shift;
335             my $opt = shift || {};
336              
337             die "Options need to be a HASH" if ! is_hashref $opt;
338              
339             my $is_coord = $opt->{is_coord};
340             my $key = $is_coord ? 'coordinatorjobs' : 'workflows';
341              
342             $self->filter( { status => [qw( SUSPENDED )] } );
343              
344             my(@wanted);
345              
346             $self->_jobs_iterator(
347             jobtype => $is_coord ? 'coordinators' : '',
348             {
349             ( $is_coord ? (
350             is_coordinator => 1,
351             ):()),
352             callback => sub {
353             my $job = shift;
354             return 1 if ! $job->{ $key };
355             push @wanted, @{ $job->{ $key } };
356             return 1;
357             },
358             }
359             );
360              
361             return \@wanted;
362             }
363              
364             sub suspended_workflows {
365             shift->_collect_suspended;
366             }
367              
368             sub suspended_coordinators {
369             shift->_collect_suspended({ is_coord => 1 });
370             }
371              
372             sub active_coordinators {
373             my $self = shift;
374             my $opt = ref $_[0] eq 'HASH' ? shift @_ : {};
375             $opt->{status} ||= [qw(
376             RUNNING
377             PREP
378             )];
379              
380             $self->filter( { status => $opt->{status} } );
381              
382             my(@wanted, $default_cb);
383             $opt->{callback} ||= do {
384             $default_cb = 1;
385             sub {
386             my $job = shift;
387             push @wanted, @{ $job->{coordinatorjobs} };
388             return 1;
389             }
390             };
391              
392             $self->_jobs_iterator(
393             jobtype => 'coordinators',
394             {
395             callback => delete $opt->{callback},
396             is_coordinator => 1,
397             }
398             );
399              
400             return $default_cb ? \@wanted : ();
401             }
402              
403             sub standalone_active_workflows {
404             my $self = shift;
405             my $opt = ref $_[0] eq 'HASH' ? shift @_ : {};
406             $opt->{status} ||= [qw(
407             RUNNING
408             PREP
409             )];
410              
411             $self->filter( { status => $opt->{status} } );
412              
413             my(@wanted, $default_cb);
414             $opt->{callback} ||= do {
415             $default_cb = 1;
416             sub {
417             my $job = shift;
418             push @wanted,
419             map {
420             # - /jobs endpoint might be lying to you about certain fields:
421             # https://issues.apache.org/jira/browse/OOZIE-2418
422             # Also check the status of the above ticket and remove
423             # the aggressive logic down below if it's fixed.
424             defined $_->{appPath}
425             ? $_
426             : $self->job( $_->{id} )
427             }
428             grep { ! $_->{parentId} }
429             @{ $job->{workflows} };
430             return 1;
431             }
432             };
433              
434             $self->_jobs_iterator(
435             jobtype => '',
436             {
437             callback => $opt->{callback},
438             }
439             );
440              
441             return $default_cb ? \@wanted : ();
442             }
443              
444             sub active_job_paths {
445             state $is_type = {
446             map { $_ => 1 } qw(
447             all
448             coordinator
449             wf
450             )
451             };
452              
453             my $self = shift;
454             my $type = shift;
455             my $oozie_base_path = shift || '';
456             my $re_hdfs_base;
457             if ( $oozie_base_path ) {
458             $re_hdfs_base = qr{ \A \Q$oozie_base_path\E }xms;
459             }
460            
461              
462             if ( ! $type || ! $is_type->{ $type } ) {
463             die sprintf "Unknown type `%s` was specified. Valid options are: '%s'.",
464             $type // '[undefined]',
465             join(q{', '}, sort keys %{ $is_type }),
466             ;
467             }
468              
469             my %path;
470              
471             my $collect = sub {
472             my($all_jobs, $id_name, $path_name, $wanted_fields) = @_;
473              
474             foreach my $this_job ( @{ $all_jobs } ) {
475             my $hdfs_path = $this_job->{ $path_name };
476             push @{ $path{ $hdfs_path } ||= [] },
477             {
478             $this_job->{ $id_name } => {
479             (
480             map { $_ => $this_job->{ $_ } }
481             @{ $wanted_fields }
482             ),
483             ( $re_hdfs_base && $hdfs_path !~ $re_hdfs_base ? (
484             # shouldn't happen, but you can never know
485             alien => 1,
486             ): ()),
487             },
488             }
489             ;
490             }
491              
492             return 1;
493             };
494              
495             my @status = qw/
496             PREP
497             RUNNING
498             SUSPENDED
499             /;
500              
501             if ( $type eq 'coordinator' || $type eq 'all' ) {
502             $self->active_coordinators({
503             status => \@status,
504             callback => sub {
505             my $job = shift;
506             $collect->(
507             $job->{coordinatorjobs},
508             'coordJobId',
509             'coordJobPath',
510             [qw( coordJobName status )],
511             );
512             return 1;
513             },
514             });
515             }
516              
517             if ( $type eq 'wf' || $type eq 'all' ) {
518             $collect->(
519             $self->standalone_active_workflows({ status => \@status }),
520             'id',
521             'appPath',
522             [qw( appName status )],
523             );
524             }
525              
526             return \%path;
527             }
528              
529             # better be verbose than a cryptic shortname
530             #
531             sub coordinators_with_the_same_appname_on_the_same_path {
532             my $self = shift;
533             my $apath = $self->active_job_paths('coordinator');
534              
535             my $multi = {
536             map { $_ => $apath->{$_} }
537             grep { @{ $apath->{$_} } > 1 }
538             keys %{ $apath }
539             };
540              
541             my $dupe = {};
542             for my $path ( keys %{ $multi } ) {
543             for my $coord ( @{ $multi->{ $path } }) {
544             foreach my $cid ( keys %{ $coord } ) {
545             my $meta = $coord->{ $cid };
546             # filter status=RUNNING?
547             push @{ $dupe->{ $meta->{ coordJobName } } ||= [] }, $cid;
548             }
549             }
550             }
551             return map { $_ => $dupe->{$_} }
552             grep { @{ $dupe->{$_} } > 1 }
553             keys %{ $dupe };
554             }
555              
556             sub coordinators_on_the_same_path {
557             my $self = shift;
558             my $apath = $self->active_job_paths('coordinator');
559              
560             my $multi = {
561             map { $_ => $apath->{$_} }
562             grep { @{ $apath->{$_} } > 1 }
563             keys %{ $apath }
564             };
565              
566             my %rv;
567             for my $path ( keys %{ $multi } ) {
568             for my $coord ( @{ $multi->{ $path } }) {
569             foreach my $cid ( keys %{ $coord } ) {
570             my $meta = $coord->{ $cid };
571             # filter status=RUNNING?
572             $rv{ $path }{ $cid } = $meta->{ coordJobName };
573             }
574             }
575             }
576              
577             return %rv;
578             }
579              
580             # param 1 : fractional hours
581             # param 2 : pattern for appname filtering
582              
583             sub failed_workflows_last_n_hours {
584             my $self = shift;
585             my $n_hours = shift || 1;
586             my $pattern = shift;
587             my $opt = shift || {
588             parent_info => 1,
589             };
590              
591             confess "Options need to be a hash" if ! is_hashref $opt;
592              
593             # can be slow to collect if there are too many coordinators
594             # as there will be a single api request per coordinator id
595             # might be good to investigate a bulk request for that.
596             #
597             my $want_parent_info = $opt->{parent_info};
598              
599             $self->filter( { status => [qw(FAILED SUSPENDED KILLED)] } );
600             my $jobs = $self->jobs(jobtype => 'workflows');
601              
602             my @failed;
603             my $console_url_base; # not available in coordinators, we'll use a trick
604             for my $workflow ( @{ $jobs->{workflows} } ) {
605              
606             next if ($pattern && $workflow->{appName} !~ /$pattern/);
607              
608             if (( !$workflow->{endTime_epoch}
609             && $workflow->{startTime_epoch} >= time - $n_hours * 3600
610             )
611             || $workflow->{endTime_epoch}
612             && $workflow->{endTime_epoch} >= time - $n_hours * 3600
613             )
614             {
615             if ( !$console_url_base ) {
616             ( $console_url_base = $workflow->{consoleUrl} ) =~ s/job=.*/job=/;
617             }
618             my $details = $self->job( $workflow->{id} );
619              
620             my ($error) = map { $_->{errorMessage} ? $_->{errorMessage} : () } @{$details->{actions}||[]};
621              
622             # Extract some data from the workflow xml config to:
623             # - check wether the workflow should be skipped from this list: if
624             # it has parameters.timeoutSkipErrorMail set (emk workflows,
625             # for instance, where timeout is a normal condition)
626             # - gather the parameters.errorEmailTo addresses, for automated
627             # sending
628             my $conf = eval { xml_in($details->{conf}) } || {};
629             for (qw(timeoutSkipErrorMail errorEmailTo)) {
630             $workflow->{$_} = $conf->{property}{$_}{value};
631             }
632              
633             my $parent_id = $workflow->{parentId} = $details->{parentId}
634             // "";
635              
636             # This workflow was triggered by a coordinator, let's get some info
637             if ($parent_id && $want_parent_info ) {
638             $parent_id =~ s/\@[0-9]+$//;
639             my $parent = $self->job($parent_id);
640             $workflow->{parentConsoleUrl}
641             = $parent->{coordJobId}
642             ? $console_url_base . $parent->{coordJobId}
643             : 'not found';
644             $workflow->{parentStatus} = $parent->{status};
645             $workflow->{parentAppname} = $parent->{coordJobName};
646             $workflow->{parentId} = $parent->{coordJobId};
647             $workflow->{scheduled}++;
648             }
649             $workflow->{errorMessage} = $error || '-';
650             push @failed, $workflow;
651             }
652             }
653             return \@failed;
654             }
655              
656             sub failed_workflows_last_n_hours_pretty {
657             my $self = shift;
658             my $failed_workflows = $self->failed_workflows_last_n_hours(shift);
659              
660             return if ! is_arrayref( $failed_workflows ) || ! @{ $failed_workflows };
661              
662             my ($out, $previous_is_scheduled);
663             for my $wf (
664             sort {
665             ( $b->{scheduled} || 0 ) <=> ( $a->{scheduled} || 0 )
666             || $b->{lastModTime_epoch} <=> $a->{lastModTime_epoch}
667             } @$failed_workflows
668             )
669             {
670             # insert a separation between scheduled and standalone wfs
671             if ($previous_is_scheduled && !$wf->{scheduled}) {
672             $out .= "\n" . "-"x50 . "\n" if $out;
673             $previous_is_scheduled = 0;
674             }
675             $previous_is_scheduled++ if $wf->{scheduled};
676              
677             $out .= "\n" if $out;
678              
679             $out .= sprintf
680             "* %s (%s):\n Id: %s\n ConsoleURL: %s\n Status: %s\n Error: %s\n",
681             $wf->{appName}, ( $wf->{scheduled} ? "SCHEDULED" : "standalone" ),
682             @{$wf}{qw(id consoleUrl status errorMessage)};
683              
684             if ( $wf->{parentId} ) {
685             $out
686             .= sprintf
687             " Coordinator info:\n Appname: %s\n Id: %s\n ConsoleURL: %s\n Status: %s\n",
688             @{$wf}{qw(parentAppname parentId parentConsoleUrl parentStatus)};
689             }
690             }
691             return $out;
692             }
693              
694             sub coord_rerun {
695             my $self = shift;
696              
697             # coord ID is like 0390096-150728120555443-oozie-oozi-C
698             # actions can be like '1', '10-12', '1,2,4-6', etc.
699             my ( $coord_id, $actions, $debug ) = @_;
700             $actions =~ s/\s+//g;
701             my $saved_action = $self->action();
702             $self->action('coord-rerun');
703              
704             my $uri = $self->_make_full_uri( 'job/' . $coord_id );
705             $uri->query_form(
706             $uri->query_form,
707             type => 'action',
708             scope => $actions,
709             refresh => 'true',
710             nocleanup => 'false',
711             );
712             my $error;
713             my $res = eval { $self->agent_request( $uri, 'put' ) } or do {
714             $error = $@;
715             warn "oozie server returned an error:\n$error";
716             };
717              
718             $self->action($saved_action);
719             return if $error;
720              
721             if ( $debug || !@{ $res->{actions} || [] } ) {
722             local $Data::Dumper::Terse = 1;
723             warn "JSON response: ", Data::Dumper::Dumper $res;
724             }
725              
726             # return some of the response
727             my $ret;
728             for ( @{ $res->{actions} || [] } ) {
729             push @$ret, [ $_->{id}, $_->{status} ];
730             }
731             return $ret;
732             }
733              
734             sub kill {
735             my $self = shift;
736             my ( $id, $debug ) = @_;
737             my $saved_action = $self->action();
738             $self->action('kill');
739              
740             my $error;
741             my $uri = $self->_make_full_uri( 'job/' . $id );
742             my $res = eval { $self->agent_request( $uri, 'put' ) } or do {
743             $error = $@;
744             warn "oozie server returned an error:\n$error";
745             };
746             $self->action($saved_action);
747             return if $error;
748             return 1;
749             }
750              
751             #------------------------------------------------------------------------------#
752              
753             sub _process_filters {
754             my $filter = shift;
755             return if ! is_hashref $filter;
756             my @unknown = grep { $_ !~ /^(name|user|group|status)$/ } keys %$filter;
757             local $" = ", ";
758             confess "unknown filter name(s): @unknown" if @unknown;
759             for my $name ( keys %$filter ) {
760             confess "filter is not a string or an array of strings"
761             if ( ref $filter->{$name} && ! is_arrayref $filter->{$name} );
762              
763             # lazy, so let's turn a single string to an array of one
764             $filter->{$name} = [ $filter->{$name} ] if !ref $filter->{$name};
765              
766             for my $filter_value ( @{ $filter->{$name} } ) {
767              
768             confess "empty value specified for filter $name"
769             if !length $filter_value;
770              
771             confess "'$filter_value' is not a valid status"
772             if $name eq "status"
773             && $filter_value !~ $RE_VALID_STATUS;
774             }
775             }
776             return $filter;
777             }
778              
779             sub _make_full_uri {
780             my $self = shift;
781             my $endpoint= shift;
782              
783             if ( $endpoint !~ $RE_VALID_ENDPOINT->{$self->api_version} ) {
784             confess "endpoint '$endpoint' is not supported";
785             }
786              
787             my $uri = URI->new( $self->oozie_uri );
788             my %filter = %{ $self->filter };
789              
790             my ( @accepted_params, @base_params, $do_filter_string, $filter_string );
791              
792             # very few params accepted for 'job', more for other reqs
793             # only 1 param for some job actions (the rest bypasses this old mechanism
794             # by injecting in URI directly, urgh)
795             if ( $endpoint =~ /^job\// && $self->action =~ /^(coord-rerun|kill)$/ ) {
796             @accepted_params = qw( action );
797             }
798             elsif ( $endpoint =~ /^job\// ) {
799             @accepted_params = qw( len offset show doas order );
800             }
801             else {
802             $do_filter_string++;
803             @accepted_params = qw( len offset jobtype show action doas order );
804             }
805              
806             @base_params = map {
807             my $value = $self->$_;
808             defined $value && length $value > 0 ? ( $_ => $value ) : ()
809             } @accepted_params;
810              
811             # the filter parameter requires URL encoding, so we treat it differently.
812             # It will be encoded by query_form once we have assembled it
813             if ($do_filter_string) {
814             my @filter_string;
815             while ( my ( $name, $values ) = ( each %filter ) ) {
816             push @filter_string, join ';', map {"$name=$_"} @$values;
817             }
818             $filter_string = join ';', @filter_string;
819             }
820              
821             $uri->query_form( [ @base_params, ($filter_string ? (filter => $filter_string) : ()) ] );
822             $uri->path( sprintf "%s/%s/%s", $uri->path,$self->api_version, $endpoint );
823              
824             printf STDERR "URI: %s\n", $uri if DEBUG;
825              
826             return $uri;
827             }
828              
829             # [dmorel] Add *_epoch to all nested data structures appearing to contain (GMT) dates. I
830             # suspect someone will harm me for doing it this way.
831              
832             sub _expand_meta_data {
833             my $self = shift;
834             my ($jobs) = @_;
835              
836             my $expand_xml_conf = $self->expand_xml_conf;
837             my $uri = URI->new( $self->oozie_uri );
838              
839             # Jobs is supposed to be a 2-level JSON hash
840             my $flat_jobs = flatten($jobs);
841             for my $k (keys %$flat_jobs) {
842             my $v = $flat_jobs->{$k};
843              
844             # add epochs
845             if ( ( $v || '' ) =~ m/ GMT$/ ) {
846             my $epoch = str2time($v);
847             if ($epoch) {
848             $flat_jobs->{"${k}_epoch"} = $epoch;
849             }
850             }
851             # add consoleURL for coordinators
852             if ($k =~ /(^.+)\.coordJobId$/ && $v) {
853             $uri->query_form(job => $v);
854             $flat_jobs->{"$1.consoleUrl"} = "$uri";
855             }
856             }
857              
858             %{ $jobs } = %{ unflatten $flat_jobs };
859              
860             if ( $expand_xml_conf ) {
861             my $expand = sub {
862             my $data = shift;
863             eval {
864             my $cs = $data->{conf_struct} = xml_in( $data->{conf}, KeepRoot => 1 );
865             1;
866             } or do {
867             my $eval_error = $@ || 'Zombie error';
868             warn "Failed to map the Oozie job configuration to a data structure: $eval_error";
869             };
870             };
871              
872             if ( my $conf = $jobs->{conf} ) {
873             if ( ! ref $conf && $conf =~ m{ \A \Q\E \s+ \Q\E}xms ) {
874             $expand->( $jobs );
875             }
876             }
877              
878             foreach my $action ( @{ $jobs->{actions} } ) {
879             my $conf = $action->{conf} || next;
880             if ( ! ref $conf && $conf =~ m{ \A [<] }xms ) {
881             $expand->( $action );
882             }
883             }
884             }
885              
886             return;
887             }
888              
889             sub _jobs_iterator {
890             my $self = shift;
891             my @param = @_;
892             my $opt = @param && ref $param[-1] eq 'HASH' ? pop @param : {};
893             my $cb = delete $opt->{callback};
894              
895             if ( ref $cb ne 'CODE' ) {
896             die "callback either not specified or is not a CODE";
897             }
898              
899             my($len, $offset, $total, $total_jobs);
900             my $key = $opt->{is_coordinator} ? 'coordinatorjobs' : 'workflows';
901             my $shortcircuit = $self->shortcircuit_via_callback;
902             my $eof;
903              
904             do {
905             my $jobs = $self->jobs(
906             @param,
907             ( $offset ? (
908             offset => $offset,
909             len => $len,
910             ) : ())
911             );
912             ($len, $offset, $total) = @{ $jobs }{qw/ len offset total /};
913             $total_jobs += $jobs->{ $key } ? @{$jobs->{ $key }} : 0; # len overflow
914             $offset += $len;
915              
916             my $ok = $cb->( $jobs );
917              
918             if ( $shortcircuit ) {
919             # If the option above is enabled, then the callback always need to
920             # return true to be able to continue.
921             #
922             if ( ! $ok ) {
923             if ( DEBUG ) {
924             printf STDERR "_jobs_iterator(short-circuit): callback returned false.\n";
925             }
926             $eof = 1;
927             }
928             }
929              
930             } while ! $eof && $offset < $total;
931              
932             if ( !$shortcircuit && $total_jobs != $total ) {
933             warn "Something is wrong, the collected total workflows and the computed total mismatch ($total_jobs != $total)";
934             }
935              
936             return;
937             }
938              
939             1;
940              
941             __END__