File Coverage

blib/lib/Parallel/Mpich/MPD.pm
Criterion Covered Total %
statement 88 302 29.1
branch 21 152 13.8
condition 20 66 30.3
subroutine 12 24 50.0
pod 12 14 85.7
total 153 558 27.4


line stmt bran cond sub pod time code
1             package Parallel::Mpich::MPD;
2              
3 12     12   50746 use warnings;
  12         35  
  12         464  
4 12     12   69 use strict;
  12         24  
  12         426  
5 12     12   17262 use File::Temp;
  12         291909  
  12         2119  
6 12     12   14499 use IO::All;
  12         155153  
  12         129  
7 12     12   960 use Carp;
  12         26  
  12         894  
8 12     12   13443 use Time::HiRes qw( usleep);
  12         23927  
  12         68  
9 12     12   5193 use Data::Dumper;
  12         14029  
  12         677  
10 12     12   8205 use Parallel::Mpich::MPD::Common;
  12         46  
  12         2811  
11 12     12   9131 use Parallel::Mpich::MPD::Job;
  12         51  
  12         110  
12              
13             =head1 NAME
14              
15             Parallel::Mpich::MPD - Mpich MPD wrapper
16              
17             =item I<$VERSION>
18              
19              
20             =cut
21              
22             our $VERSION = '0.9.3';
23              
24             =head1 SYNOPSIS
25             use Parallel::Mpich::MPD;
26            
27             # VERBOSE LEVEL
28             #$Parallel::Mpich::MPD::Common::WARN=1;
29             #$Parallel::Mpich::MPD::Common::DEBUG=1;
30            
31             #CHECK ENV
32             Parallel::Mpich::MPD::Common::env_Hostsfile(/path/to/machinesfile);
33             Parallel::Mpich::MPD::Common::env_MpichHome(/path/to/mpdhome);
34             Parallel::Mpich::MPD::Common::env_Check();
35            
36             #CHECK MPD AND NETWORK
37             my %hostsup;
38             my %hostsdown;
39             my %info=Parallel::Mpich::MPD::info(); #check mpd master
40             print Dumper(\%info)
41             %hostsup= Parallel::Mpich::MPD::Common::checkHosts(hostsdown => \%hostsdown ); #check ping and ssh on machines
42             %hostsup= Parallel::Mpich::MPD::check( reboot =>1:0, hostsdown=>\%hostsdown); #check mpds instances and try to repair
43             ...
44            
45             # USE MPD
46             Parallel::Mpich::MPD::boot(); #start mpd instances defined by default machinesfile
47             my $alias1=Parallel::Mpich::MPD::makealias();
48             if Parallel::Mpich::MPD::createJob(cmd => $cmd, params => $parms, $machinesfile => $hostsfile, alias => $alias1)){
49             my $job=Parallel::Mpich::MPD::findJob(jobalias => $alias, getone => 1);
50             $job->sig_kill() if defined $job;
51             }
52              
53             =head1 DESCRIPTION
54              
55             This I, a wrapper module for MPICH2 Process Management toolkit from L.
56             The wrapper include the following tools: basic configuration, mpdcheck, mpdboot, mpdcleanup, mpdtrace,
57             mpdringtest, mpdallexit, mpiexec, mpdsigjob and mpdlistjobs.
58              
59             =over 4
60              
61             =item boot(hosts => @hosts, machinesfile => $machines, checkOnly => 1|0, output => \$output)
62            
63             starts a set of mpd's on a list of machines. boot try to verify that the hosts in the host
64             file are up before attempting start mpds on any of them.
65            
66             =item rebootHost(host => $hostname)
67              
68             restart mpd on the specified host. rebootHost will kill old mpds before restarting a new one.
69             The killed MPDS are filtered by specific port and host.
70              
71             =item check(machinesfile => $file, hostsup => \%hosts, hostsdown => \%hostsdown , reboot => 1)
72              
73             Check if MPD master and nodes are well up. If MPD master is down it try to ping and ssh machines.
74             If you use the option reboot, check will try to restart mpd on specified machines or to reboot the master.
75            
76             =item info( )
77              
78             return an %info of the master with the following keys (master, hostname, port)
79            
80             =item validateMachinesfile(machinefiles => $filename)
81              
82             check with mpdtrace if all machines specified by filename are up. If not, a temporary file is
83             created with the resized machinesfile
84              
85             =item shutdown( )
86              
87             causes all mpds in the ring to exit
88              
89             =item createJob({cmd => $cmd , machinesfile=> $filename, [params => $params], [ncpu => $ncpu], [alias => $alias])
90            
91             start a new job with the command line and his params. It return true if ok.
92             WARNING ncpu could be redefined if mpdtrace return à small hosts list
93            
94             Example:
95            
96             Parallel::Mpich::MPD::createJob(cmd => $cmd, params => $parms, ncpu => '3', alias => 'job1');
97              
98             =item listJobs([mpdlistjobs_contents=>$str])
99              
100             Return an Parallel::Mpich::MPD::Job array for all available jobs
101             If mpdlistjobs_contents argument is present, the code will not call mpdlistjobs but
102             take the parameter as a fake results of this command
103              
104             =item findJob([%criteria][, return=>(getone|host2pidlist))
105              
106             find a job from crtiteria. It return a Job instance or undef for no match
107              
108             =over 4
109              
110             =item Criteria can be of
111              
112             =item username=>'somename' or username=>\@arrayOfNames
113              
114             =item jobid=>'somename' or jobid=>\@arrayOfJobid
115              
116             =item jobalias=>'somename' or jobalias=>\@arrayOfJobalias
117              
118             To set an array of names;
119              
120             $criteria{psid} [&& $criteria{rank}] You can select psid from the specified rank.
121             $criteria{reloadlist} force the call of listjobs
122              
123             =back
124              
125             =head4 return value
126              
127             By default (no return=>... argument), returned value will be a hash (or a hash ref, depending on context), {jobid1=>$job1, jobid2=>job2,...}
128              
129             =over 4
130              
131             =item return=>'getone'
132              
133             Will force to return the one job matching, or undef if none or error if many.
134              
135             =item return=>'host2pidlist'
136              
137             return a hash (or a ref to this hash, depending on context), host=>\@pidlist
138              
139             =back
140              
141             =head4 Examples
142              
143             =begin verbatim
144              
145             Parallel::Mpich::MPD::findJob(alias => 'olivier1', return=>'getone')->sendSig();
146             my %kobs=Parallel::Mpich::MPD::findJob(alias => ['olivier1', 'olivier2']);
147             my $refjobs=Parallel::Mpich::MPD::findJob(alias => ['olivier1', 'olivier2']);
148             my $job=Parallel::Mpich::MPD::findJob(jobid => '1@linux02_32996', return=>'getone');
149              
150             =end verbatim
151              
152             =item trace([hosts => %hosts], long => 1)
153              
154             Lists the hostname of each of the mpds in the ring
155             return true if ok
156            
157             [long=1] shows full hostnames and listening ports and ifhn
158              
159             =item makealias( )
160              
161             "handle-" + PID + RAND(100) + Instance COUNTER++
162            
163             return a uniq string alias
164            
165             =item clean([hosts => %hosts] [killcmd=>"cmd"])
166              
167             Removes the Unix socket on local (the default) and remote machines
168             This is useful in case the mpd crashed badly and did not remove it, which it normally does
169             [hosts => %hosts] use specified hosts ,
170             [$cleancmd="cmd"] user defined kill command
171              
172              
173             =head1 AUTHOR
174              
175             Olivier Evalet, Alexandre Masselot, C<< >>
176              
177             =head1 BUGS
178              
179             Please report any bugs or feature requests to
180             C, or through the web interface at
181             L.
182             I will be notified, and then you'll automatically be notified of progress on
183             your bug as I make changes.
184              
185             =head1 SUPPORT
186              
187             You can find documentation for this module with the perldoc command.
188              
189             perldoc Parallel::Mpich::MPD
190              
191             You can also look for information at:
192              
193             =over 4
194              
195             =item * AnnoCPAN: Annotated CPAN documentation
196              
197             L
198              
199             =item * CPAN Ratings
200              
201             L
202              
203             =item * RT: CPAN's request tracker
204              
205             L
206              
207             =item * Search CPAN
208              
209             L
210              
211             =back
212              
213             =head1 ACKNOWLEDGEMENTS
214              
215             =head1 COPYRIGHT & LICENSE
216              
217             Copyright 2006 Olivier Evalet, Alexandre Masselot, all rights reserved.
218              
219             This program is free software; you can redistribute it and/or modify it
220             under the same terms as Perl itself.
221              
222             =cut
223              
224             require Exporter;
225             our $cached_all_jobs={};
226             our (@ISA, @EXPORT, @EXPORT_OK);
227             @ISA = qw(Exporter);
228              
229              
230             @EXPORT = qw();
231             @EXPORT_OK = ();
232              
233              
234             #sub new{
235             # my $pkg=shift;
236             # my $this={};
237             # env_Init() unless defined(%env);
238             # bless $this, $pkg;
239             # return $this;
240             #}
241              
242             # hosts => @hosts, machinesfile => $machines, checkOnly => 1|0,check => \$output
243             sub boot{
244 0     0 1 0 my %params=@_;
245 0         0 env_Init();
246 0         0 my $cmdparms=""; #only one instance of mpd should run
247 0         0 my $usermachinesfile;
248 0 0       0 $usermachinesfile.="$params{machinesfile}" if defined $params{machinesfile};
249 0 0       0 $usermachinesfile.=Parallel::Mpich::MPD::Common::__param_buildHost(@{$params{hosts}}) if defined $params{hosts};
  0         0  
250            
251             # if (defined $env{info}{user}){
252             # $cmdparms.=" -u $env{info}{user}";
253             # }
254              
255 0 0       0 if (defined $env{conf}{rpc}){
256 0         0 $cmdparms.=" -r $env{conf}{rpc}";
257             }
258 0 0       0 if (defined $usermachinesfile){
    0          
259 0         0 $cmdparms.=" -f $usermachinesfile";
260 0         0 $cmdparms.=" -n ".Parallel::Mpich::MPD::Common::nbHostInMachinefile($usermachinesfile);
261             }elsif (defined $env{conf}{mpd}{hostsfile}){
262 0         0 my ($ncpu,$stripfile)=Parallel::Mpich::MPD::Common::stripMachinefile($env{conf}{mpd}{hostsfile});
263 0         0 $cmdparms.=" -f $stripfile ";
264 0         0 $cmdparms.=" -n $ncpu ";
265             }
266             # if (defined $env{conf}{mpiexec}{ncpu}){
267             # $cmdparms.=" -n ".($env{conf}{mpiexec}{ncpu}+1);
268             # }
269 0 0       0 if (defined($params{checkOnly})){
    0          
270 0         0 $cmdparms.=" --chkuponly --verbose ";
271             }elsif (defined($params{verbose})){
272 0         0 $cmdparms.=" --verbose ";
273             }
274              
275 0         0 my %execparms=(cmd => commandPath('mpdboot')." $cmdparms");
276 0 0       0 if (defined($params{output})){
277 0         0 $execparms{stdout}=$params{output}
278             }
279 0         0 my $ret=Parallel::Mpich::MPD::Common::__exec(%execparms);
280            
281            
282 0         0 return $ret==0;
283             }
284              
285             sub info{
286             #genebio-95_59850 (192.168.173.95)
287 0     0 1 0 my %params=@_;
288 0         0 env_Init();
289 0         0 my $stdout="";
290 0         0 my $stderr="";
291 0         0 my $jobret=Parallel::Mpich::MPD::Common::__exec(cmd => commandPath('mpdtrace')." -l",stdout => \$stdout, stderr=>\$stderr);
292 0         0 my @hosts=split /\n/, $stdout;
293 0         0 my %info=();
294 0 0       0 if ($jobret==0){
295 0         0 $info{master}=$hosts[0];
296 0         0 my $tmp=$info{master};
297 0         0 $tmp=~ s/(\S+)_(\d+)/$info{hostname}=$1;$info{port}=$2/e;
  0         0  
  0         0  
298 0 0       0 print "DEBUG:MPD::info:". Dumper(\%info) if $Parallel::Mpich::MPD::Common::DEBUG==1;
299             }
300 0         0 return %info;
301             }
302              
303              
304             sub rebootHost{
305 0     0 1 0 my %params=@_;
306 0         0 my $stdout;
307 0         0 env_Init();
308            
309 0         0 my %mpdinfo=();
310 0 0       0 unless (defined($params{host})){
311 0         0 print STDERR "ERROR:rebootHost(1): you should define an hostname where to start mpd \n";
312 0         0 return %mpdinfo;
313             }
314            
315 0         0 %mpdinfo=info();
316            
317 0 0       0 if((my $c=%mpdinfo) eq "0"){
318 0         0 print STDERR "ERROR:rebootHost(2): MPD seems dead, restart the mpd system \n";
319 0         0 return undef;
320             }
321            
322             #TODO make this better ;)
323             # pkill -f mpd.py
324 0         0 my $kill;
325             my $cmd;
326 0 0       0 if (system("pkill --help &>/dev/null")){
327 0         0 $kill="pkill -U $env{info}{user} -f mpd.py";
328 0         0 $cmd="ssh $params{'host'} 'pkill -U $env{info}{user} -f mpd.py'";
329             }else{
330 0         0 $kill="ps -U $env{info}{user} -o pid,command|grep -e \"$mpdinfo{hostname} -p $mpdinfo{port}\" |grep -v grep |cut -d \" \" -f2";
331 0         0 $cmd="ssh $params{'host'} '$kill|xargs kill 2>/dev/null'";
332             }
333 0         0 my $ret=system $cmd;
334            
335 0         0 $cmd="ssh $params{'host'} ". commandPath('mpd')." -h $mpdinfo{hostname} -p $mpdinfo{port} --ncpus=1 -e -d";
336            
337 0         0 my $res=Parallel::Mpich::MPD::Common::__exec(cmd => $cmd, stdout => \$stdout);
338            
339 0 0       0 if ($res!=0){
340 0         0 print STDERR "ERROR:rebootHost(3): Could not start mpd on host $params{host} \n";
341 0         0 return undef;
342             }
343            
344 0 0       0 print "INFO:restart mpd on $params{'host'}:$mpdinfo{port} \n" if $Parallel::Mpich::MPD::Common::WARN==1;
345            
346             #check mpdtrace to detect the new host
347             # my %hosts;
348             # my $res=trace(hosts => \%hosts);
349             # return (defined($hosts{$params{host}}) && $hosts{$params{host}}==1)?1:undef;
350 0         0 return 1;
351             }
352              
353             # TODO check should also check on each nodes that:
354             # $HOME/.mpd.conf owner and attr=600
355             # check DNS hostname for master and nodes
356              
357             # use case check
358             # =================
359              
360             # 1) unplug network wire
361             # 2) some mpd died correctly
362             # 3) some mpd died badly
363             # 4) all nodes are dead
364            
365             #
366             #
367             # machinesfile => $file, hostsup => \%hosts, hostsdown => \%hostsdown , reboot => 1
368             sub check{
369 0     0 1 0 my %params=@_;
370 0         0 env_Init();
371 0         0 my $out="";
372 0         0 my %hosts=();
373 0         0 my $should_resize;
374            
375 0 0       0 my ($ncpu,$stripfile)=Parallel::Mpich::MPD::Common::stripMachinefile($env{conf}{mpd}{hostsfile}) unless defined $params{machinesfile};
376            
377 0 0       0 my $machinesfile=(defined $params{machinesfile})?$params{machinesfile}:$stripfile;
378             #trace hosts
379            
380             #1) check mpdtrace hosts
381 0         0 my $res=trace(hosts => \%hosts);
382            
383 0         0 my $machines=io($machinesfile)->slurp;
384 0         0 my %hostsdown;
385 0 0       0 if ($res){
386             #compare hosts result with the input machinesfile
387 0         0 my %hostsup;
388            
389 0         0 foreach (split /[\n\r]/, $machines){
390 0 0       0 next unless /\S/;
391 0 0       0 next if /#.*$/;
392 0         0 s/([^.]+).*$/$1/;
393 0 0 0     0 if (defined($hosts{$_}) && $hosts{$_}==1){
394 0         0 $hostsup{$_}=1;
395             }else{
396 0         0 $should_resize=1;
397 0 0 0     0 if (defined $params{reboot} && $params{reboot}==1){
398 0 0       0 $hostsdown{$_}=1 unless(defined(rebootHost(host =>"$_")));
399             }else{
400 0         0 $hostsdown{$_}=1;
401             }
402             }
403             }
404             # here the status is wrong, cluster should be resized
405 0 0 0     0 if (defined $should_resize && defined $params{reboot} && $params{reboot}==1){
      0        
406 0         0 %hostsup=check(machinesfile => $machinesfile);
407             }
408 0 0       0 %{$params{hostsup}}=%hostsup if defined $params{hostsup};
  0         0  
409 0 0       0 %{$params{hostsdown}}=%hostsdown if defined $params{hostsdown};
  0         0  
410            
411 0         0 return %hostsup ;
412            
413              
414             }else{
415 0         0 foreach (split /[\n\r]/, $machines){
416 0 0       0 next unless /\S/;
417 0 0       0 next if /#.*$/;
418 0         0 s/([^.]+).*$/$1/;
419 0         0 $hostsdown{$_}=1;
420             }
421 0 0       0 %{$params{hostsdown}}=%hostsdown if defined $params{hostsdown};
  0         0  
422            
423 0 0       0 print "ERROR:MPD::check(): MPD seems down, it's a really bad news!\n" if ($Parallel::Mpich::MPD::Common::WARN == 1);
424 0 0 0     0 if (defined $params{reboot} && $params{reboot}==1){
425 0         0 print "INFO: Clean all MPD.\n";
426 0         0 clean(pkill=>1);
427 0         0 print "INFO: trying to restart MPD.\n";
428 0         0 boot();
429 0         0 return check(machinesfile => $machinesfile, hostsup=>$params{hostsup}, hostsdown=>$params{hostsdown});
430             }
431             }
432            
433             #2) check vailable hosts (ping + ssh publicKey) if mpdtrace return FALSE
434 0         0 checkHosts(machinesfile => $machinesfile, hostsup => \%hosts);
435 0 0       0 unless (keys %hosts){
436             #
437             # FATAL ERROR: the cluster is dead
438             #
439 0         0 die "ERROR: your cluster is dead! All Hosts defined in $machinesfile are down.";
440             }
441              
442            
443             #3) should clean and restart mpd ?
444              
445              
446 0         0 return %hosts=();
447             }
448              
449              
450              
451              
452              
453              
454             #use a cached jobs to force the update on each getJobs call!!!
455             sub __jobsFactory{
456              
457 2     2   933 my $myjobs=shift;
458 2         6 my $host={};
459 2         5 my @allentries;
460             my $job;#={infos => {jobid=>''}};
461              
462 2         12 $cached_all_jobs={};
463              
464             #map result to hash
465 2         30 foreach (split/\n\n/, $myjobs){
466 14         27 my %h;
467 14         109 s/(\S+)[\s=]+(\S*)?$/$h{$1}=$2/emg;
  112         824  
468 14         70 push @allentries, \%h;
469             }
470              
471             #order job
472 2         15 foreach (@allentries){
473 14         162 my $data=$_;
474 14   66     295 $job=$cached_all_jobs->{$data->{jobid}}||=Parallel::Mpich::MPD::Job->new(jobid=>$data->{jobid});
475             # if (!$job || ($data->{jobid} ne $job->jobid)){
476             # $job=P;
477             # =$job;
478             # $job->jobid($data->{jobid});
479 14         456 $job->jobalias($data->{jobalias});
480 14         498 $job->username($data->{username});
481             # }
482 14         104 $host={};
483 14         48 $host->{host} =$data->{host};
484 14         43 $host->{pid} =$data->{pid};
485 14         37 $host->{sid} =$data->{sid};
486 14         34 $host->{rank} =$data->{rank};
487 14         37 $host->{pgm} =$data->{pgm};
488 14         58 $job->hosts_push($host);
489             }
490              
491 2 50       34 print STDERR (values %{$cached_all_jobs}) if ($Parallel::Mpich::MPD::Common::DEBUG == 1);
  0         0  
492 2         5 return values %{$cached_all_jobs};
  2         47  
493             }
494              
495              
496              
497             sub validateMachinesfile{
498 0     0 1 0 my %params=@_;
499 0         0 my $trace="";
500 0         0 my %hosts=();
501 0 0       0 return undef unless defined $params{machinesfile};
502 0         0 my $retfile=$params{machinesfile};
503            
504            
505 0 0 0     0 if ( defined($params{machinesfile}) && -e $params{machinesfile}){
506             #1) check mpdtrace hosts
507 0         0 my $res=trace(hosts => \%hosts);
508            
509 0 0       0 if ($res){
510             #compare hosts result with the input machinesfile
511 0         0 my $machines=io($params{machinesfile})->slurp;
512 0 0       0 print "DEBUG:validateMachinesfile(1) : machines=$machines\n\n" if ($Parallel::Mpich::MPD::Common::DEBUG == 1);
513            
514 0         0 my $fhosts = new File::Temp(UNLINK=>0, TEMPLATE => File::Spec->tmpdir."/$TMP_MPD_PREFIX-machines-XXXX");
515 0         0 foreach (split /[\n\r]/, $machines){
516 0 0       0 next unless /\S/;
517 0 0       0 next if /#.*$/;
518 0         0 s/([^.]+).*$/$1/;
519            
520 0 0 0     0 if (defined($hosts{$_}) && $hosts{$_}==1 ){
521 0         0 print $fhosts $_."\n";
522 0 0       0 print "validated machines : ".$_."\n" if ($Parallel::Mpich::MPD::Common::DEBUG == 1);
523             }else{
524 0         0 $retfile=$fhosts->filename;
525 0         0 print STDERR "WARNING: node $_ defined on machinesfile [".$params{machinesfile}."] is not available\n";
526             }
527             }
528             }else{
529 0         0 print STDERR "ERROR:validateMachinesfile(1): fatal mpd error \n";
530             }
531             }else{
532 0         0 print STDERR "ERROR:validateMachinesfile(2): could not open file : ".$params{machinesfile}."\n";
533             }
534              
535 0         0 return $retfile;
536             }
537              
538              
539             sub isJobRegistered{
540 0     0 0 0 my ($alias)=@_;
541             # we don't know if job is register without alias
542 0 0       0 return 1 unless defined $alias;
543 0 0       0 if(system(commandPath('mpdlistjobs')." 2>/dev/null|grep -qe \"alias.*$alias\"")){
544 0         0 return 0;
545             }else{
546 0 0       0 print STDERR "INFO: job $alias is registered \n" if ($Parallel::Mpich::MPD::Common::WARN == 1);
547 0         0 return 1;
548             }
549             }
550              
551             sub waitJobRegistration{
552 0     0 0 0 my ($alias)=@_;
553 0         0 my $TIMEOUT=0;
554 0   0     0 do{
555 0         0 usleep (400*1000);
556 0         0 $TIMEOUT+=400;
557             }while(!isJobRegistered($alias) && $TIMEOUT<10000);
558 0         0 return 1;
559             }
560              
561              
562              
563             # TODO check that the machine file contains booted hosts
564             # FIXME create job should wait until job is correctly registered by mpd.
565             sub createJob{
566 0     0 1 0 my %params=@_;
567 0         0 env_Init();
568 0         0 my $spawn = 'yes';
569 0 0       0 $spawn=$params{spawn} if exists $params{spawn};
570 0         0 my $_out;
571             my $_err;
572 0         0 my $_pid="";
573              
574 0         0 my $mpiexecArgs="";
575 0 0       0 $mpiexecArgs.=" -a $params{alias}" if defined($params{alias});
576              
577 0 0       0 my $inputfile=(defined $params{machinesfile})?$params{machinesfile}:Parallel::Mpich::MPD::Common::env_Hostsfile();
578 0         0 my $machinesfile=validateMachinesfile(machinesfile => $inputfile);
579 0         0 $mpiexecArgs.=" -machinefile ".$machinesfile;
580              
581 0 0       0 $mpiexecArgs.="-u $params{user}" if defined($params{user});
582              
583             #TODO much more here see mpiexec -hosts
584             #$mpiexecArgs.=" -n $params{ncpu}" if defined($params{ncpu});
585            
586 0 0 0     0 if(defined $params{ncpu} && "$inputfile" eq "$machinesfile"){
587 0         0 $mpiexecArgs.=" -n $params{ncpu}";
588             }else{
589 0         0 my $n=Parallel::Mpich::MPD::Common::nbHostInMachinefile($machinesfile);
590 0         0 $mpiexecArgs.=" -n $n";
591             }
592 0 0       0 $mpiexecArgs.=" -env LD_ASSUME_KERNEL ".$ENV{LD_ASSUME_KERNEL} if ($ENV{LD_ASSUME_KERNEL});
593              
594 0 0       0 if (! defined($params{cmd})){
595 0         0 carp "ERROR: key cmd not available";
596 0         0 return undef;
597             }
598             # hosts info
599 0         0 my $cmdparms="";#.="-f ".Parallel::Mpich::MPD::Common::__param_buildHost(@hosts) if defined(@hosts);
600 0         0 my $cmd=commandPath('mpiexec')." $mpiexecArgs $params{cmd} $params{params}";
601 0         0 print STDERR "hostfile=".$machinesfile."\n";
602              
603              
604 0         0 my %args=(cmd => $cmd);
605 0 0       0 $args{params}=\$params{params} if (defined($params{params}) );
606 0 0       0 $args{spawn}=\$params{spawn} if (defined($params{spawn}) );
607 0 0       0 $args{stdout}=$params{stdout} if (defined($params{stdout}) );
608 0 0       0 $args{stderr}=$params{stderr} if (defined($params{stderr}) );
609 0 0       0 $args{pid}=$params{pid} if (defined($params{pid}) );
610            
611 0         0 my $ret=Parallel::Mpich::MPD::Common::__exec(%args);
612 0 0 0     0 waitJobRegistration($params{alias}) if defined($params{spawn}) && defined($params{alias});
613            
614            
615 0         0 return $ret==0;
616             }
617              
618             sub listJobs{
619 1     1 1 90433 my %hparams=@_;
620 1         9 env_Init();
621 1         9 my $stdout="";
622 1         3 my $stderr;
623 1 50       5 if(defined ($hparams{mpdlistjobs_contents})){
624 1         8 $stdout=$hparams{mpdlistjobs_contents};
625             }else{
626 0         0 my $res=Parallel::Mpich::MPD::Common::__exec(cmd => commandPath('mpdlistjobs'), stdout => \$stdout);
627             # my $cmd=commandPath('mpdlistjobs');
628             # $stdout=`$cmd`;
629 0 0       0 print "DEBUG: mpdlistjobs stdout= $stdout \n" if ($Parallel::Mpich::MPD::Common::DEBUG == 1);
630 0 0       0 if ($res!=0){
631 0         0 carp "error executing line mpdlistjobs command\n";
632 0         0 return undef;
633             }
634 0 0       0 if ($stdout eq ""){
635 0         0 return undef;
636             }
637             }
638            
639              
640 1         19 return __jobsFactory($stdout);
641             }
642              
643              
644              
645              
646             #find a job from crtiteria. It return an Hash of Jobs instance or null for no match
647             #params
648             # $criteria{pid} [ && $criteria{rank}]
649             # $criteria{jobalias}
650             # $criteria{username}
651             # $criteria{reloadlist}
652             # $criteria{getone} return the first occurrence
653              
654             sub findJob{
655 13     13 1 3834 my %criteria=@_;
656             #Transform criteria from array or scalar into a hash
657 13         32 foreach my $k(qw(jobalias username jobid)) {
658 39 100       111 next unless defined $criteria{$k};
659 13 100       42 if(ref($criteria{$k}) eq 'ARRAY'){
660             # print STDERR "k=[$k]\n";
661 3         4 my %h;
662 3         3 foreach(@{$criteria{$k}}){
  3         9  
663             # print STDERR "\$_=[$_]\n";
664 4         11 $h{$_}=1;
665             }
666 3         8 $criteria{$k}=\%h;
667             }else{
668             #scalar, make it array
669 10         42 $criteria{$k}={$criteria{$k}=>1};
670             }
671             }
672              
673             #force to reload the list
674 13 50       40 listJobs() if defined($criteria{reloadlist});
675              
676             # # je ne comprend pas pourquoi $jobs->{$job->jobid} est un tableau!!!
677             # undef my $jobs;
678             # foreach my $job (values %{$cached_all_jobs}){
679             # if ($job->equals(%criteria)){
680             # #WRONG: returns the first one!
681             # return $job if (defined($criteria{getone}));
682              
683             # if (defined($criteria{jobalias})) {
684             # push @{$jobs->{$job->jobid}}, $job;
685              
686             # }elsif (defined($criteria{username})) {
687             # push @{$jobs->{$job->jobid}}, $job;
688             # }elsif (defined($criteria{jobid})) {
689             # push @{$jobs->{$job->jobid}},$job;
690             # }elsif (defined($criteria{pid})) {
691             # push @{$jobs->{$job->jobid}}, $job;
692             # }
693             # }
694             # }
695 13         16 my %retjobs;
696 13         17 foreach my $j (values %{$cached_all_jobs}){
  13         73  
697 66 100 66     7769 if((defined($j->jobid) && $criteria{jobid}{$j->jobid}) ||
      66        
      66        
      66        
      66        
698             (defined($j->jobalias) && $criteria{jobalias}{$j->jobalias}) ||
699             (defined($j->username) && $criteria{username}{$j->username})){
700 19         2955 $retjobs{$j->jobid}=$j;
701             }
702             }
703              
704 13 100 100     1079 if ($criteria{return} && $criteria{return} eq 'getone'){
    100 66        
705             #no match!!
706 6 100       75 return undef unless %retjobs;
707              
708 3         9 my $sz=keys %retjobs;
709 3 50       12 print __PACKAGE__."::findjobs() ".Dumper \%retjobs if $Parallel::Mpich::MPD::Common::WARN;
710            
711 3 100       11 if($sz==1){
712             #OK
713 2         19 return (values %retjobs)[0];
714             }else{
715             #too many matches
716             die "too many matches ($sz) for criteria
717             jobid=> (".($criteria{jobid} || join('|', keys %{$criteria{jobid}})).")
718             jobalias=> (".($criteria{jobalias} || join('|', keys %{$criteria{jobalias}})).")
719 1   33     110 username=>(".($criteria{username} || join('|', keys %{$criteria{username}})).")
      33        
      33        
720             ";
721             }
722             }elsif ($criteria{return} && $criteria{return} eq 'host2pidlist'){
723 1         2 my %rethash;
724 1         5 foreach (values %retjobs){
725 3         14 my @h=$_->hosts();
726 3         42 foreach (@h){
727 4         25 $rethash{$_->{host}}{$_->{pid}}=$_->{pgm};
728             }
729             }
730 1 50       11 return wantarray?%rethash:\%rethash;
731             }else{
732 6 100       57 return wantarray?%retjobs:\%retjobs;
733             }
734             }
735              
736              
737             sub trace{
738 0     0 1   my %params=@_;
739 0           env_Init();
740 0           my $stdout="";
741 0           my $jobret=Parallel::Mpich::MPD::Common::__exec(cmd => commandPath('mpdtrace'),stdout => \$stdout);
742 0 0 0       if (defined($params{hosts}) && $jobret==0){
743 0           my %tmp=%{$params{hosts}};
  0            
744 0           foreach (split /\n/, $stdout){
745 0           $tmp{$_}=1;
746             }
747 0           %{$params{hosts}}=%tmp;
  0            
748             }
749 0           return $jobret==0;
750             }
751              
752              
753             #
754             # clean
755             # Removes the Unix socket on local (the default) and remote machines
756             # This is useful in case the mpd crashed badly and did not remove it, which it normally does
757             # params
758             # [hosts => %hosts] pkill=>1
759             sub clean{
760 0     0 1   my %params=@_;
761 0           env_Init();
762 0           my $cmdparms;
763 0 0         if (defined $params{hosts}){
764 0           $cmdparms.=" -f ".Paralle l::Mpich::MPD::Common::__param_buildHost(@{$params{hosts}}) ;
  0            
765             }else{
766 0           $cmdparms.=" -f ".Parallel::Mpich::MPD::Common::stripMachinefile($env{conf}{mpd}{hostsfile});
767             }
768 0 0         $cmdparms.=" -k \"pkill -U $env{info}{user} -f mpd.py\"" if defined $params{pkill};
769              
770 0           my $cmd=commandPath('mpdcleanup')."$cmdparms 2>/dev/null";
771             # print STDERR $cmd."\n";
772 0           return system ($cmd);
773             }
774              
775             sub shutdown{
776 0     0 1   my $stdout="";
777 0           my $stderr="";
778 0           env_Init();
779 0           my $ret2=Parallel::Mpich::MPD::Common::__exec(cmd => commandPath('mpdallexit'));
780 0 0         $ret2=clean() if $ret2;
781 0           return $ret2==0;
782             }
783              
784             our $aliasCounter=0;
785             sub makealias{
786            
787             # alias is hablde + PID + RAND(100) + COUNTER++
788             # inside a thread the aliascounter is a copy !
789 0     0 1   return "handle-$$-".int(rand(100))."-".($aliasCounter++);
790             }
791              
792              
793             1; # End of Parallel::Mpich::MPD