File Coverage

blib/lib/HPC/Runner/MCE.pm
Criterion Covered Total %
statement 21 85 24.7
branch 0 18 0.0
condition n/a
subroutine 7 11 63.6
pod 3 4 75.0
total 31 118 26.2


line stmt bran cond sub pod time code
1             #!/usr/bin/env perl
2              
3             package HPC::Runner::MCE;
4              
5 1     1   25683 use MCE;
  1         62679  
  1         8  
6 1     1   17402 use MCE::Queue;
  1         13969  
  1         7  
7 1     1   1876 use DateTime;
  1         176501  
  1         33  
8 1     1   1063 use DateTime::Format::Duration;
  1         6737  
  1         64  
9              
10 1     1   964 use Moose;
  1         577244  
  1         9  
11             extends 'HPC::Runner';
12             with 'MooseX::Getopt::Usage';
13              
14             =head1 NAME
15              
16             HPC::Runner::MCE - Job submission using MCE
17              
18             =head1 VERSION
19              
20             Version 2.40
21              
22             =cut
23              
24             our $VERSION = '2.40';
25              
26             =head1 SYNOPSIS
27              
28             Use MCE and MCE::Queue to run arbitrary bash commands in parallel using processes.
29              
30             =cut
31              
32             =head1 Variables
33              
34             =cut
35              
36             has 'queue' => (
37             traits => ['NoGetopt'],
38             is => 'rw',
39             lazy => 0, ## must be 0 to ensure the queue is created prior to spawning
40             default => sub {
41             my $self = shift;
42             return MCE::Queue->new();
43             }
44             );
45              
46             has 'mce' => (
47             traits => ['NoGetopt'],
48             is => 'rw',
49             lazy => 1,
50             default => sub {
51             my $self = shift;
52             return MCE->new(
53             max_workers => $self->procs, use_threads => 0, user_func => sub {
54             my $mce = shift;
55             while (1) {
56             my ($counter, $cmd) = $self->queue->dequeue(2);
57             last unless defined $counter;
58             $self->counter($counter);
59             $self->cmd($cmd);
60             $self->run_command_mce();
61             }
62             }
63             );
64             }
65             );
66              
67             has 'using_mce' => (
68             is => 'rw',
69             isa => 'Bool',
70             default => 1,
71             required => 1,
72             );
73              
74              
75             =head1 SUBROUTINES/METHODS
76              
77             =cut
78              
79             =head2 go
80              
81             Initialize MCE things and use Runner::Init to parse and exec commands
82              
83             =cut
84              
85             sub go{
86 0     0 1   my $self = shift;
87              
88 0           my $dt1 = DateTime->now();
89              
90 0           $self->prepend_logfile("MAIN_");
91 0           $self->append_logfile(".log");
92 0           $self->log($self->init_log);
93 0           $self->mce->spawn;
94              
95             #MCE specific
96 0           $self->parse_file_mce;
97              
98 0           $DB::single=2;
99             # MCE workers dequeue 2 elements at a time. Thus the reason for * 2.
100 0           $self->queue->enqueue((undef) x ($self->procs * 2));
101             # MCE will automatically shutdown after running for 1 or no args.
102 0           $self->mce->run(1);
103             #End MCE specific
104              
105 0           my $dt2 = DateTime->now();
106 0           my $duration = $dt2 - $dt1;
107 0           my $format = DateTime::Format::Duration->new(
108             pattern => '%Y years, %m months, %e days, %H hours, %M minutes, %S seconds'
109             );
110              
111 0           $self->log->info("Total execution time ".$format->format_duration($duration));
112 0           return;
113             }
114              
115             =head2 parse_file_mce
116              
117             The default method of parsing the file.
118              
119             #starts a comment
120             wait - says wait until all other processes/threads exit
121              
122             #this is a one line command
123             echo "starting"
124              
125             #This is a multiline command
126             echo "starting line 1" \
127             echo "starting line 2" \
128             echo "finishing"
129              
130             =cut
131              
132             sub parse_file_mce{
133 0     0 1   my $self = shift;
134              
135 0           $DB::single=2;
136              
137 0 0         my $fh = IO::File->new( $self->infile, q{<} ) or $self->log->fatal("Error opening file ".$self->infile." ".$!);
138 0 0         die unless $fh;
139              
140 0           while(<$fh>){
141 0           my $line = $_;
142 0 0         next unless $line;
143 0 0         next unless $line =~ m/\S/;
144 0           $self->process_lines($line);
145 0           $self->wait(0);
146             }
147              
148 0           $DB::single=2;
149             }
150              
151 1     1   8344 use Storable qw(dclone);
  1         3  
  1         467  
152              
153             sub process_lines{
154 0     0 0   my $self = shift;
155 0           my $line = shift;
156              
157 0 0         return if $line =~ m/^#/;
158 0 0         if($self->has_cmd){
159 0           $DB::single=2;
160 0           $self->add_cmd($line);
161 0 0         if($line =~ m/\\$/){
162 0           return;
163             }
164             else{
165 0           $self->log->info("Enqueuing command:\n".$self->cmd);
166             #MCE
167             #$self->mce->send($self);
168 0           $self->queue->enqueue($self->counter, $self->cmd);
169             #Threads
170             # $self->run_command_threads;
171 0           $self->clear_cmd;
172 0           $self->inc_counter;
173             }
174             }
175             else{
176 0           $DB::single=2;
177 0           $self->cmd($line);
178 0 0         if($line =~ m/\\$/){
    0          
179 0           next;
180             }
181             elsif( $self->match_cmd(qr/^wait$/) ){
182 0           $DB::single=2;
183 0           $self->log->info("Beginning command:\n".$self->cmd);
184 0           $self->log->info("Waiting for all children to complete...");
185 0           $self->clear_cmd;
186             #MCE
187              
188 0           $self->wait(1);
189 0           push(@{$self->jobref}, []);
  0            
190 0           $self->queue->enqueue((undef) x ($self->procs * 2));
191 0           $self->mce->run(0); # 0 indicates do not shutdown after running
192              
193             # #THREADS
194             # $self->threads->wait_all_children;
195 0           $self->log->info("All children have completed processing!");
196             }
197             else{
198 0           $self->log->info("Enqueuing command:\n".$self->cmd);
199 0           $DB::single=2;
200             #MCE
201 0           $self->queue->enqueue($self->counter, $self->cmd);
202             # #Threads
203             # $self->run_command_threads;
204 0           $self->clear_cmd;
205 0           $self->inc_counter;
206             }
207             }
208             }
209              
210             =head2 run_command_mce
211              
212             MCE knows which subcommand to use from Runner/MCE - object mce
213              
214             =cut
215              
216 1     1   1265 use Data::Dumper;
  1         7544  
  1         168  
217             sub run_command_mce{
218 0     0 1   my $self = shift;
219              
220 0           my $pid = $$;
221              
222 0           $DB::single=2;
223              
224             #Mce doesn't take exitcode to end
225 0           push(@{$self->jobref->[-1]}, $pid);
  0            
226 0           $self->_log_commands($pid);
227              
228 0           return;
229             }
230              
231              
232             #use namespace::autoclean;
233             1;
234              
235             =head1 AUTHOR
236              
237             Jillian Rowe, C<< <jillian.e.rowe at gmail.com> >>
238             Mario Roy, C<< <marioeroy at gmail.com> >>
239              
240             =head1 BUGS
241              
242             Please report any bugs or feature requests to C<bug-runner-init at rt.cpan.org>, or through
243             the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=HPC::Runner::MCE>. I will be notified, and then you'll
244             automatically be notified of progress on your bug as I make changes.
245              
246              
247             =head1 SUPPORT
248              
249             You can find documentation for this module with the perldoc command.
250              
251             perldoc HPC::Runner::MCE
252              
253              
254             You can also look for information at:
255              
256             =over 4
257              
258             =item * RT: CPAN's request tracker (report bugs here)
259              
260             L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=HPC-Runner-MCE>
261              
262             =item * AnnoCPAN: Annotated CPAN documentation
263              
264             L<http://annocpan.org/dist/HPC-Runner-MCE>
265              
266             =item * CPAN Ratings
267              
268             L<http://cpanratings.perl.org/d/HPC-Runner-MCE>
269              
270             =item * Search CPAN
271              
272             L<http://search.cpan.org/dist/HPC-Runner-MCE/>
273              
274             =back
275              
276             =head1 Acknowledgements
277              
278             This module was originally developed at and for Weill Cornell Medical
279             College in Qatar within ITS Advanced Computing Team. With approval from
280             WCMC-Q, this information was generalized and put on github, for which
281             the authors would like to express their gratitude.
282              
283             =head1 LICENSE AND COPYRIGHT
284              
285             Copyright 2014 Weill Cornell Medical College.
286              
287             This program is free software; you can redistribute it and/or modify it
288             under the terms of the Artistic License (2.0). You may obtain a
289             copy of the full license at:
290              
291             L<http://www.perlfoundation.org/artistic_license_2_0>
292              
293             Any use, modification, and distribution of the Standard or Modified
294             Versions is governed by this Artistic License. By using, modifying or
295             distributing the Package, you accept this license. Do not use, modify,
296             or distribute the Package, if you do not accept this license.
297              
298             If your Modified Version has been derived from a Modified Version made
299             by someone other than you, you are nevertheless required to ensure that
300             your Modified Version complies with the requirements of this license.
301              
302             This license does not grant you the right to use any trademark, service
303             mark, tradename, or logo of the Copyright Holder.
304              
305             This license includes the non-exclusive, worldwide, free-of-charge
306             patent license to make, have made, use, offer to sell, sell, import and
307             otherwise transfer the Package with respect to any patent claims
308             licensable by the Copyright Holder that are necessarily infringed by the
309             Package. If you institute patent litigation (including a cross-claim or
310             counterclaim) against any party alleging that the Package constitutes
311             direct or contributory patent infringement, then this Artistic License
312             to you shall terminate on the date that such litigation is filed.
313              
314             Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER
315             AND CONTRIBUTORS "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES.
316             THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
317             PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY
318             YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR
319             CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR
320             CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE,
321             EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
322              
323              
324             =cut
325              
326             # End of HPC::Runner::MCE
327