File Coverage

blib/lib/Bioinfo/PBS/Queue.pm
Criterion Covered Total %
statement 21 55 38.1
branch 0 8 0.0
condition 0 3 0.0
subroutine 7 11 63.6
pod 2 3 66.6
total 30 80 37.5


line stmt bran cond sub pod time code
1             package Bioinfo::PBS::Queue;
2 1     1   62709 use Moose;
  1         357593  
  1         4  
3 1     1   5862 use Modern::Perl;
  1         2  
  1         8  
4 1     1   601 use Parallel::ForkManager;
  1         18338  
  1         25  
5 1     1   7 use IO::All;
  1         2  
  1         11  
6 1     1   58 use List::Util 'uniq';
  1         2  
  1         44  
7 1     1   252 use namespace::autoclean;
  1         6108  
  1         3  
8              
9             our $VERSION = '0.1.11'; # VERSION:
10             # ABSTRACT: used to submit a batch of task to Torque cluster
11              
12              
13             has tasks => (
14             is => 'rw',
15             isa => 'ArrayRef[Bioinfo::PBS]',
16             default => sub { [] },
17             traits => ['Array'],
18             handles => {
19             _add_tasks => 'push',
20             all_tasks => 'elements',
21             filter_tasks => 'grep',
22             count_tasks => 'count',
23             pop_tasks => 'pop',
24             },
25             );
26              
27              
28              
29             sub add_tasks {
30 0     0 1   my $self = shift;
31 0           my $type = ref $_[0];
32 0 0         if ($type eq 'Bioinfo::PBS') {
    0          
    0          
33 0           $self->_add_tasks(@_);
34             } elsif ($type eq 'Bioinfo::PBS::Queue') {
35 0           $self->_add_tasks($_[0]->all_tasks);
36             } elsif ($type eq 'HASH') {
37 1     1   416 use Bioinfo::PBS;
  1         3  
  1         614  
38 0           $self->_add_tasks(Bioinfo::PBS->new($_)) for @_;
39             }
40             }
41              
42             has name => (
43             is => 'rw',
44             isa => 'Str',
45             default => sub { 'pbs' },
46             );
47              
48             has parallel => (
49             is => 'rw',
50             isa => 'Int',
51             lazy => 1,
52             default => sub { shift->count_tasks }
53             );
54              
55             has _log => (
56             is => 'rw',
57             isa => 'IO::All',
58             default => sub { io(shift->name . ".log") },
59             lazy => 1,
60             );
61              
62             has run_queue => (
63             is => 'ro',
64             isa => 'ArrayRef[Bioinfo::PBS]',
65             default => sub { [] },
66             traits => ['Array'],
67             handles => {
68             run_queue_add => 'push',
69             run_queue_tasks => 'elements',
70             run_queue_count => 'count',
71             },
72             );
73              
74             has finished_queue => (
75             is => 'ro',
76             isa => 'ArrayRef[Bioinfo::PBS]',
77             default => sub { [] },
78             lazy => 1,
79             traits => ['Array'],
80             handles => {
81             finished_queue_add => 'push',
82             finished_queue_tasks => 'elements',
83             finished_queue_filter => 'grep',
84             },
85             );
86              
87             has stage => (
88             is => 'ro',
89             writer => "_set_writer",
90             isa => 'Int',
91             default => sub { '1' },
92             lazy => 1,
93             );
94              
95              
96             sub execute {
97 0     0 1   my $self = shift;
98 0           my @tasks = $self->all_tasks;
99 0           my $task_run_num = $self->parallel;
100 0           my @stages = uniq (map { $_->priority } @tasks);
  0            
101 0           my $content = "name\tcpu\tpriority\tsh_name\tjob_id\tstat\tcmd\n";
102 0           $self->_log->lock->append($content)->unlock;
103 0           for my $stage (@stages) {
104 0           $self->_log->lock->append("# Stage$stage: running\n")->unlock;
105 0           say "Stage $stage";
106 0     0     my @stage_tasks = $self->filter_tasks( sub {$_->priority == $stage} );
  0            
107 0   0       my $paralell_num = $task_run_num || ($#stage_tasks + 1);
108 0           my $pm = Parallel::ForkManager->new($paralell_num);
109             DATA_LOOP:
110 0           for my $task (@stage_tasks) {
111 0           sleep 1;
112 0           my ($name, $cpu, $cmd, $priority) = ($task->name, $task->cpu, $task->cmd, $task->priority);
113 0 0         my $pid = $pm->start and next DATA_LOOP;
114 0           say "$name will be submitted\n";
115 0           $task->qsub->wait;
116 0           say "$name finished\n";
117 0           my ($stat, $job_id, $sh_name) = ($task->job_stat, $task->job_id, $task->_sh_name);
118 0           $content = "$name\t$cpu\t$priority\t$sh_name\t$job_id\t$stat\t$cmd\n";
119             #say "$content";
120             #my $log_name = $self->name . ".log";
121             #io($log_name)->lock->append($content)->unlock;
122 0           $self->_log->lock->append($content)->unlock;
123 0           $pm->finish;
124             }
125 0           $pm->wait_all_children;
126 0           say "finished the whole project";
127             }
128             }
129              
130             sub log {
131 0     0 0   my ($self, $content) = @_;
132             }
133              
134             __PACKAGE__->meta->make_immutable;
135              
136             1;
137              
138             __END__
139              
140             =pod
141              
142             =encoding UTF-8
143              
144             =head1 NAME
145              
146             Bioinfo::PBS::Queue - used to submit a batch of task to Torque cluster
147              
148             =head1 VERSION
149              
150             version 0.1.11
151              
152             =head1 SYNOPSIS
153              
154             use Bioinfo::PBS;
155             use Bioinfo::PBS::Queue;
156             my $para = {
157             cpu => 2,
158             name => 'blast',
159             cmd => 'ls -alh; pwd',
160             };
161             my $pbs_obj = Bioinfo::PBS->new($para);
162              
163             # three tasks are running at the same time
164             my $queue_obj = Bioinfo::PBS::Queue->new(name => 'blastnr', parallel => 3);
165              
166             # all tasks will be running at the same time if parallel is not setted
167             my $queue_obj = Bioinfo::PBS::Queue->new(name => 'blastnr');
168             $queue_obj->add_tasks($pbs_obj);
169             $queue_obj->add_tasks($pbs_obj);
170             $queue_obj->execute;
171              
172             =head1 DESCRIPTION
173              
174             This module is created to simplify process of task submitting in PBS system,
175             and waiting for the finish of multiple tasks.
176              
177             =head1 ATTRIBUTES
178              
179             =head2 tasks
180              
181             cpu number that will apply
182              
183             =head2 add_tasks
184              
185             one or more object of Bioinfo::PBS can be added to queue.
186             if a Bioinfo::PBS::Queque be added, all its tasks are added.
187             if a hashref can be passed, a object of Bioinfo::PBS will be
188             created and added to this queue, for example:
189             {cpu =>2, name =>'blast', cmd=>"ls -alh", priority=>1}
190              
191             =head1 METHODS
192              
193             =head2 execute
194              
195             run all tasks in the queue by the order
196              
197             =head1 AUTHOR
198              
199             Yan Xueqing <yanxueqing621@163.com>
200              
201             =head1 COPYRIGHT AND LICENSE
202              
203             This software is copyright (c) 2017 by Yan Xueqing.
204              
205             This is free software; you can redistribute it and/or modify it under
206             the same terms as the Perl 5 programming language system itself.
207              
208             =cut