File Coverage

blib/lib/TaskForest/Family.pm
Criterion Covered Total %
statement 516 575 89.7
branch 132 186 70.9
condition 11 21 52.3
subroutine 42 43 97.6
pod 13 20 65.0
total 714 845 84.5


line stmt bran cond sub pod time code
1             ################################################################################
2             #
3             # $Id: Family.pm 292 2010-03-24 03:43:40Z aijaz $
4             #
5             ################################################################################
6              
7              
8             =head1 NAME
9              
10             TaskForest::Family - A collection of jobs
11              
12             =head1 SYNOPSIS
13              
14             use TaskForest::Family;
15              
16             my $family = TaskForest::Family->new(name => 'Foo');
17             # the associated job dependencies are read within new();
18              
19             $family->getCurrent();
20             # get the status of all jobs, what's failed, etc.
21              
22             $family->cycle();
23             # runs any jobs that are ready to be run
24              
25             $family->display();
26             # print to stdout a list of all jobs in the family
27             # and their statuses
28              
29             =head1 DOCUMENTATION
30              
31             If you're just looking to use the taskforest application, the only
32             documentation you need to read is that for TaskForest. You can do this
33             either of the two ways:
34              
35             perldoc TaskForest
36              
37             OR
38              
39             man TaskForest
40              
41             =head1 DESCRIPTION
42              
43             A family is a group of jobs that share the following
44             characteristics:
45              
46             =over
47              
48             =item *
49              
50             They all start on or after a common time known as the family start time.
51              
52             =item *
53              
54             They run only on the days specified in the family file.
55              
56             =item *
57              
58             They can be dependent on each other. These dependencies are
59             represented by the location of jobs with respect to each other in
60             the family file.
61              
62             =back
63              
64             For more information about jobs, please look at the documentation for
65             the TaskForest class.
66              
67             =head1 ATTRIBUTES
68              
69             The following are attributes of objects of the family class:
70              
71             =over 2
72              
73             =item name
74              
75             The name is the same as the name of the config file that contains the
76             job dependency information.
77              
78             =item start
79              
80             The family start time in 'HH:MM' format using the 24-hour clock. e.g.:
81             '17:30' for 5:30 p.m.
82              
83             =item tz
84              
85             The time zone with which the family start time is to be interpreted.
86              
87             =item days
88              
89             An array reference of days of the week on which this family's jobs may
90             run. Valid days are 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat' and 'Sun'.
91             Anything else will be ignored.
92              
93             =item options
94              
95             A hash reference that contains the values of the options retrieved from
96             the command line or the environment,
97              
98             =item jobs
99              
100             A hash reference of all the jobs that are members of this family. The
101             keys of this hash are the names of the jobs. The names of the jobs are in
102             the family configuration file and they're the same as the filenames of the
103             jobs on disk.
104              
105             =item current
106              
107             A boolean that is set to true after all the details of the family's jobs
108             are read from status files in the log directory. This boolean is set to
109             false when an attempt is made to run any jobs, and when the family config
110             file is first read (before getCurrent() is called).
111              
112             =item ready_jobs
113              
114             A temporary hash reference of jobs that are ready to be run - jobs whose
115             dependencies have been met.
116              
117             =item dependencies
118              
119             A hash reference of dependencies of all jobs (things that the jobs depend
120             ON). The keys of this hash are the job names. The values are array
121             references. Each array reference can contain 1 or more references to
122             objects of type TaskForest::Job or TaskForest::TimeDependency.
123              
124             All jobs have at least one dependency - a TimeDependency that's set to the
125             start time of the Family. In other words, after the start time of the
126             Family passes, the check() method of the TimeDependency will return 1.
127             Before that, it will return 0.
128              
129             =item time_dependencies
130              
131             For convenience, all time dependencies encountered in this family
132             (including that of the family start time) are saved in this array
133             reference. The other types of time dependencies are those that apply to
134             individual jobs.
135              
136             =item family_time_dependency
137              
138             This is the TaskForest::TimeDependency that refers to the family start
139             time.
140              
141             =item year, mon, mday and wday
142              
143             These attributes refer to the current day. They're saved within the
144             Family object so that we don't have to call localtime over and over again.
145             I really should have this cached this somewhere else. Oh, well.
146              
147             =item filehandle
148              
149             The readFromFile function was *really* long, so I refactored it into
150             smaller functions. Since at least two of the functions read from the
151             file, I saved the file handle within the object.
152              
153             =item current_dependency, last_dependency
154              
155             These are temporary attributes that builds dependency lists while parsing
156             the file.
157              
158             =back
159              
160             =head1 METHODS
161              
162             =cut
163              
164             package TaskForest::Family;
165              
166 92     92   7962 use strict;
  92         202  
  92         3931  
167 92     92   511 use warnings;
  92         166  
  92         3034  
168 92     92   66822 use TaskForest::Job qw ();
  92         394  
  92         2764  
169 92     92   741 use Data::Dumper;
  92         178  
  92         9343  
170 92     92   57935 use TaskForest::TimeDependency;
  92         359  
  92         3076  
171 92     92   69731 use TaskForest::Options;
  92         373  
  92         5051  
172 92     92   72023 use TaskForest::LogDir;
  92         269  
  92         2879  
173 92     92   623 use English '-no_match_vars';
  92         281  
  92         991  
174 92     92   57590 use FileHandle;
  92         213  
  92         856  
175 92     92   44698 use Carp;
  92         212  
  92         5950  
176 92     92   100665 use Time::Local;
  92         231490  
  92         7514  
177 92     92   780 use Fcntl qw(:DEFAULT :flock);
  92         213  
  92         86112  
178 92     92   879 use TaskForest::LocalTime;
  92         191  
  92         2085  
179 92     92   105580 use TaskForest::Calendar;
  92         228  
  92         6102  
180              
181              
182             BEGIN {
183 92     92   670 use vars qw($VERSION);
  92         202  
  92         5268  
184 92     92   907760 $VERSION = '1.32';
185             }
186              
187              
188             # ------------------------------------------------------------------------------
189             =pod
190              
191             =over 4
192              
193             =item new()
194              
195             Usage : my $family = TaskForest::Family->new();
196             Purpose : The Family constructor is passed the family name. It
197             uses this name along with the location of the family
198             directory to find the family configuration file and
199             reads the file. The family object is configured with
200             the data read in from the file.
201             Returns : Self
202             Argument : A hash that has the properties of he family. Of these,
203             the only required one is the 'name' property.
204             Throws : "No family name specified" if the name property is
205             blank.
206              
207             =back
208              
209             =cut
210              
211             # ------------------------------------------------------------------------------
212             sub new {
213 141     141 1 11526 my $arg = shift;
214 141   33     1621 my $class = (ref $arg) || $arg;
215              
216 141         1429 my $self = {
217             name => '',
218             start => '',
219             tz => 'America/Chicago', # default America/Chicago
220             days => {},
221             };
222              
223 141         800 my %args = @_;
224            
225 141         617 foreach my $key (keys %args) {
226 141         614 $self->{$key} = $args{$key};
227             }
228              
229 141 50       706 croak "No family name specified" unless $self->{name};
230 141         445 bless $self, $class;
231              
232 141         1208 $self->{options} = &TaskForest::Options::getOptions();
233            
234 141         3353 $self->readFromFile();
235              
236 134         932 return $self;
237             }
238              
239              
240             # ------------------------------------------------------------------------------
241             =pod
242              
243             =over 4
244              
245             =item display()
246              
247             Usage : $family->display()
248             Purpose : This method displays the status of all jobs in this family.
249             families that are scheduled to run today.
250             Returns : Nothing
251             Argument : A hash that will contain a list of jobs. This hash can be
252             passed to other jobs as well.
253             Throws : Nothing
254              
255             =back
256              
257             =cut
258              
259             # ------------------------------------------------------------------------------
260             sub display {
261 0     0 1 0 my ($self, $display_hash) = @_;
262              
263 0         0 foreach my $job_name (sort
  0         0  
264 0         0 { $self->{jobs}->{$a}->{start} cmp $self->{jobs}->{$b}->{start} }
265             (keys (%{$self->{jobs}}))) {
266            
267 0         0 my $job_hash = { family_name => $self->{name} };
268 0         0 my $job = $self->{jobs}->{$job_name};
269 0         0 foreach my $k (keys %$job) { $job_hash->{$k} = $job->{$k}; }
  0         0  
270 0         0 push (@{$display_hash->{all_jobs}}, $job_hash);
  0         0  
271 0         0 push (@{$display_hash->{$job_hash->{status}}}, $job_hash);
  0         0  
272             }
273              
274             }
275              
276              
277              
278              
279             # ------------------------------------------------------------------------------
280             =pod
281              
282             =over 4
283              
284             =item getCurrent()
285              
286             Usage : $family->getCurrent()
287             Purpose : This method reads all the semaphore files in the log
288             directory and gets the current status of the entire
289             family. Each run job can have succeeded or failed. As
290             a result of this, other jobs may be Ready to be run. If
291             a job's dependencies have not yet been met, it is said
292             to be in the Waiting state. Once a family is current,
293             the only thing that makes it 'uncurrent' is if any jobs
294             are run, or if its configuration file changes.
295             Returns : Nothing
296             Argument : None
297             Throws : Nothing
298              
299             =back
300              
301             =cut
302              
303             # ------------------------------------------------------------------------------
304             sub getCurrent {
305 106     106 1 235 my $self = shift;
306 106         1055 my $log_dir = &TaskForest::LogDir::getLogDir($self->{options}->{log_dir}, $self->{tz});
307              
308              
309 106 50       611 if ($self->{current}) {
310             # nothing to do, really
311 0         0 return;
312             }
313              
314             # now get the status of all the jobs from external families
315 106         531 $self->getForeignStatus($log_dir);
316              
317             # Get the status of all jobs, depending on the presence of job
318             # semaphore files
319             #
320 106         526 $self->updateJobStatuses();
321              
322             # Check to see if any of the time dependencies in this family have
323             # been met. A time dependency has been met if 'now' >= the time
324             # dependency.
325             #
326 106         645 $self->checkAllTimeDependencies();
327              
328             # Get a list of all jobs whose status is 'Waiting'
329             # Some of these may be running, and some may be ready
330             #
331 106         548 my $waiting_jobs = $self->getAllWaitingJobs();
332 106 50       595 print "waiting: ", Dumper($waiting_jobs) if $self->{options}->{verbose};
333              
334             # Construct a list of all ready jobs - these are jobs for which
335             # all dependencies have been met
336             #
337 106         408 $self->{ready_jobs} = {};
338 106         280 $self->{token_jobs} = {};
339 106         530 foreach my $job (values %$waiting_jobs) {
340 632         1966 my $started_semaphore = "$log_dir/$self->{name}.$job->{name}.started";
341             #print "Looking for $started_semaphore\n";
342 632 50       16639 if (-e $started_semaphore) { # already running
343             #open (F, $started_semaphore) || croak "Can't open file $started_semaphore";
344             #$_ = ;
345             #close F;
346             #if (/(\d\d):(\d\d)/) {
347             # $job->{actual_start} ="$1:$2";
348             #}
349 0         0 $job->{status} = 'Running';
350 0         0 $job->{stop} = '--:--';
351 0         0 $job->{rc} = '-';
352 0         0 my $pid_file = "$log_dir/$self->{name}.$job->{name}.pid";
353 0 0       0 open (F, $pid_file) || croak "Can't open file $pid_file";
354 0         0 while() {
355             #print " Looking at: $_";
356 0 0       0 if (/^pid: (\d+)/) {
    0          
    0          
357 0         0 $job->{pid} ="$1";
358             }
359             elsif (/^actual_start: (\d+)/) {
360 0         0 $job->{actual_start} = $1;
361             }
362             elsif (/^unique_id: (\S+)/) {
363 0         0 $job->{unique_id} = $1;
364             }
365             }
366 0         0 close F;
367 0         0 next;
368             }
369             # dependencies for each job
370             #
371 632         1665 my $dependencies = $self->{dependencies}->{$job->{name}};
372 632         757 my $ready = 1;
373 632         787 my $hold = 0;
374              
375 632         2844 my $hold_file = "$log_dir/$self->{name}.$job->{name}.hold";
376 632 50       14003 if (-e $hold_file) {
377 0         0 $hold = 1;
378 0         0 $job->{status} = 'Hold';
379 0         0 $job->{is_hold} = 1;
380             }
381            
382             # Don't bother checking for dependencies if on hold. Always set ready to 0;
383 632 50       1334 if ($hold) {
384 0         0 $ready = 0;
385             }
386             else {
387 632         1625 foreach my $dep (@$dependencies) {
388 712 100       2779 if ($dep->check($self->{foreign_status}) == 0) {
389 253         325 $ready = 0;
390 253         477 last;
391             }
392             }
393             }
394            
395             # A release file does not override a hold file.
396 632         2699 my $release_file = "$log_dir/$self->{name}.$job->{name}.release";
397 632 50 33     19180 if (-e $release_file && ($hold == 0) ) {
398 0         0 $ready = 1;
399              
400             # We cannot rely on the run wrapper to delete the release
401             # file. That's not its job. runReadyJobs has to do it.
402             }
403              
404            
405              
406 632 100       3272 if ($ready) {
407             # set the status of the job to be ready
408 379 100 66     1376 if ($job->{tokens} && @{$job->{tokens}}) {
  379         1524  
409             # token_jobs is a list of all jobs that have one more tokens
410 223         591 $self->{token_jobs}->{$job->{name}} = $job;
411 223         662 $job->{status} = 'TokenWait';
412             }
413             else {
414 156         456 $self->{ready_jobs}->{$job->{name}} = $job;
415 156         511 $job->{status} = 'Ready';
416             }
417             }
418             }
419              
420 106         778 $self->{current} = 1;
421 106 50       1091 print "ready: ", Dumper($self->{ready_jobs}) if $self->{options}->{verbose};
422             }
423              
424              
425             sub getForeignStatus {
426 106     106 0 294 my ($self, $log_dir) = @_;
427 106         372 $self->{foreign_status} = {};
428 106         297 foreach my $family_name (sort keys %{$self->{families}}) {
  106         836  
429 12         52 my $glob_string = "$log_dir/$family_name.*.0"; # TODO: Want this to stay $self->{name}, so create a new type of dep.
430 12         1730 my @files = glob($glob_string);
431 12         44 foreach my $file (@files) {
432 8         50 my @components = split(/\./, $file);
433 8         62 $self->{foreign_status}->{$components[-2]} = 1;
434             }
435             }
436             }
437              
438            
439              
440             # ------------------------------------------------------------------------------
441             =pod
442              
443             =over 4
444              
445             =item cycle()
446              
447             Usage : $family->cycle()
448             Purpose : This is the main method that is invoked once in every
449             loop, to run any jobs that are in a Ready state. It
450             gets the current status of the family and runs any jobs
451             that are in the Ready state.
452             Returns : Nothing
453             Argument : None
454             Throws : Nothing
455              
456             =back
457              
458             =cut
459              
460             # ------------------------------------------------------------------------------
461             sub cycle {
462 102     102 1 285 my $self = shift;
463              
464 102         644 $self->getCurrent();
465 102         492 $self->runReadyJobs();
466             }
467              
468              
469             # ------------------------------------------------------------------------------
470             =pod
471              
472             =over 4
473              
474             =item updateJobStatuses()
475              
476             Usage : $family->updateJobStatuses()
477             Purpose : This method looks at all the semaphore files in the
478             current day's log directory and updates job statuses
479             based on those semaphore files.
480             Returns : Nothing
481             Argument : None
482             Throws : Nothing
483              
484             =back
485              
486             =cut
487              
488             # ------------------------------------------------------------------------------
489             sub updateJobStatuses {
490 106     106 1 247 my $self = shift;
491              
492 106         1072 my $log_dir = &TaskForest::LogDir::getLogDir($self->{options}->{log_dir}, $self->{tz});
493              
494             # keep in mind that semaphore files are in the form F.J.[01] where
495             # F is the family name, J is a job name and 0 means success, and 1
496             # failure
497             #
498              
499             # TODO: get a list of all famaily names in jobs, and append results of glob to files
500            
501 106         615 my $glob_string = "$log_dir/$self->{name}.*.[01]"; # TODO: Want this to stay $self->{name}, so create a new type of dep.
502             # object - an external dep object
503              
504 106         14361 my @files = glob($glob_string);
505 106         834 my %valid_fields = (
506             actual_start => 1,
507             pid => 1,
508             stop => 1,
509             rc => 1,
510             unique_id => 1,
511             );
512            
513              
514 106         822 foreach my $file (sort @files) { # the sort ensures that 1 overrides 0
515 107         1688 my ($job_name, $status) = $file =~ /$log_dir\/$self->{name}\.([^\.]+)\.([01])/;
516 107         186 my ($orig, $actual_name);
517            
518 107 50       396 if ($job_name =~ /(^[^\-]+)--Orig/) {
519 0         0 $orig = 1;
520 0         0 $actual_name = $1;
521 0 0       0 next unless defined $self->{jobs}->{$actual_name}; # not defined if job is no longer in family, but ran ealier.
522 0         0 $self->{jobs}->{$job_name} = TaskForest::Job->new('name' => $job_name);
523             }
524             else {
525 107 50       431 next unless defined $self->{jobs}->{$job_name}; # not defined if job is no longer in family, but ran ealier.
526             }
527              
528             # when a job is rerun the the job name in the job file has --Orig_n-- appended to it
529            
530             # when a job is marked successful (or failed) only the file
531             # name is changed to *.1 (or *.0). The return code is not to
532             # be changed
533 107 50       412 if ($status == 1) {
534 0         0 $self->{jobs}->{$job_name}->{status} = 'Failure';
535             }
536             else {
537 107         362 $self->{jobs}->{$job_name}->{status} = 'Success';
538             }
539             # read the return code
540             #
541 107 50       5952 open(F, $file) || croak "cannot open $file to read rc";
542 107         1244 $_ = ;
543 107         216 chomp;
544 107         1790 $self->{jobs}->{$job_name}->{rc} = $_;
545 107         1056 close F;
546              
547             # read the pid file
548 107         279 substr($file, -1, 1) = 'pid';
549 107 50       9476 open(F, $file) || croak "cannot open $file to read job data";
550 107         1419 while () {
551 535         714 chomp;
552 535         2499 my ($k, $v) = /([^:]+): (.*)/;
553 535         996 $v =~ s/[^a-z0-9_ ,.\-]/_/ig;
554 535 50       1216 if ($valid_fields{$k}) {
555 535         2850 $self->{jobs}->{$job_name}->{$k} = $v;
556             }
557             }
558 107         1059 close F;
559            
560 107 50       498 if ($orig) {
561 0         0 $self->{jobs}->{$job_name}->{start} = $self->{jobs}->{$actual_name}->{start};
562 0         0 $self->{jobs}->{$job_name}->{tz} = $self->{jobs}->{$actual_name}->{tz};
563             }
564             }
565             }
566            
567            
568              
569             # ------------------------------------------------------------------------------
570             =pod
571              
572             =over 4
573              
574             =item runReadyJobs()
575              
576             Usage : $family->runReadyJobs()
577             Purpose : This method uses the fork and exec model to run all jobs
578             currently in the Ready state. The script that is
579             actually exec'ed is the run wrapper. The wrapper takes
580             a whole bunch of arguments, some of which can be derived
581             by others. The intent is to make it flexible and make
582             it easy for others to write custom wrappers. The code
583             that's executed in the child process before the exec is
584             rather paranoid and is taken from perldoc perlsec.
585             Returns : Nothing
586             Argument : None
587             Throws : "Can't drop privileges" if the userids cannot be
588             changed
589              
590             =back
591              
592             =cut
593              
594             # ------------------------------------------------------------------------------
595             sub runReadyJobs {
596 102     102 1 258 my $self = shift;
597 102         242 $self->{current} = 0; # no longer current. A reread of log dirs is necessary
598 102         364 my $wrapper = $self->{options}->{run_wrapper};
599 102         772 my $log_dir = &TaskForest::LogDir::getLogDir($self->{options}->{log_dir}, $self->{tz});
600              
601             #print Dumper($self);
602 102         776 $self->convertTokenWaitToReady(); # $self has just been made current
603             #print Dumper($self);
604              
605 102         222 foreach my $job (values %{$self->{ready_jobs}}) {
  102         914  
606 236         1316 my $pid;
607 236 100       1133721 if ($pid = fork) {
608             # parent
609 185         52086 print "Forked child process $job->{name} $pid\n";
610 185         4926 $job->{status} = 'Running';
611 185         11700 $self->writeSemaphoreFile("$log_dir/$self->{name}.$job->{name}.started", sprintf("%02d:%02d\n", $self->{hour}, $self->{min}));
612              
613             # you only get to release a job's dependencies once per
614             # cycle. If the job had run because its dependencies were
615             # released, remove the release-dependency directive
616             #
617 185         1932 my $release_file = "$log_dir/$self->{name}.$job->{name}.release";
618 185 50       271739 if (-e $release_file) {
619 0         0 my $ok = unlink $release_file;
620 0 0       0 if (!$ok) {
621 0         0 die ("Couldn't unlink the release_file directive - $release_file");
622             }
623             }
624             }
625             else {
626             #child - this code comes from perldoc perlsec
627 51 50       7474 croak "cannot fork: $!" unless defined $pid;
628              
629 51         13208 my @temp = ($EUID, $EGID);
630 51         5830 my $orig_uid = $UID;
631 51         1747 my $orig_gid = $GID;
632 51         7209 $EUID = $UID;
633 51         8095 $EGID = $GID;
634            
635             # Drop privileges
636             #
637 51         2658 $UID = $orig_uid;
638 51         1374 $GID = $orig_gid;
639            
640             # Make sure privs are really gone
641             #
642 51         6136 ($EUID, $EGID) = @temp;
643 51 50 33     7636 croak "Can't drop privileges" unless $UID == $EUID && $GID eq $EGID;
644 51         6074 $ENV{PATH} = "/bin:/usr/bin"; # Minimal PATH.
645 51         4391 $ENV{CDPATH} = ""; # We don't need this.
646            
647             # Consider sanitizing the environment even more.
648            
649 51         1521 my $job_file_name = $job->{name};
650 51         1185 $job_file_name =~ s/--Repeat.*//;
651            
652             # generate unique run_id
653             #
654 51         863 my $time = time();
655 51         1824 my $run_id = "$$.$time";
656              
657 51         1498 $ENV{TASKFOREST_FAMILY_NAME} = $self->{name};
658 51         17826 $ENV{TASKFOREST_JOB_NAME} = $job->{name};
659 51         5092 $ENV{TASKFOREST_JOB_FILE_NAME} = $job_file_name;
660 51         1411 $ENV{TASKFOREST_LOG_DIR} = $log_dir;
661 51         1842 $ENV{TASKFOREST_JOB_DIR} = $self->{options}->{job_dir};
662 51         1163 $ENV{TASKFOREST_PID_FILE} = "$log_dir/$self->{name}.$job->{name}.pid";
663 51         1581 $ENV{TASKFOREST_SUCCESS_FILE} = "$log_dir/$self->{name}.$job->{name}.0";
664 51         911 $ENV{TASKFOREST_FAILURE_FILE} = "$log_dir/$self->{name}.$job->{name}.1";
665 51         1352 $ENV{TASKFOREST_UNIQUE_ID} = $job->{unique_id};
666 51         2797 $ENV{TASKFOREST_NUM_RETRIES} = defOr($job->{num_retries}, 0);
667 51         2351 $ENV{TASKFOREST_RETRY_SLEEP} = defOr($job->{retry_sleep}, 0);
668 51         1783 $ENV{TASKFOREST_EMAIL} = defOr($job->{email}, "");
669 51         1646 $ENV{TASKFOREST_RETRY_EMAIL} = defOr($job->{retry_email}, "");
670 51         1089 $ENV{TASKFOREST_NO_RETRY_EMAIL} = defOr($job->{no_retry_email}, "");
671 51         775 $ENV{TASKFOREST_INSTRUCTIONS_DIR} = defOr($job->{instructions_dir}, "");
672 51         3451 $ENV{TASKFOREST_SMTP_SERVER} = defOr($self->{options}->{smtp_server}, "");
673 51         2509 $ENV{TASKFOREST_SMTP_PORT} = defOr($self->{options}->{smtp_port}, 0);
674 51         1075 $ENV{TASKFOREST_SMTP_SENDER} = defOr($self->{options}->{smtp_sender}, "");
675 51         1464 $ENV{TASKFOREST_MAIL_FROM} = defOr($self->{options}->{mail_from}, "");
676 51         452 $ENV{TASKFOREST_MAIL_REPLY_TO} = defOr($self->{options}->{mail_reply_to}, "");
677 51         859 $ENV{TASKFOREST_MAIL_RETURN_PATH} = defOr($self->{options}->{mail_return_path}, "");
678 51         1091 $ENV{TASKFOREST_SMTP_TIMEOUT} = defOr($self->{options}->{smtp_timeout}, 0);
679 51         659 $ENV{TASKFOREST_RETRY_SUCCESS_EMAIL} = defOr($job->{retry_success_email}, "");
680 51         1225 $ENV{TASKFOREST_NO_RETRY_SUCCESS_EMAIL} = defOr($job->{no_retry_success_email}, "");
681            
682 51 0       0 exec("$wrapper",
683             ) or croak "Can't exec: $!\n";
684             }
685             }
686            
687             }
688              
689            
690              
691             # ------------------------------------------------------------------------------
692             =pod
693              
694             =over 4
695              
696             =item checkAllTimeDependencies()
697              
698             Usage : $family->checkAllTimeDependencies()
699             Purpose : Runs td->check() on all time dependencies, to see
700             whether they have been met or not
701             Returns : Nothing
702             Argument : None
703             Throws : Nothing
704              
705             =back
706              
707             =cut
708              
709             # ------------------------------------------------------------------------------
710             sub checkAllTimeDependencies {
711 106     106 1 266 my $self = shift;
712              
713 106         203 foreach my $td (@{$self->{time_dependencies}}) {
  106         501  
714 127         838 $td->check();
715             }
716             }
717              
718             # ------------------------------------------------------------------------------
719             =pod
720              
721             =over 4
722              
723             =item getAllWaitingJobs()
724              
725             Usage : $family->getAllWaitingJobs()
726             Purpose : This method gets a hash of all jobs that are currently
727             in the Waiting state
728             Returns : Nothing
729             Argument : None
730             Throws : Nothing
731              
732             =back
733              
734             =cut
735              
736             # ------------------------------------------------------------------------------
737             sub getAllWaitingJobs {
738 112     112 1 20201 my $self = shift;
739              
740 112         307 my %waiting = map { $_->{name} => $_ } grep {$_->{status} eq 'Waiting'} values(%{$self->{jobs}});
  638         2383  
  745         1835  
  112         645  
741              
742 112         497 return \%waiting;
743             }
744              
745              
746             # ------------------------------------------------------------------------------
747             =pod
748              
749             =over 4
750              
751             =item readFromFile()
752              
753             Usage : $family->readFromFile
754             Purpose : This is the most crucial method of the application. It
755             reads the Family configuration file and constructs a
756             data structure that represents all the configuration
757             parameters of the family.
758             Returns : Nothing
759             Argument : None
760             Throws : "Can't read dir/file" if the config file cannot be read
761             "No start time specified for Family",
762             "No time zone specified for Family",
763             "No run days specified for Family",
764             if any of the 3 required headers are not present in
765             the file
766             Generic croak if the data cannot be extracted after an
767             eval.
768              
769             =back
770              
771             =cut
772              
773             # ------------------------------------------------------------------------------
774             sub readFromFile {
775 141     141 1 379 my $self = shift;
776              
777 141         877 $self->_initializeDataStructures();
778              
779 141         402 my $file = $self->{name};
780 141         1108 my $dir = $self->{options}->{family_dir};
781 141         2938 $self->{file_handle} = new FileHandle;
782 141 50       35839 $self->{file_handle}->open("<$dir/$file") || croak "cant read $dir/$file";
783              
784 141         13004 my $ok_to_run = $self->_parseHeaderLine();
785 138 100       602 return unless $ok_to_run;
786            
787              
788 137         902 my $sections = $self->_getSections(); # get concurrent sections
789 137 50       3009 return unless @$sections; # the file is either blank, or does not need to run today
790              
791 137         355 my @bad_lines = ();
792 137         479 foreach my $section (@$sections) {
793 326         1447 my @lines = split(/\n/, $section); # lines in the section
794            
795             # Create a one-element array of dependencies. This is the
796             # default dependency list for all jobs as they're first
797             # encountered.
798             #
799 326         1837 $self->{last_dependency} = [ $self->{family_time_dependency} ];
800              
801             # list of lines that failed to parse
802 326         531 my ($parsed_ok, $parse_error);
803 326         514 my $first_line = 1;
804 326         883 foreach my $line (@lines) {
805 599         2660 ($parsed_ok, $parse_error) = $self->_parseLine($line, $first_line);
806 599 100       1884 if (! $parsed_ok) {
807 10         32 push(@bad_lines, "$line --- $parse_error");
808             }
809 599         1716 $first_line = 0;
810             }
811             }
812            
813 137 100       1511 if (@bad_lines) {
814 4         222 die ("Family '$self->{name}' has unparseable lines:\n ", join(" \n", @bad_lines), "\n");
815             }
816              
817             }
818              
819              
820             # ------------------------------------------------------------------------------
821             =pod
822              
823             =over 4
824              
825             =item okToRunToday()
826              
827             Usage : $family->okToRunToday
828             Purpose : This method checks whether today is in the list of days
829             of the week that this family is eligible to run
830             Returns : 1 if it is, 0 if it's not.
831             Argument : $wday - the day of the week today
832             Throws : Nothing
833              
834             =back
835              
836             =cut
837              
838             # ------------------------------------------------------------------------------
839             sub okToRunToday {
840 95     95 1 283 my ($self, $wday) = @_;
841              
842 95         694 my @days = qw (Sun Mon Tue Wed Thu Fri Sat Sun);
843 95         219 my $today = $days[$wday];
844              
845 95 50       493 if ($self->{days}->{$today}) {
846 95         941 return 1;
847             }
848             else {
849 0         0 return 0;
850             }
851             }
852              
853              
854             # ------------------------------------------------------------------------------
855             =pod
856              
857             =over 4
858              
859             =item _initializeDataStrauctures()
860              
861             Usage : $self->_intializeDataStructures
862             Purpose : Used in readFrom file, before a file is opened for reading
863             Returns : Nothing
864             Argument : None
865             Throws : Nothing
866              
867             =back
868              
869             =cut
870              
871             # ------------------------------------------------------------------------------
872             sub _initializeDataStructures {
873 141     141   392 my $self = shift;
874            
875 141         576 $self->{dependencies} = {};
876 141         441 $self->{jobs} = {};
877 141         650 $self->{time_dependencies} = [];
878              
879             # once you reread a family's config file, it is no longer current
880             #
881 141         420 $self->{current} = 0;
882              
883             # get current time
884 141         1123 my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = &TaskForest::LocalTime::lt();
885 141         891 ($self->{wday}, $self->{hour}, $self->{min}) = ($wday, $hour, $min);
886              
887            
888             }
889              
890              
891              
892             # ------------------------------------------------------------------------------
893             =pod
894              
895             =over 4
896              
897             =item _getSections()
898              
899             Usage : $self->_getSections
900             Purpose : Read concurrent sections from the family file
901             Returns : A list of sections, or () if the file is empty
902             Argument : None
903             Throws : Nothing
904              
905             =back
906              
907             =cut
908              
909             # ------------------------------------------------------------------------------
910             sub _getSections {
911 137     137   324 my $self = shift;
912 137         380 my $fh = $self->{file_handle};
913            
914             # PARSE THE FILE HERE
915 1528         22172 my @sections = split(/^ *-+ *\n/m, # split on a line of dashes
916             (join '', # convert back to a string
917             grep(/\S/, # get rid of blank lines
918 137         5946 (map {s/\#.*//; # get rid of comments
919 1528         2596 s/\r//g; # and line-feeds
920 1528         7044 $_; }
921             $fh->getlines())))); # all lines as a list
922            
923 137         4760 $fh->close();
924              
925 137         3188 return \@sections;
926             }
927              
928              
929             # ------------------------------------------------------------------------------
930             =pod
931              
932             =over 4
933              
934             =item _parseHeaderLine()
935              
936             Usage : $self->_parseHeaderLine()
937             Purpose : Read the first non-empty line from the family file.
938             If this family is not scheduled to run today, then just
939             close the file and return 0. This means that you
940             could change the days in the header file in the middle
941             of the day, and add today to the list of valid
942             days. This would cause the family to now become
943             eligible to run today, when earlier in the day it was
944             not.
945             Returns : 1 if the family is to run today, 0 otherwise.
946             Argument : None
947             Throws : Nothing
948              
949             =back
950              
951             =cut
952              
953             # ------------------------------------------------------------------------------
954             sub _parseHeaderLine {
955 141     141   663 my $self = shift;
956 141         371 my $fh = $self->{file_handle};
957 141         4125 while (<$fh>) {
958 141         444 chomp;
959 141 50       1780 last if /\S/; # get first non-blank line
960             }
961              
962 141         1520 s/\s//g; # get rid of spaces
963              
964             # make sure all the data we expect in the header is available
965             #
966 141         425 my $args = '';
967 141         365 my $file = $self->{name};
968              
969             # using this parsing means that extra junk in the header line is ignored - makes the parsing more
970             # resistant to errors
971             #
972 141 100       1342 if (/(start=>['"]\d+:\d+['"])/) { $args .= "$1,"; } else { croak "No start time specified for Family $file"; }
  140         788  
  1         245  
973 140 100       13161 if (/(days=>['"][a-zA-Z0-9,]+['"])/) { $args .= "$1,"; }
  97         370  
974 140 100       902 if (/(tz=>['"][a-zA-Z0-9\/\_]+['"])/) { $args .= "$1,"; } else { croak "No time zone specified for Family $file"; }
  139         675  
  1         211  
975 139 100       1053 if (/cal[ae]nd[ae]r=>(['"][a-zA-Z0-9_]+['"])/) { $args .= "calendar=>$1,"; }
  43         268  
976            
977 139         28489 my %args = eval($args);
978              
979 139         749 $self->{start} = $args{start}; # set the start time
980              
981 139 100       669 if ($args{days}) {
    50          
982 96         24848 my @days = split(/,/, $args{days});
983            
984 96         951 my %valid_days = (Mon=>1, Tue=>1, Wed=>1, Thu=>1, Fri=>1, Sat=>1, Sun=>1);
985 96         422 foreach my $day (@days) {
986 667 100       1596 if (!($valid_days{$day})) {
987 1         170 croak "Day $day is not a valid day. Valid days are: Mon, Tue, Wed, Thu, Fri, Sat and Sun";
988             }
989 666         4174 $self->{days}->{$day} = 1; # valid to run on these days
990             }
991              
992 95         1053 my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = &TaskForest::LocalTime::ft($args{tz});
993 95         4759 ($self->{wday}, $self->{hour}, $self->{min}) = ($wday, $hour, $min);
994            
995 95 50       679 if ($self->okToRunToday($self->{wday}) == 0) { # nothing to do
996 0         0 $fh->close();
997 0         0 return 0;
998             }
999             }
1000             elsif($args{calendar}) {
1001             # make sure calendar exists
1002 43         119 my $calendar_name = $args{calendar};
1003 43         197 my $calendar_file = "$self->{options}->{calendar_dir}/$calendar_name";
1004 43 100       978 unless (-e $calendar_file) {
1005 1         8 print "Calendar $calendar_file does not exist\n";
1006 1         23 return 0;
1007             }
1008              
1009              
1010 42         98 my @rules;
1011 42 50       2912 if (open (C, $calendar_file)) {
1012 42         14999 @rules = ;
1013 42         20166 close C;
1014             }
1015             else {
1016 0         0 croak "Calendar $calendar_file cannot be opened";
1017             }
1018            
1019             #print "Calendar $calendar_name exists with tz= $args{tz}\n";
1020             #print Dumper($self->{options}->{calendar}->{$calendar_name}->{rules});
1021             # calendar exists - use it.
1022 42 50       561 if (&TaskForest::Calendar::canRunToday({ rules => \@rules, tz => $args{tz}}) ne '+') {
1023 0         0 $fh->close();
1024 0         0 return 0;
1025             }
1026             }
1027             else {
1028 0         0 croak "Neither 'days' nor 'calendar' is specified";
1029             }
1030              
1031             # create main dependency - every job has at least one dependency:
1032             # a time dependency on the start time of the family.
1033             #
1034 137         558 $self->{tz} = $args{tz};
1035 137         642 my %td_args = ( start => $self->{start} );
1036 137         412 $td_args{tz} = $self->{tz}; # for now, this is a required field
1037 137         2245 $self->{family_time_dependency} = TaskForest::TimeDependency->new(%td_args);
1038 137         360 push (@{$self->{time_dependencies}}, $self->{family_time_dependency});
  137         595  
1039              
1040 137         1306 return 1;
1041             }
1042              
1043             # ------------------------------------------------------------------------------
1044             =pod
1045              
1046             =over 4
1047              
1048             =item _parseLine()
1049              
1050             Usage : $self->_parseLine($line)
1051             Purpose : Get a list of all jobs on the line and parse them,
1052             creating the data structure.
1053             As we process each line, we add to each job's
1054             dependencies the dependencies in
1055             $self->{last_dependency}. We also add each job to the
1056             list of 'current' dependencies. When we're done parsing
1057             the line, we set 'last' to 'current', for the benefit of
1058             the next line.
1059             Returns : Nothing
1060             Argument : None
1061             Throws : Nothing
1062              
1063             =back
1064              
1065             =cut
1066              
1067             # ------------------------------------------------------------------------------
1068             sub _parseLine {
1069 599     599   1127 my ($self, $line, $first_line) = @_;
1070              
1071 599         1736 $self->{current_dependency} = [];
1072 599         4609 $line =~ s/\s//g; # get rid of spaces
1073              
1074 599         963 my @jobs;
1075              
1076             # make sure that the line looks like this:
1077             # ([a-z0-9_]+\([^\)]*\) *)*
1078 599 100       1326 if ($first_line) {
1079 326 100       2709 if ($line =~ /^(([a-z0-9_]+::)?[a-z0-9_]+\([^\)]*\))*$/i) {
1080             }
1081             else {
1082 1         4 return (0, "This line does not appear to contain a list of jobs that looks like (for example) 'J1() J2()'");
1083             }
1084 325         3242 @jobs = $line =~ /((?:[a-z0-9_]+::)?[a-z0-9_]+\([^\)]*\))/ig; # parens may be empty
1085             }
1086             else {
1087             # check for external dependencies
1088 273 100       1085 if ($line =~ /[a-z0-9_]+::[a-z0-9_]+\([^\)]*\)/i) {
1089 2         57 return (0, "Only the first line of a section may have foreign dependencies");
1090             }
1091 271 100       1527 if ($line =~ /^([a-z0-9_]+\([^\)]*\))*$/i) {
1092             }
1093             else {
1094 5         15 return (0, "This line does not appear to contain a list of jobs that looks like (for example) 'J1() J2()'");
1095             }
1096 266         1927 @jobs = $line =~ /([a-z0-9_]+\([^\)]*\))/ig; # parens may be empty
1097             }
1098            
1099              
1100            
1101              
1102 591         1103 my @errors = ();
1103 591         1292 my ($retval, $error);
1104            
1105 591         1032 foreach my $job (@jobs) {
1106 892         2414 ($retval, $error) = $self->_parseJob($job);
1107 892 100       13052 if ($retval == 0) {
1108 2         5 push (@errors, $error);
1109             }
1110             }
1111              
1112 591 100       1708 if (@errors) {
1113 2         9 return (0, join(", ", @errors));
1114             }
1115              
1116             # set the list of dependencies for the next iteration in
1117             # the loop
1118             #
1119 589         1204 $self->{last_dependency} = $self->{current_dependency};
1120              
1121 589         3209 return (1, "");
1122             }
1123              
1124              
1125              
1126             # ------------------------------------------------------------------------------
1127             =pod
1128              
1129             =over 4
1130              
1131             =item _parseJob()
1132              
1133             Usage : $self->_parseJob($job)
1134             Purpose : Parse the job definition, create additional dependencies
1135             if necessary, and create the job. If it's a recurring
1136             job, then create a bunch of 'repeat' jobs that are not
1137             dependent on the original job's predecessors, but on
1138             time dependencies only.
1139             Returns : Nothing
1140             Argument : None
1141             Throws : Nothing
1142              
1143             =back
1144              
1145             =cut
1146              
1147             # ------------------------------------------------------------------------------
1148             sub _parseJob {
1149 892     892   2318 my ($self, $job) = @_;
1150            
1151 892         5573 my ($family_name_with_colons, $family_name, $job_name, $args) = $job =~ /(([a-z0-9_]+)::)?([a-z0-9_]+)(\([^\)]*\))/i;
1152 892         1486 my $local_job = 0;
1153              
1154 892 100       2985 unless ($family_name) {
1155 876         1437 $family_name = $self->{name};
1156 876         1164 $local_job = 1;
1157             }
1158             #$self->{families}->{$family_name} = 1;
1159             #$job_name = $family."::$job_name";
1160              
1161 892         1101 my $job_object;
1162 892 100       1657 if ($local_job) {
1163 876 100       2759 if ($self->{jobs}->{$job_name}) {
1164 52         118 $job_object = $self->{jobs}->{$job_name}; # job already exists in this family
1165             }
1166             else {
1167             # if jobname has ::, get family name
1168             # else family_name is this family's name
1169             # job_object = new (job_name, family_name)
1170 824         3942 $job_object = TaskForest::Job->new(name => $job_name); # create new job
1171 824         3066 $self->{jobs}->{$job_name} = $job_object;
1172             }
1173              
1174             # Set dependencies. A dependency can be a time dependency or another job
1175             #
1176 876 100       3357 $self->{dependencies}->{$job_name} = [] unless $self->{dependencies}->{$job_name};
1177 876         1190 foreach my $dep (@{$self->{last_dependency}}) {
  876         2109  
1178 1094         1258 push (@{$self->{dependencies}->{$job_name}}, $dep);
  1094         3739  
1179             }
1180             }
1181             else {
1182             # external dependency
1183 16         157 $job_object = TaskForest::Job->new(name => $job_name, family => $family_name); # create new job
1184 16         75 $self->{families}->{$family_name} = 1;
1185             }
1186              
1187 892 100 66     6140 if ($local_job && $args =~ /^\(\S/) { # We have additional dependencies if this is a local job
1188 876         89470 my %args = eval ($args);
1189 876 100       4827 return (0, $@) if $@;
1190            
1191             # passed first level of checks
1192             # now make sure that the only things within the parentheses are valid keys
1193 875         2876 my ($retval, $error) = $self->_verifyJobHash(\%args);
1194 875 100       2281 if ($retval == 0) { return (0, $error); }
  1         5  
1195            
1196             #print "\$\@ is $@ and \$\! is $! and args is ", Dumper(\%args);
1197 874 100       3212 $args{tz} = $self->{tz} unless $args{tz}; # time zone defaults to family time zone
1198 874 100       1905 if ($args{start}) { # time dependency
1199 36         202 my $td = TaskForest::TimeDependency->new(start => $args{start}, tz => $args{tz});
1200 36         93 push (@{$self->{dependencies}->{$job_name}}, $td);
  36         135  
1201 36         67 push (@{$self->{time_dependencies}}, $td);
  36         112  
1202             }
1203             else {
1204 838         1627 $args{start} = $self->{start};
1205             }
1206              
1207 874         3122 ($job_object->{start} , $job_object->{tz}) = ($args{start}, $args{tz});
1208            
1209 874 100 66     3123 if ($args{every} and $args{every} !~ /\D/) {
1210 15         70 $self->_createRecurringJobs($job_name, \%args, $job_object);
1211             }
1212              
1213 874         5311 $job_object->{tokens} = [];
1214 874 100       2485 if ($args{token}) {
1215 377         1340 my @tokens = split(",", $args{token});
1216 377         700 foreach my $token (@tokens) {
1217 399 50       1711 if ($token =~ /^([a-z0-9_])+$/i) {
1218 399         501 push (@{$job_object->{tokens}}, $token);
  399         1910  
1219             }
1220             else {
1221 0         0 return (0, "Bad token name: $token. A token name can only contain the characters [a-zA-Z0-9.-_]");
1222             }
1223             }
1224             }
1225              
1226 874 50       2092 if ($args{email}) {
1227 0         0 $job_object->{email} = $args{email};
1228             }
1229             else {
1230 874         2385 $job_object->{email} = $self->{options}->{email};
1231             }
1232              
1233 874 50       1751 if ($args{retry_email}) {
1234 0         0 $job_object->{retry_email} = $args{retry_email};
1235             }
1236             else {
1237 874         2291 $job_object->{retry_email} = $self->{options}->{retry_email};
1238             }
1239              
1240 874 100       1642 if ($args{num_retries}) {
1241 2         6 $job_object->{num_retries} = $args{num_retries};
1242             }
1243             else {
1244 872         2900 $job_object->{num_retries} = $self->{options}->{num_retries};
1245             }
1246              
1247 874 100       3248 if ($args{retry_sleep}) {
1248 2         5 $job_object->{retry_sleep} = $args{retry_sleep};
1249             }
1250             else {
1251 872         2442 $job_object->{retry_sleep} = $self->{options}->{retry_sleep};
1252             }
1253              
1254 874 50       5005 if ($args{no_retry_email}) {
1255 0         0 $job_object->{no_retry_email} = $args{no_retry_email};
1256             }
1257             else {
1258 874         3240 $job_object->{no_retry_email} = $self->{options}->{no_retry_email};
1259             }
1260              
1261 874 50       1724 if ($args{instructions_dir}) {
1262 0         0 $job_object->{instructions_dir} = $args{instructions_dir};
1263             }
1264             else {
1265 874         2073 $job_object->{instructions_dir} = $self->{options}->{instructions_dir};
1266             }
1267              
1268 874 50       5187 if ($args{retry_success_email}) {
1269 0         0 $job_object->{retry_success_email} = $args{retry_success_email};
1270             }
1271             else {
1272 874         61614 $job_object->{retry_success_email} = $self->{options}->{retry_success_email};
1273             }
1274            
1275 874 50       1760 if ($args{no_retry_success_email}) {
1276 0         0 $job_object->{no_retry_success_email} = $args{no_retry_success_email};
1277             }
1278             else {
1279 874         4483 $job_object->{no_retry_success_email} = $self->{options}->{no_retry_success_email};
1280             }
1281            
1282             }
1283            
1284             # push this job into the dependency array for the jobs in the next line
1285             #
1286 890         1184 push (@{$self->{current_dependency}}, $job_object);
  890         22007  
1287              
1288 890         3947 return (1, "");
1289            
1290             }
1291              
1292              
1293             # ------------------------------------------------------------------------------
1294             =pod
1295              
1296             =over 4
1297              
1298             =item _verifyJobHash()
1299              
1300             Usage : $self->_verifyJobHash($args)
1301             Purpose : Verify that the hash created has valid keys
1302              
1303             Returns : 1 on success, 0 on failure
1304             Argument : $args - a reference to a hash
1305             Throws : Nothing
1306              
1307             =back
1308              
1309             =cut
1310              
1311             # ------------------------------------------------------------------------------
1312             sub _verifyJobHash {
1313 875     875   1389 my ($self, $args) = @_;
1314            
1315 875         8540 my $valid_job_args = {
1316             "start" => 1,
1317             "tz" => 1,
1318             "every" => 1,
1319             "until" => 1,
1320             "chained" => 1,
1321             "token" => 1,
1322            
1323             "email" => 1,
1324             "retry_email" => 1,
1325             "no_retry_email" => 1,
1326             "num_retries" => 1,
1327             "retry_sleep" => 1,
1328             "instructions_dir" => 1,
1329             "params" => 1,
1330             "retry_success_email" => 1,
1331             "no_retry_success_email" => 1,
1332              
1333            
1334             };
1335              
1336 875         2247 my @errors = ();
1337              
1338 875         2647 foreach (keys %$args) {
1339 460 100       2135 if (! ($valid_job_args->{$_})) {
1340 1         5 push(@errors, "'$_' is not a recognized attribute");
1341             }
1342             }
1343              
1344 875 100       2357 if (@errors) {
1345 1         6 return (0, join(", ", @errors));
1346             }
1347 874         3731 return (1, '');
1348             }
1349              
1350             # ------------------------------------------------------------------------------
1351             =pod
1352              
1353             =over 4
1354              
1355             =item _createRecurringJobs()
1356              
1357             Usage : $self->_createRecurringJobs($job_name, $args)
1358             Purpose : If a job is a recurring job, create new jobs with a
1359             prefix of --Repeat_$n-- where $n specifies the
1360             cardinality of the repeat job.
1361              
1362             By default, the newly created jobs are *not* dependent on
1363             each other. They're only dependent on their start times.
1364             If the 'chained=>1' option is given in the family file,
1365             or in the options, then the jobs are dependent on each
1366             other. This is, arguably, the more sensible behavior.
1367              
1368             Returns : Nothing
1369             Argument : None
1370             Throws : Nothing
1371              
1372             =back
1373              
1374             =cut
1375              
1376             # ------------------------------------------------------------------------------
1377             sub _createRecurringJobs {
1378 15     15   38 my ($self, $job_name, $args, $job_object) = @_;
1379              
1380 15 100       73 my $chained = defined($args->{chained})? $args->{chained} : $self->{options}->{chained};
1381             # if it's chained then each job is dependent on the other.
1382            
1383 15         33 my $until = $args->{until};
1384 15         30 my ($until_mm, $until_hh);
1385 15 50       155 if ($until =~ /^(\d\d):(\d\d)$/) { $until_hh = $1; $until_mm = $2; }
  15         54  
  15         53  
1386 0         0 else { $until_hh = 23; $until_mm = 59; }
  0         0  
1387              
1388             # get an epoch value for the the until time
1389             #
1390             # Set the until_time to be based on the job or family timezone
1391             #my $until_dt = DateTime->now(time_zone => $args->{tz});
1392 15         68 my $until_dt = DateTime->from_epoch(epoch => &TaskForest::LocalTime::epoch());
1393 15         5433 $until_dt->set_time_zone($args->{tz});
1394 15         8099 $until_dt->set(hour => $until_hh);
1395 15         14642 $until_dt->set(minute => $until_mm);
1396 15         11228 $until_dt->set(second => 0);
1397 15         12646 $until_dt->set(nanosecond => 0);
1398 15         12285 my $until_epoch = $until_dt->epoch();
1399            
1400            
1401              
1402             # get a start time epoch value, defaulting to the family start time
1403             #
1404 15         867 my ($start_dt, $start_hh, $start_mm);
1405 15 50       73 $args->{start} = $self->{start} unless $args->{start}; # default start is famil start
1406 15         121 ($start_hh, $start_mm) = $args->{start} =~ /(\d\d):(\d\d)/;
1407             #$start_dt = DateTime->now(time_zone => $args->{tz});
1408 15         78 $start_dt = DateTime->from_epoch(epoch => &TaskForest::LocalTime::epoch());
1409 15         5610 $start_dt->set_time_zone($args->{tz});
1410 15         7702 $start_dt->set(hour => $start_hh);
1411 15         11297 $start_dt->set(minute => $start_mm);
1412 15         10389 $start_dt->set(second => 0);
1413 15         11400 $start_dt->set(nanosecond => 0);
1414            
1415              
1416             # create a duration value that's added in every loop
1417             #
1418 15         13373 my $every_duration = DateTime::Duration->new(minutes => $args->{every});
1419 15         2018 my $next_dt = $start_dt + $every_duration;
1420 15         18296 my $next_epoch = $next_dt->epoch();
1421 15         132 my $next_n = 0;
1422 15         36 my $last_job = $job_object;
1423 15         79 while ($next_epoch <= $until_epoch) {
1424 458         955 $next_n++;
1425 458         1623 my $jn = "$job_name--Repeat_$next_n--";
1426 458         2284 my $td = TaskForest::TimeDependency->new($next_dt);
1427 458         3209 $self->{dependencies}->{$jn} = [$td];
1428 458         19380 my $repeat_job_object = TaskForest::Job->new(name => $jn, tz=>$args->{tz}, start => $td->{start});
1429 458         1880 $self->{jobs}->{$jn} = $repeat_job_object;
1430 458 100       3074 if ($chained) {
1431 444         3113 push(@{$self->{dependencies}->{$jn}}, $last_job)
  444         1297  
1432             }
1433              
1434 458         2031 $next_dt = $next_dt + $every_duration;
1435 458         582856 $next_epoch = $next_dt->epoch();
1436              
1437 458         7505 $last_job = $repeat_job_object;
1438             }
1439             }
1440              
1441              
# ------------------------------------------------------------------------------
=pod

=over 4

=item writeSemaphoreFile()

 Usage     : $self->writeSemaphoreFile($file_name, $contents)
 Purpose   : Creates the semaphore file $file_name containing $contents.
             If the file already exists it is left untouched.
 Returns   : Nothing
 Arguments : $file_name - path of the semaphore file to create
             $contents  - text to write into the file
 Throws    : "Cannot touch file ..." if the file cannot be opened for
             writing, "Cannot close file ..." if the data could not be
             flushed when the file was closed

=back

=cut

# ------------------------------------------------------------------------------
sub writeSemaphoreFile {
    my ($self, $file_name, $contents) = @_;

    # An existing semaphore is never overwritten.
    return if -e $file_name;

    # Three-argument open: the file name is taken literally and never parsed
    # for a mode prefix or stripped of surrounding whitespace.
    open(my $fh, '>', $file_name) or croak "Cannot touch file $file_name: $!";
    print {$fh} $contents;

    # Buffered write errors (e.g. a full disk) only surface at close time.
    close($fh) or croak "Cannot close file $file_name: $!";

    return;
}
1473              
1474              
1475              
1476              
# ------------------------------------------------------------------------------
=pod

=over 4

=item findDependentJobs()

 Usage     : $job_names = $self->findDependentJobs($job_name)
 Purpose   : Find all jobs that depend on $job_name, either directly or
             indirectly.  As a side effect, $self->{dependents} is rebuilt
             as a map of job name => names of the jobs that depend on it.
 Returns   : An array ref of job names, in depth-first order, each listed
             once
 Argument  : The name of the job whose dependents you are looking for
 Throws    : Nothing

=back

=cut

# ------------------------------------------------------------------------------
sub findDependentJobs {
    my ($self, $job_name) = @_;

    # Build the reverse dependency map.  Only TaskForest::Job entries count;
    # other dependency objects (e.g. time dependencies) are ignored.
    $self->{dependents} = {};
    foreach my $j (keys %{$self->{dependencies}}) {
        foreach my $dep (grep { ref($_) eq 'TaskForest::Job' } @{$self->{dependencies}->{$j}}) {
            push(@{$self->{dependents}->{$dep->{name}}}, $j);
        }
    }

    # Depth-first walk over a COPY of the direct dependents, so that the
    # traversal does not consume the lists stored in $self->{dependents}.
    my @stack = @{ $self->{dependents}->{$job_name} || [] };
    my @result;
    my %seen;

    # Test the stack size rather than the shifted value, so that a job
    # named "0" does not end the walk early.
    while (@stack) {
        my $j = shift(@stack);

        # Every dependent of an already-visited job has already been queued.
        # Expanding it again only repeats work, and never terminates on a
        # cyclic graph.
        next if $seen{$j}++;

        push(@result, $j);
        unshift(@stack, @{$self->{dependents}->{$j}}) if $self->{dependents}->{$j};
    }

    return \@result;
}
1524              
1525              
1526              
# ------------------------------------------------------------------------------
=pod

=over 4

=item convertTokenWaitToReady()

 Usage     : $ok = $self->convertTokenWaitToReady()
 Purpose   : While holding an exclusive lock on tokens.lock in the log
             directory:
             - loads tokens.txt, first creating it from the configured
               tokens if it does not exist;
             - releases tokens held by jobs that have completed;
             - hands free tokens to waiting jobs, moving each job that
               gets all of its tokens to the 'Ready' state;
             - writes the token file back.
 Returns   : 1
 Argument  : None
 Throws    : Croaks if the lock file cannot be opened or locked.  The token
             file helpers may also confess on I/O errors.

=back

=cut

# ------------------------------------------------------------------------------
sub convertTokenWaitToReady {
    my ($self) = @_;

    my $log_dir   = &TaskForest::LogDir::getLogDir($self->{options}->{log_dir}, $self->{tz});
    my $lock_file = "$log_dir/tokens.lock";

    # Serialize access to the token file.  If the lock cannot be taken we
    # must not touch the token file unprotected.  A lexical handle also
    # guarantees the lock is released if anything below dies.
    sysopen(my $lock_fh, $lock_file, O_RDWR|O_CREAT)
        or croak "Cannot open token lock file $lock_file: $!";
    flock($lock_fh, LOCK_EX)
        or croak "Cannot lock token lock file $lock_file: $!";

    my $token_file = "$log_dir/tokens.txt";

    # Seed the token file from the configured tokens on first use.
    $self->createTokenFile($token_file) unless -e $token_file;

    my $token_hash = $self->readTokenFile($token_file);

    # Make $token_hash current by dropping completed consumers, then try to
    # hand the freed tokens to waiting jobs, and persist the result.
    $self->releaseTokens($token_hash);
    $self->acquireAllTokens($token_hash);
    $self->createTokenFile($token_file, $token_hash);

    flock($lock_fh, LOCK_UN);
    close($lock_fh);

    return 1;
}
1590              
# ------------------------------------------------------------------------------
# acquireTokens($token_hash, $job_name)
#
# All-or-nothing attempt to give job $job_name one unit of every token named
# in its {tokens} list.  A token has a free unit when its consumers list is
# shorter than its {number}.
#
# If every requested unit is free, the job's unique_id is appended to each
# token's consumers list and 1 is returned.  Otherwise $token_hash is left
# unmodified and 0 is returned.  A token name that does not exist in
# $token_hash is treated as unavailable.  A job that lists no tokens always
# succeeds.
# ------------------------------------------------------------------------------
sub acquireTokens {
    my ($self, $token_hash, $job_name) = @_;

    my $job = $self->{jobs}->{$job_name};

    # Pass 1: check availability without modifying $token_hash.  %pending
    # counts units this job has already requested, so a token listed twice
    # needs two free units.
    my @granted;
    my %pending;
    foreach my $token_name (@{ $job->{tokens} || [] }) {
        # Plain lookup: an unknown token must not be autovivified into
        # $token_hash (and from there written to the token file).
        my $token = $token_hash->{$token_name};
        return 0 unless $token;

        my $max_count = $token->{number};
        my $in_use    = ($token->{consumers} ? scalar(@{$token->{consumers}}) : 0)
                      + ($pending{$token_name} || 0);
        $pending{$token_name}++;

        return 0 unless defined($max_count) && $in_use < $max_count;

        push(@granted, $token);
    }

    # Pass 2: everything is available, so claim all of the tokens at once.
    foreach my $token (@granted) {
        push(@{$token->{consumers}}, $job->{unique_id});
    }
    return 1;
}
1629              
1630              
1631              
# ------------------------------------------------------------------------------
# acquireAllTokens($token_hash)
#
# Offers tokens to every job in $self->{token_jobs}, visiting them in sorted
# job-name order.  Each job for which acquireTokens() succeeds has its status
# set to 'Ready' and is recorded in $self->{ready_jobs}.
# ------------------------------------------------------------------------------
sub acquireAllTokens {
    my ($self, $token_hash) = @_;

    for my $name (sort keys %{ $self->{token_jobs} }) {
        my $candidate = $self->{token_jobs}->{$name};
        next unless $self->acquireTokens($token_hash, $name);

        # All tokens obtained: promote the job to the ready queue.
        $candidate->{status}         = 'Ready';
        $self->{ready_jobs}->{$name} = $candidate;
    }
}
1645              
1646              
# What happens in the following case?
#   01 Job J, which uses token T, runs
#   02 Job J completes
#   03 Job J is rerun
#   04 TaskForest cycles
#
# You can't rely on the existence of the file alone; you have to look at the
# pid, or check the job's status.

# ------------------------------------------------------------------------------
# releaseTokens($token_hash)
#
# Brings $token_hash up to date.  For each token, it removes from the
# consumers list the unique_id of every job in $self->{jobs} whose status is
# 'Success' or 'Failure'.  Afterwards every token has a consumers array ref,
# possibly empty.
# ------------------------------------------------------------------------------
sub releaseTokens {
    my ($self, $token_hash) = @_;

    # Index the completed jobs by unique_id, because consumers lists hold
    # unique_ids rather than job names.
    my %completed_by_unique_id;
    foreach my $job (values %{$self->{jobs}}) {
        my $status = $job->{status};
        next unless defined($status) && ($status eq 'Success' || $status eq 'Failure');
        $completed_by_unique_id{$job->{unique_id}} = $job;
    }

    # Drop every completed consumer, keeping the order of the rest.
    foreach my $token (values %$token_hash) {
        my @still_held = grep { !$completed_by_unique_id{$_} }
                         @{ $token->{consumers} || [] };
        $token->{consumers} = \@still_held;
    }
}
1682              
# ------------------------------------------------------------------------------
# readTokenFile($token_file)
#
# Loads and returns the token structure that createTokenFile() wrote to
# $token_file: a Data::Dumper dump prefixed with "my ".  Confesses if the
# file cannot be opened, or if its contents fail to evaluate (for example a
# truncated write).
#
# NOTE(review): the file contents are executed with a string eval.  This is
# only acceptable because TaskForest writes this file itself in its own log
# directory.  Anyone who can write to that file can run code as this process.
# ------------------------------------------------------------------------------
sub readTokenFile {
    my ($self, $token_file) = @_;

    open(my $fh, '<', $token_file)
        or confess("Cannot read token file $token_file: $!");
    my $data = do { local $/; <$fh> };
    close($fh);
    $data = '' unless defined $data;

    # Classic blanket-untaint idiom, presumably so the eval also works under
    # taint mode.  It is only safe because the file is trusted (see NOTE above).
    $data =~ /(.*)/s;
    $data = $1;

    my $hash = eval($data);

    # Without this check, a corrupted file would silently yield undef and
    # the token bookkeeping built on it would be wrong.
    confess("Cannot parse token file $token_file: $@") if $@;

    return $hash;
}
1698              
# ------------------------------------------------------------------------------
# createTokenFile($token_file, [$token_hash])
#
# Writes $token_hash to $token_file, overwriting any existing file.  When
# $token_hash is not supplied (or is false), the configured tokens in
# $self->{options}->{token} are written instead.  The output is
# "my " . Dumper($token_hash), the format readTokenFile() evaluates.
# Confesses if the file cannot be opened or fully written.
# ------------------------------------------------------------------------------
sub createTokenFile {
    my ($self,
        $token_file,
        $token_hash) = @_;

    $token_hash = $self->{options}->{token} unless $token_hash;

    # Three-argument open: the path is used literally.
    open(my $fh, '>', $token_file)
        or confess("Can't write to token_file $token_file: $!");
    print {$fh} "my ", Dumper($token_hash);

    # Buffered write errors only surface at close.  A silently truncated
    # token file would be unreadable on the next cycle.
    close($fh)
        or confess("Can't write to token_file $token_file: $!");

    return 1;
}
1715              
# ------------------------------------------------------------------------------
# defOr($value, $or)
#
# Called as a plain function (no $self).  Returns $value when it is defined,
# otherwise $or.  Unlike "$value || $or", defined-but-false values such as 0
# and '' are passed through unchanged.
# ------------------------------------------------------------------------------
sub defOr {
    my ($value, $or) = @_;
    return defined($value) ? $value : $or;
}

# A module loaded with use/require must return a true value.
1;
1725              
1726              
1727              
1728            
1729