File Coverage

lib/Schedule/SGELK.pm
Criterion Covered Total %
statement 138 285 48.4
branch 29 124 23.3
condition 9 33 27.2
subroutine 20 28 71.4
pod 15 19 78.9
total 211 489 43.1


line stmt bran cond sub pod time code
1             #! /usr/bin/env perl
2              
3             =pod
4              
5             =head1 NAME
6              
7             Schedule::SGELK
8              
9             =head1 SYNOPSIS
10              
11             A module for submitting jobs to an SGE queue.
12             use Schedule::SGELK;
13             my $sge=Schedule::SGELK->new(verbose=>1,numnodes=>5,numcpus=>8,workingdir=>"SGE/",waitForEachJobToStart=>1);
14             $sge->set("jobname","thisisaname");
15             # run a series of jobs and wait for them all to finish
16             my $job=$sge->pleaseExecute("sleep 60");
17             my $job2=$sge->pleaseExecute("sleep 60");
18             $sge->wrapItUp();
19             # or you can specify which jobs to wait for
20             $sge->waitOnJobs([$job,$job2],1); # 1 means wait for all jobs to finish; 0 to wait for any free node
21             # or in one step
22             $sge->pleaseExecute_andWait("sleep 60");
23              
24             A quick test for this module is the following one-liner
25             perl -MSchedule::SGELK -e '$sge=Schedule::SGELK->new(numnodes=>5); for(1..3){$sge->pleaseExecute("sleep 3");}$sge->wrapItUp();'
26              
27             Another quick test is to use the test() method, if you want to see standardized text output (see test() below)
28             perl -MSchedule::SGELK -e '$sge=Schedule::SGELK->new(-numnodes=>2,-numcpus=>8); $sge->test(\%tmpSettings);'
29            
30              
31             =head1 DESCRIPTION
32              
33             A module for submitting jobs to an SGE queue. Monitoring is
34             performed using a combination of monitoring files
35             written to the hard drive and qstat.
36             Submitting is performed internally by making a perl script.
37              
38             =head1 AUTHOR
39              
40             Author: Lee Katz
41              
42             =cut
43              
44             package Schedule::SGELK;
45 1     1   806 use strict;
  1         2  
  1         29  
46 1     1   5 use warnings;
  1         2  
  1         22  
47 1     1   5 use Data::Dumper;
  1         2  
  1         51  
48 1     1   6 use File::Basename qw/basename/;
  1         6  
  1         62  
49 1     1   7 use File::Spec;
  1         1  
  1         39  
50 1     1   587 use File::Slurp qw/read_file write_file/;
  1         30575  
  1         69  
51 1     1   7 use File::Temp qw/tempdir/;
  1         3  
  1         42  
52 1     1   542 use String::Escape qw/escape/;
  1         5769  
  1         94  
53 1     1   511 use version 0.77;
  1         1924  
  1         7  
54              
55             our $VERSION = version->declare("v1.5");
56              
57             my $has_threads=eval{
58             return 0; # this isn't working yet
59             require threads;
60             return 1;
61             };
62              
63             # some global variables
64             my @jobsToClean=();
65             my @jobsToMonitor=();
66             my $numSlots=0; # number of slots that are being used now
67              
68 4     4 0 916 sub logmsg {local $0=basename $0;my $FH = *STDOUT; print $FH "$0: ".(caller(1))[3].": @_\n";}
  4         80  
  4         437  
69             local $SIG{'__DIE__'} = sub { my $e = $_[0]; $e =~ s/(at [^\s]+? line \d+\.$)/\nStopped $1/; die("$0: ".(caller(1))[3].": ".$e); };
70             local $SIG{INT} = sub{ cleanAllJobs(); };
71              
72             # to be called when the script exits
73             sub cleanAllJobs{
74 1 50   1 0 16 return if(!@jobsToClean);
75 1         14 logmsg "Cleaning all jobs";
76 1         12 for (@jobsToClean){
77 2         12 cleanAJob($_);
78             }
79             }
80             END{
81             cleanAllJobs();
82             }
83              
84             =pod
85              
86             =head2 METHODS
87              
88             =over
89              
90             =item sub new
91              
92             create a new instance of a scheduler.
93             Arguments and their defaults:
94             numnodes=>50 maximum nodes to use
95             numcpus=>128 maximum cpus that will be used per node in a script
96             maxslots=>9999 maximum slots that you can use. Useful if you want to be limited by total slots instead of nodes or CPUs. E.g. {numnodes=>100,numcpus=>1,maxslots=>20}
97             verbose=>0
98             workingdir=>$ENV{PWD} a directory that all nodes can access to read/write jobs and log files
99             waitForEachJobToStart=>0 Allow each job to start as it's run (0), or to wait until the qstat sees the job before continuing (1)
100             jobname=>... This is the name given to the job when you view it with qstat. By default, it will be named after the script that calls this module.
101             warn_on_error=>1 This will make the script give a warning instead of exiting
102             qsubxopts=>... These are extra options to pass to qsub. E.g., {qsubxopts=>"-V"} Options are overwritten by appending them to the within-script options. Therefore this is not the best way to choose a different queue but it is a way to change a job name or the number of processors.
103             noqsub=>1 Force performing a system call instead of using qsub
104             queue=>all.q Choose the queue to use for a new job. Default: all.q
105              
106             Examples:
107             {numnodes=>100,numcpus=>1,maxslots=>50} # for many small jobs
108             {numnodes=>5,numcpus=>8,maxslots=>40} # for a few larger jobs (note: maxslots should be >= numnodes * numcpus)
109              
110             =back
111              
112             =cut
113              
114             sub new{
115 1     1 1 1610 my($class,%args)=@_;
116 1         3 my $self=bless{},$class;
117              
118             # just start with verbosity = 0. This avoids an error in the >= checks
119 1         6 $self->{'settings'}={};
120 1         2 $self->{'error'}="";
121 1         2 $self->{'exit_code'}=0;
122              
123             # load up if we know what we have
124 1         5 foreach my $key (keys %args) {
125 6         8 my $nodash=$key;
126 6         11 $nodash =~ s/^\-//;
127 6         11 $self->set($nodash,$args{$key});
128             }
129              
130             # set defaults if they are not set
131 1         4 my %default=(numnodes=>50,numcpus=>128,verbose=>0,waitForEachJobToStart=>0,maxslots=>9999,queue=>"all.q",scheduler=>"SGE");
132 1         5 while(my($key,$value)=each(%default)){
133 7 100       11 $self->settings($key,$value) if(!defined($self->settings($key)));
134             }
135 1 50       2 if(!$self->get("workingdir")){
136 0         0 $self->set("workingdir",$self->mktempdir());
137 0         0 logmsg "Working directory not set. Using ".$self->get("workingdir");
138             }
139              
140             # executables
141 1         3 for(qw(qsub qstat qdel)){
142             # See if it exists
143 3         17510 my $exec=`which $_ 2>/dev/null`;
144 3 50       77 $exec="" if $?;
145 3         14 chomp($exec);
146 3         101 $self->set($_,$exec);
147              
148 3 50       26 $self->set("scheduler","") if(!$exec);
149             }
150              
151             # See if SGE is present
152 1 50 33     75 if($ENV{SGE_ROOT} && -e $ENV{SGE_ROOT}){
153 0         0 $self->set("scheduler","SGE");
154             } else{
155 1         19 logmsg "Env variable \$SGE_ROOT is not set. I will not use SGE";
156 1         39 $self->set("scheduler","");
157 1         14 $self->set("qsub","");
158             }
159              
160             # Remove the scheduler option if the user explicitly
161             # chooses not to use it.
162 1 50       37 $self->set("scheduler","") if($self->get("noqsub"));
163              
164 1         61 return $self;
165             }
166              
167             =pod
168              
169             =over
170              
171             =item sub error($msg,$exit_code) or error()
172              
173             Get or set the error. Can set the error code too, if provided.
174              
175             =back
176              
177             =cut
178              
179             # Sets the error and returns the previous error message.
180             # Or, simply returns the current error message.
181             sub error{
182 0     0 1 0 my($self,$msg,$exit_code)=@_;
183 0 0       0 return $self->{error} if(!defined($msg));
184 0         0 my $oldmsg=$self->{error};
185 0         0 $self->{error}=$msg;
186 0 0       0 $self->{exit_code}=$exit_code if(defined($exit_code));
187 0         0 return $oldmsg;
188             }
189             =pod
190              
191             =over
192              
193             =item sub set() get() settings()
194              
195             Get or set a setting. All settings are listed under sub new().
196             If a setting is provided without a value, then nothing new will be set,
197             and only the value of the specified setting will be returned.
198              
199             =back
200              
201             =cut
202              
203             sub set{
204 37     37 1 148 my($self,$key,$value)=@_;
205 37 100 100     252 if(defined($key) && defined($value)){
    100          
206 18         121 $self->{'settings'}{$key}=$value;
207             } elsif (defined($key)){
208 17         205 return $self->{'settings'}{$key};
209             }
210 20 50       45 return %{$self->{'settings'}} if wantarray;
  0         0  
211 20         91 return $self->{'settings'};
212             }
213             # renaming of sub set()
214             sub get{
215 8     8 1 82 my($self,@args)=@_;
216 8         43 return $self->set(@args);
217             }
218             # renaming of sub set()
219             sub settings{
220 14     14 1 95 my($self,@args)=@_;
221 14         48 return $self->set(@args);
222             }
223              
224             =pod
225              
226             =over
227              
228             =item pleaseExecute()
229              
230             This is the main method. It will submit a command to the cluster.
231             $sge->set("jobname","a_nu_start");
232             $sge->pleaseExecute("someCommand with parameters");
233              
234             If you are already occupying more than numnodes, then it will pause before
235             executing the command. It will also create many files under workingdir, so
236             be sure to specify it. The workingdir is the current directory by default.
237              
238             You can also specify temporary settings for this one command with a referenced hash.
239              
240             $sge->pleaseExecute("someCommand with parameters",{jobname=>"a_nu_start",numcpus=>2});
241              
242             =back
243              
244             =cut
245              
246             sub pleaseExecute{
247 2     2 1 64 my($self,$cmd,$tmpSettings)=@_;
248 2         308 local $0=basename $0;
249 2         11 my %settings=%{ $self->settings };
  2         16  
250              
251             # read in any temporary settings for this command
252 2 50       15 $tmpSettings={} if(!defined($tmpSettings));
253 2         19 $settings{$_}=$$tmpSettings{$_} for(keys(%$tmpSettings));
254              
255             # default settings for undefined settings
256 2         12 my $jobid=-1; # -1 is an error state
257 2   33     50 $settings{jobname}||="sgelk$0";
258 2   33     44 $settings{logfile}||="$0.log";
259 2   50     8 $settings{numcpus}||=1;
260 2   50     21 $settings{timeout}||=60; # how long we will wait for qsub to start
261 2 50       43 return 0 if($cmd=~/^\s*$/); # if there's no command, then no worries
262              
263 2         29 $self->waitOnJobs(\@jobsToMonitor,0); # wait until the job can be submitted
264              
265 2         75 my $rand=int(rand(999999));
266 2         10 my $tempdir=$settings{workingdir};
267             # create a perl script with the literal command in it
268 2         11 my $script="$tempdir/qsub.$rand.pl";
269              
270 2 50       32 my $prefix=($0 eq '-e')?"STDIN":$0;
271 2         10 $prefix="$settings{workingdir}/$prefix.$rand";
272 2         26 my($submitted,$running,$finished,$died,$output)=("$prefix.submitted", "$prefix.running", "$prefix.finished","$prefix.died","$prefix.log");
273            
274 2         8497 my $perl=`which perl`; chomp($perl);
  2         58  
275 2 50       349 open(SCRIPT,">",$script) or die "Could not write to temporary script $script: $!";
276 2         57 print SCRIPT "#! $perl\n\n";
277             # It has SGE params in it.
278 2         23 print SCRIPT "#\$ -N $settings{jobname}\n";
279 2         48 print SCRIPT "#\$ -S $perl\n";
280 2         18 print SCRIPT "#\$ -V\n";
281 2         11 print SCRIPT "#\$ -wd $ENV{PWD}\n";
282 2         60 print SCRIPT "#\$ -pe smp $settings{numcpus}\n";
283 2         10 print SCRIPT "#\$ -o $output\n";
284 2         6 print SCRIPT "#\$ -e $output\n";
285 2         22 print SCRIPT "#\$ -q $settings{queue}\n";
286             # qsubxopts get to be in here first but will be overwritten by later qsubopts below
287 2 50       29 if(my $opts=$settings{qsubxopts}){
288 0         0 print SCRIPT "# options specified by qsubxopts are in the next line:\n";
289 0         0 print SCRIPT "#\$ $opts\n";
290             }
291 2         25 print SCRIPT "use strict;\nuse warnings;\n";
292 2         11 print SCRIPT "use File::Slurp qw/read_file write_file/;\n";
293              
294             # announces that it was submitted
295 2         121 my $sanitized=escape('qqbackslash',$cmd);
296 2         667 print SCRIPT "write_file('$submitted',$sanitized);\n";
297             # it runs the command
298 2         15 print SCRIPT "write_file('$running',$sanitized);\n";
299 2         14 print SCRIPT "system($sanitized);\n";
300             # let the script try one more time if it fails
301             #print SCRIPT "system($sanitized) if \$?;\n";
302             # print a parsable error if the script dies. This error will be in $output and in file $died
303 2         20 print SCRIPT <<END;
304             if(\$?){
305             my \$error=\"QSUB ERROR\\n\$?\\n\$!\";
306             write_file('$died',$sanitized);
307             write_file('$died',{append=>1},"\\n\$error\\n");
308             die \$error;
309             }
310             END
311             # announces when it is finished
312 2         27 print SCRIPT "write_file('$finished',$sanitized);\n";
313 2         132 close SCRIPT;
314 2 50       7502 system("touch $script"); die if $?; # make the system close the script. Why isn't Perl closing it?
  2         118  
315             #system("cat $script");sleep 60;die;
316              
317             # now run the script and get the jobid
318 2         198 my %return=(submitted=>$submitted,running=>$running,finished=>$finished,died=>$died,tempdir=>$tempdir,output=>$output,cmd=>$cmd,script=>$script,jobname=>$settings{jobname},numcpus=>$settings{numcpus});
319 2         69 my $qsub=$self->get("qsub");
320 2 50       19 if(!$settings{scheduler}){
321 2         64 my $job=command("$perl $script",$has_threads,\%settings);
322 2 50       44 $return{thread}=$job if($has_threads);
323 2 50       65 $return{jobid}=$job->tid if($has_threads);
324 2 50       56 push(@jobsToClean,\%return) if(!$self->settings("keep"));
325 2         17 push(@jobsToMonitor,\%return);
326 2         39 $numSlots+=$settings{numcpus}; # claim these cpus
327 2 50       33 return %return if wantarray;
328 2         358 return \%return;
329             }
330              
331             # At this point, qsub is on this computer. Submit the job.
332 0         0 my $out=`$qsub $script`; chomp($out);
  0         0  
333 0 0       0 if($out=~/Your job (\d+)/){
334 0         0 $jobid=$1;
335 0         0 $out.=" from $script";
336 0 0       0 logmsg $out if($settings{verbose});
337             } else {
338 0         0 logmsg "WARNING: the last job submitted did not have an obvious jobid. It can't be tracked!";
339             }
340              
341             # monitor for the script to be running before moving on
342 0         0 my $started=time;
343 0         0 while(!-e $submitted){
344 0 0       0 last if(!$self->settings("waitForEachJobToStart"));
345 0         0 sleep 1;
346 0 0       0 die "Command timed out!\n $cmd" if((time-$started)>$settings{timeout});
347 0 0       0 die "Command resulted in an error. qstat -j $jobid for more info\n $cmd" if($self->jobStatus($jobid) eq 'Eqw');
348             }
349              
350             # TODO create a link from the jobid to the random id
351            
352 0         0 $return{jobid}=$jobid;
353 0 0       0 push(@jobsToClean,\%return) if(!$self->settings("keep"));
354 0         0 push(@jobsToMonitor,\%return);
355 0         0 $numSlots+=$settings{numcpus}; # claim these cpus
356 0 0       0 return %return if wantarray;
357 0         0 return \%return;
358             }
359              
360             =pod
361              
362             =over
363              
364             =item pleaseExecute_andWait()
365              
366             Exact same as pleaseExecute(), except it will wait for the command to finish
367             before continuing. Internally calls pleaseExecute() and then waitOnJobs().
368             However one key difference between pleaseExecute() and this sub is that you can
369             give a list of commands.
370              
371             # this will take 100 seconds because all commands have to finish.
372             $sge->pleaseExecute_andWait(["sleep 60","sleep 100","sleep 3"]);
373              
374             =back
375              
376             =cut
377              
378             sub pleaseExecute_andWait{
379 0     0 1 0 my($self,$cmd)=@_;
380 0         0 my %settings=$self->settings;
381 0         0 my $mustfinish=$self->settings("mustfinish"); # should be restored later
382 0         0 $self->set("mustfinish",0);
383 0 0       0 $cmd=[$cmd] if(ref($cmd) eq ""); # let cmd be a string but turn it into a list internally
384 0         0 my(@jobid);
385 0         0 for(@$cmd){
386 0         0 my $jobid=$self->pleaseExecute($_);
387 0         0 push(@jobid,$jobid);
388 0         0 $self->waitOnJobs(\@jobid);
389             }
390 0         0 $self->waitOnJobs(\@jobid,1);
391             }
392              
393             =pod
394              
395             =over
396              
397             =item checkJob($jobHash)
398              
399             Checks the status of a given job. The job variable is obtained from pleaseExecute().
400             $self->error can be set if there is an error in the job. Return values:
401             1 for finished; 0 for still running or hasn't started; -1 for error.
402              
403             =back
404              
405             =cut
406              
407             sub checkJob{
408 0     0 1 0 my($self,$job)=@_;
409             # See what the job status is {jobid} for fast checking
410 0         0 my $status=$self->jobStatus($$job{jobid});
411 0 0       0 if($status eq 'qw'){ # queued but not running
    0          
    0          
412 0         0 return 0;
413             } elsif($status eq 'Eqw'){ # error
414 0         0 $self->error("Command resulted in an error. qstat -j $$job{jobid} for more info\n $$job{cmd}");
415 0         0 return -1;
416             } elsif($status=~/[rt]/){ # running or is delayed
417 0         0 return 0;
418             }
419              
420             # look at files to check on the job status, for slower checking.
421             # see if the job has even started: {submitted}
422 0 0       0 return 0 if(!-e $$job{submitted});
423             # if the job finished, then great! {finished}
424 0 0       0 return 1 if(-e $$job{finished});
425 0 0       0 return 1 if(!keys(%$job)); # sometimes a job is blank... why?
426             # if the job died
427 0 0       0 if(-e $$job{died}){
428 0         0 my @content=read_file($$job{output});
429 0         0 chomp(@content);
430 0         0 $self->error(join("\n",@content[-3..-1]));
431 0         0 return -1;
432             }
433             # It's running if the die-file isn't there and if the running file is there
434 0 0       0 return 0 if(-e $$job{running});
435 0         0 logmsg "ERROR: Could not understand what the status is of job $$job{jobid}!\n".Dumper($job);
436 0         0 return -1;
437             }
438              
439             =pod
440              
441             =over
442              
443             =item jobStatus(jobid)
444              
445             Given an SGE job id, it returns its qstat status
446              
447             =back
448              
449             =cut
450              
451             sub jobStatus{
452 0     0 1 0 my($self,$jobid)=@_;
453 0         0 my $state=0;
454 0   0     0 $jobid||=0;
455 0         0 my $qstat=$self->qstat;
456 0         0 for(split(/\n/,$qstat)){
457 0         0 my @F=split /\s+/;
458 0 0       0 if($F[0] eq $jobid){
459 0         0 $state=$F[4];
460             }
461             }
462 0         0 close QSTAT;
463 0         0 return $state;
464             }
465              
466             =pod
467              
468             =over
469              
470             =item qstat
471              
472             Runs qstat and stores its output and a timestamp in the settings. Returning a previously cached result is currently disabled, so qstat is executed on every call.
473              
474             =back
475              
476             =cut
477              
478             sub qstat{
479 0     0 1 0 my($self)=@_;
480             # return the cached value if it was just accessed a second ago
481             #return $self->get("qstat") if(defined($self->get("qstat")) && $self->get("qstat_timestamp") <= time - 1);
482              
483 0         0 my $content="";
484 0 0       0 open(QSTAT,"qstat|") or die "ERROR: could not execute qstat! $!";
485 0         0 while(my $line=<QSTAT>){
486 0         0 $line=~s/^\s+|\s+$//g;
487 0         0 $content.="$line\n";
488             }
489 0         0 close QSTAT;
490 0         0 $self->set("qstat",$content);
491 0         0 $self->set("qstat_timestamp",time);
492 0         0 return $self->get("qstat");
493             }
494              
495             =pod
496              
497             =over
498              
499             =item wrapItUp()
500              
501             Pauses the program until all submitted jobs have finished.
502             Calls waitOnJobs or joinAllThreads internally. Does not take any parameters.
503              
504             =back
505              
506             =cut
507              
508             # Wait on all jobs to finish and clear out the queue.
509             sub wrapItUp{
510 1     1 1 66 my($self)=@_;
511 1 50       37 if($self->get("scheduler")){
    50          
512 0         0 $self->waitOnJobs(\@jobsToMonitor,1);
513             } elsif($has_threads){
514 0         0 $self->joinAllThreads(\@jobsToMonitor,1);
515             }
516 1         12 return 1;
517             }
518              
519             =pod
520              
521             =over
522              
523             =item joinAllThreads($jobList)
524              
525             Joins all threads. This is if you have ithreads and if the scheduler is not set.
526             For example, if you specify noqsub or if qsub executable is not found.
527              
528             =back
529              
530             =cut
531              
532             sub joinAllThreads{
533 0     0 1 0 my($self,$job)=@_;
534              
535             JOINALLTHREADS:
536 0         0 for my $j(@$job){
537 0 0       0 next if(!$$j{thread});
538 0 0 0     0 next if($$j{thread} && !$$j{thread}->is_joinable);
539 0         0 logmsg "Joining TID".$$j{jobid};
540 0         0 $$j{thread}->join;
541 0         0 $$j{thread}=0;
542             }
543              
544             # clean out the joined jobs
545 0         0 my @newjob;
546 0         0 for my $j(@$job){
547 0 0       0 push(@newjob,$j) if($$j{thread});
548             }
549 0         0 $job=\@newjob;
550              
551             # if there is still something in @$job, then go for another round
552 0 0       0 if(@$job){
553 0         0 logmsg "Waiting for ".scalar(@$job)." more jobs to finish...";
554 0         0 sleep 1;
555 0         0 goto JOINALLTHREADS;
556             }
557              
558             }
559              
560              
561             =pod
562              
563             =over
564              
565             =item waitOnJobs($jobList,[$mustFinish])
566              
567             Waits on all given jobs to finish. The job list is a reference to an array of jobs as returned by pleaseExecute().
568             If $mustFinish evaluates to true, then the program will pause until
569             all jobs are finished.
570             Calls on checkJob() internally. Will die with an error message if a job dies.
571              
572             =back
573              
574             =cut
575              
576             # Wait on enough jobs to finish before returning.
577             # If a job finishes, splice it from the job array.
578             sub waitOnJobs{
579 3     3 1 26 my($self,$job,$mustfinish)=@_;
580            
581             # if there is no qsub, then every job is only going one at a time
582 3         13 my $qsub=$self->get("qsub");
583 3 50       15 return @$job if(!$qsub);
584              
585 0         0 my %settings=$self->settings;
586 0 0       0 $settings{mustfinish}=$mustfinish if(defined($mustfinish));
587 0 0       0 if($settings{verbose}){
588 0 0       0 logmsg "We have reached node capacity ($settings{numnodes})! Waiting for a job to finish." if(@$job >= $settings{numnodes});
589 0 0       0 logmsg "We have reached slot capacity ($settings{maxslots})! Waiting for a job to finish." if($numSlots >= $settings{maxslots});
590             }
591 0         0 while(@$job > 0){
592 0         0 for(my $i=0;$i<@$job;$i++){
593 0   0     0 $$job[$i]{jobid}||=0;
594 0         0 my $state=$self->checkJob($$job[$i]);
595 0 0       0 if($state==1){
    0          
596 0 0       0 logmsg "A job finished: $$job[$i]{jobname} ($$job[$i]{jobid})" if($settings{verbose});
597 0         0 $numSlots = $numSlots - $$job[$i]{numcpus}; # not using these slots anymore
598 0         0 splice(@$job,$i,1);
599 0         0 last;
600             } elsif($state==-1){
601 0         0 my $msg="A job failed ($$job[$i]{jobname} [$$job[$i]{jobid}]! Look at $$job[$i]{output} for more details.\nError message was ".$self->error()."\n".Dumper($$job[$i]);
602 0 0       0 die $msg if(!$settings{warn_on_error});
603             # just print the warning if the script didn't die and forget about this dead job
604 0         0 logmsg $msg;
605 0         0 $numSlots = $numSlots - $$job[$i]{numcpus}; # not using these slots anymore
606 0         0 $self->error($msg);
607 0         0 splice(@$job,$i,1);
608 0         0 last;
609             }
610             }
611 0         0 sleep 1;
612             # break out if you don't have to finish yet but you can still add in another job
613 0 0 0     0 last if(!$settings{mustfinish} && @$job<$settings{numnodes} && $numSlots<$settings{maxslots});
      0        
614             }
615 0         0 return @$job;
616             }
617              
618             =pod
619              
620             =over
621              
622             =item cleanAJob
623              
624             This is internally used for cleaning up files after a job is done.
625             Do not use externally.
626              
627             =back
628              
629             =cut
630              
631             sub cleanAJob{
632 2     2 1 6 my($job)=@_;
633 2   50     69 my $jobid=$$job{jobid} || return 0;
634 0         0 logmsg $jobid;
635 0         0 for (qw(running submitted finished output script died)){
636 0         0 unlink $$job{$_};
637             }
638              
639 0         0 system("qdel $$job{jobid} 2>/dev/null | grep -v 'does not exist'");
640             #die "Internal error" if $?;
641 0         0 return 1;
642             }
643              
644             sub mktempdir{
645 0     0 0 0 my ($self,$settings) = @_;
646 0   0     0 $settings||={};
647             # SGELK.22623.XXXXX
648             #my $tempdir_path = File::Spec->join(File::Spec->tmpdir(), (split("::",(caller(1))[3]))[1].".$$.XXXXX");
649 0 0       0 mkdir "./.SGELK" if(!-d "./.SGELK");
650 0 0       0 die if $?;
651 0         0 my $tempdir_path = File::Spec->join("./.SGELK",(split("::",(caller(1))[3]))[1].".$$.XXXXX");
652 0         0 my $tempdir = tempdir($tempdir_path, CLEANUP => !($$settings{keep}));
653 0         0 return $tempdir;
654             }
655              
656             sub command{
657 2     2 0 15 my($cmd,$use_threads,$settings)=@_;
658 2         11 my $job=0;
659 2 50       17 if($use_threads){
660 0         0 $job=threads->new(\&command,$cmd,0,$settings);
661             } else {
662 2         68 logmsg "Running $cmd";
663 2         158766 system($cmd);
664 2 50       152 die "ERROR with command: $!\n $cmd" if $?;
665             }
666              
667 2         112 return $job;
668             }
669              
670             =pod
671              
672             =over
673              
674             =item test
675              
676             Use this method to perform a test. The test sends
677             ten jobs that print debugging information.
678              
679             You can give an optional hash argument to send other settings as described in new().
680              
681             perl -MSchedule::SGELK -e '$sge=Schedule::SGELK->new(-numnodes=>2,-numcpus=>8); $sge->test(\%tmpSettings);'
682              
683             =back
684              
685             =cut
686              
687             sub test{
688 0     0 1   my($self,$tmpSettings)=@_;
689              
690             # get settings
691 0           my %settings=%{ $self->settings };
  0            
692             # read in any temporary settings for this command
693 0 0         $tmpSettings={} if(!defined($tmpSettings));
694 0           $$tmpSettings{verbose}=1; # make sure it's verbose for debugging
695 0           $settings{$_}=$$tmpSettings{$_} for(keys(%$tmpSettings));
696              
697             # execute the jobs
698 0           for(1..$self->get("numnodes")){
699 0           logmsg "Job $_ is being submitted";
700 0           my $text="Job count\t$_\n";
701 0           $text.="$_\t$settings{$_}\n" for(keys(%settings));
702 0           $self->pleaseExecute("echo '$text'|column -t",$tmpSettings);
703             }
704 0           $self->wrapItUp();
705 0           return 1;
706             }
707              
708             1;