File Coverage

blib/lib/IPC/QWorker.pm
Criterion Covered Total %
statement 81 84 96.4
branch 7 10 70.0
condition 1 3 33.3
subroutine 14 14 100.0
pod 0 6 0.0
total 103 117 88.0


line stmt bran cond sub pod time code
1             package IPC::QWorker;
2              
3 12     12   63819 use 5.000;
  12         114  
4 12     12   60 use strict;
  12         24  
  12         251  
5 12     12   59 use warnings;
  12         24  
  12         463  
6 12     12   6817 use utf8;
  12         168  
  12         59  
7              
8             # ABSTRACT: processing a queue in parallel
9             our $VERSION = '0.08'; # VERSION
10              
11             our $DEBUG = 0;
12              
13 12     12   5817 use IO::Select;
  12         19900  
  12         618  
14              
15 12     12   4829 use IPC::QWorker::Worker;
  12         47  
  12         9181  
16              
17              
18             sub new {
19 11     11 0 143 my $this = shift;
20 11   33     77 my $class = ref($this) || $this;
21 11         121 my $self = {
22             '_workers' => [],
23             '_queue' => [],
24             '_pids' => {},
25             '_ready_workers' => [],
26             '_io_select' => IO::Select->new(),
27             @_
28             };
29              
30 11         176 bless( $self, $class );
31 11         33 return ($self);
32             }
33              
34             sub create_workers {
35 11     11 0 1342 my $self = shift();
36 11         22 my $num_workers = shift();
37 11         11 my $worker;
38              
39 11         55 for ( my $i = 0 ; $i < $num_workers ; $i++ ) {
40             # create the worker
41 65         10380 $worker = IPC::QWorker::Worker->new(@_);
42             # add him to the list of workers
43 55         541 push( @{ $self->{'_workers'} }, $worker);
  55         915  
44             # add him to the pid->workers index
45 55         1629 $self->{'_pids'}->{$worker->{'pid'}} = $worker;
46             # add him to IO::Select
47 55         1004 $self->{'_io_select'}->add( $worker->{'pipe'} );
48             }
49             }
50              
51             sub push_queue {
52 121     121 0 153 my $self = shift;
53              
54 121         136 push( @{ $self->{'_queue'} }, @_ );
  121         243  
55             }
56              
57             sub _get_ready_workers {
58 30     30   44 my $self = shift();
59 30         34 my $timeout = shift();
60 30         72 my @can_read_pipes;
61             my $i;
62 30         0 my $wpid;
63              
64             # if we have no ready workers find some
65 30         83 @can_read_pipes = $self->{'_io_select'}->can_read($timeout);
66 30 50       2355 if ($IPC::QWorker::DEBUG) {
67 0         0 print STDERR "found " . scalar(@can_read_pipes) . " ready workers!\n";
68             }
69 30         64 foreach $i (@can_read_pipes) {
70             # get pid from a msg like "12345 READY\n"
71 130         3642 ($wpid) = split(' ', readline($i));
72 130         419 $self->{'_pids'}->{$wpid}->{'ready'} = 1;
73 130         153 push(@{$self->{'_ready_workers'}}, $self->{'_pids'}->{$wpid});
  130         353  
74             }
75             }
76              
77             sub process_queue {
78 2     2 0 198 my $self = shift;
79 2         10 my $timeout = shift;
80 2         11 my $qentry;
81             my $worker;
82              
83 2 100       22 if(defined($timeout)) {
84             # if timeout is set wait for timeout till a worker is ready
85 1         6 $self->_get_ready_workers($timeout);
86 1         3 while($worker = shift(@{$self->{'_ready_workers'}})) {
  11         1285  
87 10         22 $worker->send_entry(shift(@{ $self->{'_queue'}}));
  10         40  
88             }
89             } else {
90             # loop over the Q till its empty
91             # will block while waiting for ready workers
92             # returns when the queue is empty
93 1         2 while($qentry = shift(@{ $self->{'_queue'}})) {
  111         9439  
94 110         176 while(!scalar(@{$self->{'_ready_workers'}})) {
  137         330  
95 27 50       47 if ($IPC::QWorker::DEBUG) {
96 0         0 print STDERR "no ready workers. wait for workers...\n";
97             }
98 27         60 $self->_get_ready_workers();
99             }
100              
101 110         139 $worker = shift(@{$self->{'_ready_workers'}});
  110         176  
102 110         301 $worker->send_entry($qentry);
103             }
104             }
105             }
106              
107             sub _get_busy_workers {
108 3     3   13 my $self = shift();
109 3         13 my @result;
110             my $worker;
111              
112 3         6 foreach $worker (@{$self->{'_workers'}}) {
  3         12  
113 30 100       56 if(!$worker->{'ready'}) {
114 7         10 push(@result, $worker);
115             }
116             }
117 3         44 return(@result);
118             }
119              
120             # will block till all workers are finished
121             sub flush_queue {
122 1     1 0 102 my $self = shift();
123 1         3 my @busy_workers;
124 1         21 my $select = IO::Select->new();
125              
126 1         17 while(scalar(@busy_workers = $self->_get_busy_workers())) {
127 2 50       6 if ($IPC::QWorker::DEBUG) {
128 0         0 print STDERR "still " . scalar(@busy_workers) . " busy workers...\n";
129             }
130 2         5 $self->_get_ready_workers();
131             }
132             }
133              
134             sub stop_workers {
135 1     1 0 131 my $self = shift;
136 1         4 my $worker;
137              
138             # may be we could also use signals here
139 1         2 foreach $worker ( @{ $self->{'_workers'} } ) {
  1         4  
140 10         191 $worker->exit_child();
141             }
142             }
143              
144             1;
145              
146             # vim:ts=2:expandtab:syntax=perl:
147              
148             __END__