File Coverage

blib/lib/HPC/Runner/Threads.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package HPC::Runner::Threads;
2              
3 1     1   16722 use IPC::Open3;
  1         3566  
  1         75  
4 1     1   501 use IO::Select;
  1         1213  
  1         43  
5 1     1   6 use Symbol;
  1         5  
  1         70  
6 1     1   596 use Data::Dumper;
  1         11529  
  1         92  
7 1     1   1287 use Parallel::ForkManager;
  0            
  0            
8             use Log::Log4perl qw(:easy);
9             use DateTime;
10             use DateTime::Format::Duration;
11             use Cwd;
12             use Moose;
13             use Moose::Util::TypeConstraints;
14              
15             extends 'HPC::Runner';
16              
17             with 'MooseX::Getopt';
18             with 'MooseX::Getopt::Usage';
19             with 'MooseX::Getopt::Usage::Role::Man';
20              
21             =head1 NAME
22              
23             HPC::Runner::Threads - Job submission using threads
24              
25             =head1 VERSION
26              
27             Version 0.01
28              
29             =cut
30              
31             our $VERSION = '2.35';
32              
33             =head1 SYNOPSIS
34              
35             Use Parallel::ForkManager to run arbitrary bash commands
36              
37             =head1 Attributes
38              
39             =cut
40              
41             =head2 twait
42              
43             How frequently to test for a thread having exited the queue in seconds. Defaults to once every 60 seconds. If your jobs are very fast, you may want to decrease this number, or vice versa if they are very long.
44              
45             =cut
46              
47             has 'twait' => (
48             is => 'rw',
49             isa => 'Int',
50             lazy => 1,
51             required => 1,
52             default => 60,
53             documentation => q{How frequently to test for a thread having exited the queue in seconds. Defaults to once every 60 seconds.}
54             );
55              
56             =head2 threads
57              
58             This uses Parallel::ForkManager to deploy the threads. If you wish to use something else you must redefine it here.
59              
60             =cut
61              
62             has 'threads' => (
63             traits => ['NoGetopt'],
64             is => 'rw',
65             lazy => 1,
66             default => sub {
67             my $self = shift;
68             return new Parallel::ForkManager($self->procs);
69             }
70             );
71              
72             =head1 SUBROUTINES/METHODS
73              
74             =head2 go
75              
76             This is the main application. It starts the logging, build the threads, parses the file, runs the commands, and finishes logging.
77              
78             =cut
79              
80             sub go {
81             my $self = shift;
82              
83             my $dt1 = DateTime->now();
84              
85             $self->prepend_logfile("MAIN_");
86             $self->log($self->init_log);
87              
88             #Threads specific
89             build_threads($self);
90              
91             $self->parse_file_threads;
92             #End threads specific
93              
94             my $dt2 = DateTime->now();
95             my $duration = $dt2 - $dt1;
96             my $format = DateTime::Format::Duration->new(
97             pattern => '%Y years, %m months, %e days, %H hours, %M minutes, %S seconds'
98             );
99              
100             $self->log->info("Total execution time ".$format->format_duration($duration));
101             }
102              
103             =head2 parse_file_threads
104              
105             Parse the file of commands and send each command off to the queue.
106              
107             #TODO
108             #Merge mce/threads subroutines
109              
110             =cut
111              
112              
113             sub parse_file_threads{
114             my $self = shift;
115              
116             my $fh = IO::File->new( $self->infile, q{<} ) or $self->log->fatal("Error opening file ".$self->infile." ".$!); # even better!
117              
118             my $cmd;
119             while(<$fh>){
120             my $line = $_;
121             next unless $line;
122             next unless $line =~ m/\S/;
123             next if $line =~ m/^#/;
124              
125             if($self->has_cmd){
126             $self->add_cmd($line);
127             if($line =~ m/\\$/){
128             next;
129             }
130             else{
131             $self->log->info("Enqueuing command:\n".$self->cmd);
132             $self->run_command_threads;
133             $self->clear_cmd;
134             $self->inc_counter;
135             }
136             }
137             else{
138             $self->cmd($line);
139             if($line =~ m/\\$/){
140             next;
141             }
142             elsif( $self->match_cmd(qr/^wait$/) ){
143             $self->log->info("Beginning command:\n".$self->cmd);
144             $self->log->info("Waiting for all children to exit...");
145             $self->clear_cmd;
146             $self->threads->wait_all_children;
147             $self->log->info("All children are out of the pool!");
148             $self->inc_counter;
149              
150             }
151             else{
152             $self->log->info("Enqueuing command:\n".$self->cmd);
153             $self->run_command_threads;
154             $self->clear_cmd;
155             $self->inc_counter;
156             }
157             }
158             }
159              
160             $self->threads->wait_all_children;
161              
162             }
163              
164             =head2 build_threads
165              
166             This is the command to build the threads. To change this just add a build_threads method in your script.
167              
168             $self->threads->run_on_wait(
169             sub {
170             $self->log->debug("** Queue full. Waiting for one process to end ...");
171             },
172             $self->twait,
173             );
174              
175             or
176              
177             package Main;
178             extends 'HPC::Runner::Threads';
179              
180             sub build_threads {
181              
182             $self->threads->run_on_wait(
183             sub {
184             $self->log->debug("** This is my custom message");
185             },
186             $self->twait,
187             );
188             }
189              
190             =cut
191              
192             sub build_threads{
193             my $self = shift;
194              
195             $self->threads->run_on_wait(
196             sub {
197             $self->log->debug("** Queue full. Waiting for one process to end ...");
198             },
199             $self->twait,
200             );
201             }
202              
203             1;
204              
205              
206             =head1 See Also
207              
208             L<Parallel::ForkManager>, L<HPC::Runner::GnuParallel>
209              
210             =head1 AUTHOR
211              
212             Jillian Rowe, C<< <jillian.e.rowe at gmail.com> >>
213              
214             =head1 BUGS
215              
216             Please report any bugs or feature requests to C<bug-runner-init at rt.cpan.org>, or through
217             the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=HPC-Runner-Threads>. I will be notified, and then you'll
218             automatically be notified of progress on your bug as I make changes.
219              
220              
221             =head1 SUPPORT
222              
223             You can find documentation for this module with the perldoc command.
224              
225             perldoc HPC::Runner::Threads
226              
227              
228             You can also look for information at:
229              
230             =over 4
231              
232             =item * RT: CPAN's request tracker (report bugs here)
233              
234             L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=HPC-Runner-Threads>
235              
236             =item * AnnoCPAN: Annotated CPAN documentation
237              
238             L<http://annocpan.org/dist/HPC-Runner-Threads>
239              
240             =item * CPAN Ratings
241              
242             L<http://cpanratings.perl.org/d/HPC-Runner-Threads>
243              
244             =item * Search CPAN
245              
246             L<http://search.cpan.org/dist/HPC-Runner-Threads/>
247              
248             =back
249              
250             =head1 Acknowledgements
251              
252             This module was originally developed at and for Weill Cornell Medical
253             College in Qatar within ITS Advanced Computing Team. With approval from
254             WCMC-Q, this information was generalized and put on github, for which
255             the authors would like to express their gratitude.
256              
257             =head1 LICENSE AND COPYRIGHT
258              
259             Copyright 2014 Weill Cornell Medical College Qatar.
260              
261             This program is free software; you can redistribute it and/or modify it
262             under the terms of the the Artistic License (2.0). You may obtain a
263             copy of the full license at:
264              
265             L<http://www.perlfoundation.org/artistic_license_2_0>
266              
267             Any use, modification, and distribution of the Standard or Modified
268             Versions is governed by this Artistic License. By using, modifying or
269             distributing the Package, you accept this license. Do not use, modify,
270             or distribute the Package, if you do not accept this license.
271              
272             If your Modified Version has been derived from a Modified Version made
273             by someone other than you, you are nevertheless required to ensure that
274             your Modified Version complies with the requirements of this license.
275              
276             This license does not grant you the right to use any trademark, service
277             mark, tradename, or logo of the Copyright Holder.
278              
279             This license includes the non-exclusive, worldwide, free-of-charge
280             patent license to make, have made, use, offer to sell, sell, import and
281             otherwise transfer the Package with respect to any patent claims
282             licensable by the Copyright Holder that are necessarily infringed by the
283             Package. If you institute patent litigation (including a cross-claim or
284             counterclaim) against any party alleging that the Package constitutes
285             direct or contributory patent infringement, then this Artistic License
286             to you shall terminate on the date that such litigation is filed.
287              
288             Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER
289             AND CONTRIBUTORS "AS IS' AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES.
290             THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
291             PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY
292             YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR
293             CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR
294             CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE,
295             EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
296              
297              
298             =cut