File Coverage

blib/lib/IRC/Indexer/Trawl/Forking.pm
Criterion Covered Total %
statement 106 144 73.6
branch 22 56 39.2
condition 3 15 20.0
subroutine 22 28 78.5
pod 0 18 0.0
total 153 261 58.6


line stmt bran cond sub pod time code
1             package IRC::Indexer::Trawl::Forking;
2              
3             ## Object and session to handle a single forked trawler.
4             ## This is mostly intended for ircindexer-server-json.
5              
6             ## Provide compatible methods w/ Bot::Trawl
7             ## Other layers can use this with the same interface.
8              
9 2     2   10176 use 5.10.1;
  2         8  
  2         101  
10 2     2   11 use strict;
  2         5  
  2         73  
11 2     2   11 use warnings;
  2         4  
  2         59  
12 2     2   18 use Carp;
  2         10  
  2         147  
13              
14 2     2   10 use Config;
  2         4  
  2         89  
15              
16 2     2   11 use POE qw/Wheel::Run Filter::Reference/;
  2         4  
  2         38  
17              
18 2     2   94990 use Time::HiRes;
  2         4  
  2         22  
19              
20 2     2   749 use IRC::Indexer::Report::Server;
  2         4  
  2         5850  
21              
22             require IRC::Indexer::Process::Trawler;
23              
24             ## Trawl::Bot compat:
25              
26             sub new {
27 2     2 0 5144 my $self = {};
28 2         5 my $class = shift;
29 2         7 bless $self, $class;
30            
31 2         13 $self->{sessid} = undef;
32            
33 2         10 $self->{wheels}->{by_pid} = {};
34 2         8 $self->{wheels}->{by_wid} = {};
35            
36             ## Grab and save same opts as Bot::Trawl
37 2         8 my %args = @_;
38 2         18 $args{lc $_} = delete $args{$_} for keys %args;
39              
40 2 100 66     20 $self->{POST} = delete $args{postback}
41             if $args{postback} and ref $args{postback};
42            
43 2         7 $self->{TrawlerOpts} = \%args;
44            
45 2 50       10 croak "No Server specified in new()"
46             unless $self->{TrawlerOpts}->{server};
47              
48             ## This should get replaced later:
49 2         14 $self->{ReportObj} = IRC::Indexer::Report::Server->new();
50            
51 2         6 return $self
52             }
53              
54             sub spawn {
55             ## POE-compat constructor
56 0     0 0 0 my ($pkg, %opts) = @_;
57 0 0       0 croak "cannot use spawn() interface without a postback"
58             unless $opts{postback};
59 0         0 my $self = $pkg->new(%opts);
60 0         0 $self->run();
61 0         0 return $self->{sessid}
62             }
63              
64             sub run {
65 1     1 0 997 my ($self) = @_;
66             ## Create POE session to manage forked Bot::Trawl
67            
68 1         12 my $sess = POE::Session->create(
69             object_states => [
70             $self => [ qw/
71             _start
72             _stop
73             shutdown
74            
75             sess_sig_int
76            
77             tr_sig_chld
78            
79             tr_input
80             tr_error
81             tr_stderr
82             / ],
83             ],
84             );
85              
86 1         630 $self->{sessid} = $sess->ID;
87 1         32 return $self
88             }
89              
90 2     2 0 23 sub trawler_for { return $_[0]->{TrawlerOpts}->{server} }
91              
92 2     2 0 14 sub ID { return $_[0]->{sessid} }
93              
94             sub done {
95 4     4 0 1945 my ($self, $finished) = @_;
96            
97 4 100       16 if ($finished) {
98 1         7 $self->report->status('DONE');
99 1         3 $self->report->finishedat(time);
100              
101 1 50       9 if (my $postback = delete $self->{POST}) {
102             ## Send ourself in a postback.
103 1         38 $postback->($self);
104             }
105              
106             }
107            
108 4 50       157 return unless ref $self->report;
109 4 50 33     10 return unless defined $self->report->status
110             and $self->report->status ~~ [qw/DONE FAIL/];
111 4         16 return $self->report->status
112             }
113              
114             sub failed {
115 2     2 0 5 my ($self, $reason) = @_;
116            
117 2 100       9 if ($reason) {
118 1 50       2 unless (ref $self->report) {
119 0         0 $self->report( IRC::Indexer::Report::Server->new() );
120 0         0 $self->report->connectedto( $self->trawler_for );
121             }
122 1         7 $self->report->status('FAIL');
123 1         3 $self->report->failed($reason);
124 1         3 $self->report->finishedat(time);
125            
126 1 50       5 if (my $postback = delete $self->{POST}) {
127 0         0 $postback->($self);
128             }
129            
130             } else {
131 1 50       4 return unless ref $self->report;
132 1 50       5 return unless $self->report->status eq 'FAIL';
133             }
134            
135 2         6 return $self->report->failed
136             }
137              
138             sub dump {
139 0     0 0 0 my ($self) = @_;
140              
141 0 0       0 return unless ref $self->report;
142 0 0       0 return unless $self->report->status ~~ [ qw/DONE FAIL/ ];
143 0         0 return $self->report->netinfo
144             }
145              
146 28     28 0 132 sub report { info(@_) }
147             sub info {
148 28     28 0 42 my ($self, $reportobj) = @_;
149 28 50       50 $self->{ReportObj} = $reportobj if ref $reportobj;
150 28         120 return $self->{ReportObj}
151             }
152              
153              
154             ## POE:
155             sub _stop {
156 1     1   246 $_[OBJECT]->kill_all;
157             }
158              
159             sub sess_sig_int {
160 0     0 0 0 $_[OBJECT]->kill_all;
161             }
162              
163             sub shutdown {
164 1     1 0 493 $_[OBJECT]->kill_all;
165             }
166              
167             sub kill_all {
168 2     2 0 3 my ($self) = @_;
169 2         3 for my $pidof (keys %{ $self->{wheels}->{by_pid} }) {
  2         10  
170 0         0 my $wheel = delete $self->{wheels}->{by_pid}->{$pidof};
171 0 0       0 if (ref $wheel) {
172 0         0 $wheel->kill(9);
173             }
174             }
175 2         6 delete $self->{wheels};
176              
177 2 50       6 $self->failed("Terminated early") unless $self->done;
178             }
179              
180             sub _start {
181 1     1   217 my ($self, $kernel) = @_[OBJECT, KERNEL];
182            
183 1         6 $kernel->sig('INT', 'sess_sig_int');
184 1         52 $kernel->sig('TERM', 'sess_sig_int');
185            
186 1         39 $self->{sessid} = $_[SESSION]->ID();
187            
188 1         22 my $perlpath = $Config{perlpath};
189 1 50       74 if ($^O ne 'VMS') {
190 1 50       6 $perlpath .= $Config{_exe}
191             unless $perlpath =~ m/$Config{_exe}$/i;
192             }
193            
194 1         156 my $forkable;
195 1 50       6 if ($^O eq 'MSWin32') {
196 0         0 $forkable = \&IRC::Indexer::Process::Trawler::worker;
197             } else {
198 11         53 $forkable = [
199 1         3 $perlpath, (map { "-I$_" } @INC),
200             '-MIRC::Indexer::Process::Trawler', '-e',
201             'IRC::Indexer::Process::Trawler->worker()'
202             ];
203             }
204            
205 1         13 my $wheel = POE::Wheel::Run->new(
206             Program => $forkable,
207             ErrorEvent => 'tr_error',
208             StdoutEvent => 'tr_input',
209             StderrEvent => 'tr_stderr',
210             CloseEvent => 'tr_closed',
211             StdioFilter => POE::Filter::Reference->new(),
212             );
213            
214 1         7639 my $wheelid = $wheel->ID;
215 1         45 my $pidof = $wheel->PID;
216            
217 1         33 $kernel->sig_child($pidof, 'tr_sig_chld');
218              
219 1         278 $self->{wheels}->{by_pid}->{$pidof} = $wheel;
220 1         13 $self->{wheels}->{by_wid}->{$wheelid} = $wheel;
221              
222             ## Feed this worker the trawler conf.
223 1         10 my $trawlercf = $self->{TrawlerOpts};
224 1         38 my $item = [ $self->trawler_for, $trawlercf ];
225 1         23 $wheel->put($item);
226             }
227              
228             sub tr_input {
229 1     1 0 424242 my ($self, $kernel) = @_[OBJECT, KERNEL];
230 1         4 my $input = $_[ARG0];
231              
232             ## Received report->clone()'d hash
233              
234 1         4 my ($server, $info_h) = @$input;
235 1 50       16 unless (ref $info_h eq 'HASH') {
236 0         0 croak "tr_input received invalid input from worker";
237             }
238              
239             ## Re-create Report::Server obj
240 1         20 my $report = IRC::Indexer::Report::Server->new(
241             FromHash => $info_h,
242             );
243            
244 1         4 $self->{ReportObj} = $report;
245             ## We're finished.
246 1         11 $self->done(1);
247 1 50       11 $self->failed( $info_h->{Failure} ) if $info_h->{Failure};
248 1         21 delete $self->{wheels};
249             }
250              
251             sub tr_error {
252             ## these should sigchld and go away
253 0     0 0 0 my ($self, $kernel) = @_[OBJECT, KERNEL];
254 0         0 my ($op, $num, $str, $wid) = @_[ARG0 .. $#_];
255 0         0 my $wheel = $self->{wheels}->{by_wid}->{$wid};
256 0 0       0 my $pidof = $wheel->PID if ref $wheel;
257 0         0 warn "worker err, probably harmless: $self->trawler_for $wid err: $op"
258             ." $num $str\n";
259             }
260              
261             sub tr_stderr {
262 0     0 0 0 my ($self, $kernel) = @_[OBJECT, KERNEL];
263 0         0 my ($err, $id) = @_[ARG0, ARG1];
264             ## Report failed() and clean up
265 0         0 warn "Worker err: $err";
266 0 0 0     0 $self->failed("Worker: SIGCHLD")
267             unless $self->done or $self->failed;
268             }
269              
270             sub tr_sig_chld {
271 1     1 0 656 my ($self, $kernel) = @_[OBJECT, KERNEL];
272             ## Worker's gone
273            
274 1         3 my $pidof = $_[ARG1];
275              
276 1         4 my $wheel = delete $self->{wheels}->{by_pid}->{$pidof};
277 1 50       7 return unless ref $wheel;
278            
279 0           my $wheelid = $wheel->ID;
280 0           delete $self->{wheels}->{by_wid}->{$wheelid};
281              
282 0 0 0       $self->failed("Worker: SIGCHLD")
283             unless $self->done or $self->failed;
284             }
285              
286             sub tr_closed {
287 0     0 0   my ($self, $kernel) = @_[OBJECT, KERNEL];
288 0           my $wheelid = $_[ARG0];
289 0           my $wheel = delete $self->{wheels}->{by_wid}->{$wheelid};
290 0 0         if (ref $wheel) {
291 0 0 0       $self->failed("Worker closed output")
292             unless $self->done or $self->failed;
293 0           my $pidof = $wheel->PID;
294 0           delete $self->{wheels}->{by_pid}->{$pidof};
295 0           $wheel->kill(9);
296             }
297             }
298              
299             1;
300             __END__