File Coverage

blib/lib/HPC/Runner/Threads.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package HPC::Runner::Threads;
2              
3 1     1   26619 use IPC::Open3;
  1         5678  
  1         81  
4 1     1   718 use IO::Select;
  1         2250  
  1         70  
5 1     1   9 use Symbol;
  1         7  
  1         75  
6 1     1   1021 use Data::Dumper;
  1         12346  
  1         107  
7 1     1   1352 use Parallel::ForkManager;
  0            
  0            
8             use Log::Log4perl qw(:easy);
9             use DateTime;
10             use DateTime::Format::Duration;
11             use Cwd;
12             use Moose;
13             use Moose::Util::TypeConstraints;
14              
15             extends 'HPC::Runner';
16              
17             =head1 NAME
18              
19             HPC::Runner::Threads - Job submission using threads
20              
21             =head1 VERSION
22              
23             Version 2.34
24              
25             =cut
26              
27             our $VERSION = '2.34';
28              
29             =head1 SYNOPSIS
30              
31             Use Parallel::ForkManager to run arbitrary bash commands
32              
33             =head1 Variables
34              
35             =cut
36              
37             =head2 twait
38              
39             How frequently, in seconds, to check whether a thread has exited the queue. Defaults to once every 60 seconds. If your jobs are very fast you may want to decrease this number; if they are very long you may want to increase it.
40              
41             =cut
42              
# twait: polling interval in seconds. build_threads passes this value as the
# period argument of the fork manager's run_on_wait hook.
has twait => (
    isa           => 'Int',
    is            => 'rw',
    required      => 1,
    lazy          => 1,
    default       => 60,
    documentation => q{How frequently to test for a thread having exited the queue in seconds. Defaults to once every 60 seconds.},
);
51              
52             =head2 threads
53              
54             This uses Parallel::ForkManager to deploy the threads. If you wish to use something else you must redefine it here.
55              
56             =cut
57              
# threads: the process pool used to run commands. Built lazily on first
# access as a Parallel::ForkManager sized by $self->procs. Hidden from the
# command line via the NoGetopt trait.
has 'threads' => (
    traits  => ['NoGetopt'],
    is      => 'rw',
    lazy    => 1,
    default => sub {
        my $self = shift;

        # Call the constructor as a class method. The indirect object form
        # ("new Parallel::ForkManager(...)") is parsed heuristically and is
        # disabled outright under "use v5.36".
        return Parallel::ForkManager->new( $self->procs );
    },
);
67              
68             =head1 SUBROUTINES/METHODS
69              
70             =cut
71              
72             =head2 go
73              
74             This is the main application. It starts the logging, builds the threads, parses the file, runs the commands, and finishes logging.
75              
76             =cut
77              
# Main entry point: set up logging, configure the process pool, run every
# command in the input file, then log the total wall-clock time.
#
# Takes no arguments besides the invocant. Returns whatever the final
# $self->log->info call returns.
sub go {
    my $self = shift;

    my $started = DateTime->now();

    $self->prepend_logfile("MAIN_");
    $self->log( $self->init_log );

    # Threads specific.
    # build_threads must be invoked as a method: a plain function call
    # (build_threads($self)) always resolves to this package's sub and
    # silently ignores a build_threads override supplied by a subclass.
    $self->build_threads;

    $self->parse_file_threads;
    # End threads specific.

    my $finished = DateTime->now();
    my $elapsed  = $finished - $started;
    my $format   = DateTime::Format::Duration->new(
        pattern => '%Y years, %m months, %e days, %H hours, %M minutes, %S seconds'
    );

    $self->log->info(
        "Total execution time " . $format->format_duration($elapsed) );
}
100              
101             =head2 parse_file_threads
102              
103             Parse the file of commands and send each command off to the queue.
104              
105             #TODO
106             #Merge mce/threads subroutines
107              
108             =cut
109              
110              
# Read the command file named by $self->infile and dispatch each command to
# the process pool.
#
# Line handling:
#   * empty/whitespace-only lines and lines starting with '#' are skipped;
#   * a line ending in a backslash is accumulated and continued on the
#     following line(s);
#   * a stand-alone "wait" command blocks until every child has exited;
#   * anything else is handed to $self->run_command_threads.
#
# If the file cannot be opened the error is logged at fatal level and the
# method returns without processing anything. Otherwise it waits for all
# outstanding children and returns the result of that call.
sub parse_file_threads {
    my $self = shift;

    my $fh = IO::File->new( $self->infile, q{<} );
    unless ($fh) {
        # Capture $! right away: the method calls below may overwrite it.
        my $open_error = $!;
        $self->log->fatal(
            "Error opening file " . $self->infile . " " . $open_error );

        # Nothing can be read, so stop here instead of reading from an
        # undefined filehandle.
        return;
    }

    # A lexical loop variable keeps the caller's $_ intact.
  LINE:
    while ( my $line = <$fh> ) {
        next LINE if !$line || $line !~ m/\S/;
        next LINE if $line =~ m/^#/;

        # Either extend a pending multi-line command or start a new one.
        my $is_continuation = $self->has_cmd;
        if ($is_continuation) {
            $self->add_cmd($line);
        }
        else {
            $self->cmd($line);
        }

        # A trailing backslash means the command continues on the next line.
        next LINE if $line =~ m/\\$/;

        # "wait" is only recognised as a fresh, single-line command; it is
        # never checked for while a continued command is being assembled.
        if ( !$is_continuation && $self->match_cmd(qr/^wait$/) ) {
            $self->log->info( "Beginning command:\n" . $self->cmd );
            $self->log->info("Waiting for all children to exit...");
            $self->clear_cmd;
            $self->threads->wait_all_children;
            $self->log->info("All children are out of the pool!");
            $self->inc_counter;
        }
        else {
            $self->log->info( "Enqueuing command:\n" . $self->cmd );
            $self->run_command_threads;
            $self->clear_cmd;
            $self->inc_counter;
        }
    }

    $fh->close;

    # Do not return until every dispatched command has finished.
    $self->threads->wait_all_children;
}
161              
162             =head2 build_threads
163              
164             This is the command to build the threads. To change this just add a build_threads method in your script.
165              
166             $self->threads->run_on_wait(
167             sub {
168             $self->log->debug("** Queue full. Waiting for one process to end ...");
169             },
170             $self->twait,
171             );
172              
173             or
174              
175             package Main;
176             extends 'HPC::Runner::Threads';
177              
178             sub build_threads {
179             my $self = shift;
180             $self->threads->run_on_wait(
181             sub {
182             $self->log->debug("** This is my custom message");
183             },
184             $self->twait,
185             );
186             }
187              
188             =cut
189              
# Configure the process pool: register a run_on_wait hook that emits a debug
# message, passing $self->twait as the hook's period argument.
# Returns whatever run_on_wait returns.
sub build_threads {
    my ($self) = @_;

    my $announce_wait = sub {
        $self->log->debug("** Queue full. Waiting for one process to end ...");
    };

    $self->threads->run_on_wait( $announce_wait, $self->twait );
}
200              
201             1;
202              
203              
204             =head1 See Also
205              
206             Parallel::ForkManager
207              
208             =head1 AUTHOR
209              
210             Jillian Rowe, C<< >>
211              
212             =head1 BUGS
213              
214             Please report any bugs or feature requests to C, or through
215             the web interface at L. I will be notified, and then you'll
216             automatically be notified of progress on your bug as I make changes.
217              
218              
219             =head1 SUPPORT
220              
221             You can find documentation for this module with the perldoc command.
222              
223             perldoc HPC::Runner::Threads
224              
225              
226             You can also look for information at:
227              
228             =over 4
229              
230             =item * RT: CPAN's request tracker (report bugs here)
231              
232             L
233              
234             =item * AnnoCPAN: Annotated CPAN documentation
235              
236             L
237              
238             =item * CPAN Ratings
239              
240             L
241              
242             =item * Search CPAN
243              
244             L
245              
246             =back
247              
248              
249             =head1 ACKNOWLEDGEMENTS
250              
251             This module was originally developed at and for Weill Cornell Medical College in Qatar. With approval from WCMC-Q, this information was generalized and put on github, for which the authors would like to express their gratitude.
252              
253             =head1 LICENSE AND COPYRIGHT
254              
255             Copyright 2014 Jillian Rowe.
256              
257             This program is free software; you can redistribute it and/or modify it
258             under the terms of the Artistic License (2.0). You may obtain a
259             copy of the full license at:
260              
261             L
262              
263             Any use, modification, and distribution of the Standard or Modified
264             Versions is governed by this Artistic License. By using, modifying or
265             distributing the Package, you accept this license. Do not use, modify,
266             or distribute the Package, if you do not accept this license.
267              
268             If your Modified Version has been derived from a Modified Version made
269             by someone other than you, you are nevertheless required to ensure that
270             your Modified Version complies with the requirements of this license.
271              
272             This license does not grant you the right to use any trademark, service
273             mark, tradename, or logo of the Copyright Holder.
274              
275             This license includes the non-exclusive, worldwide, free-of-charge
276             patent license to make, have made, use, offer to sell, sell, import and
277             otherwise transfer the Package with respect to any patent claims
278             licensable by the Copyright Holder that are necessarily infringed by the
279             Package. If you institute patent litigation (including a cross-claim or
280             counterclaim) against any party alleging that the Package constitutes
281             direct or contributory patent infringement, then this Artistic License
282             to you shall terminate on the date that such litigation is filed.
283              
284             Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER
285             AND CONTRIBUTORS "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES.
286             THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
287             PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY
288             YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR
289             CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR
290             CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE,
291             EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
292              
293              
294             =cut
295              
296             # End of Runner::Threads