File Coverage

blib/lib/MogileFS/Worker.pm
Criterion Covered Total %
statement 15 154 9.7
branch 0 68 0.0
condition 0 19 0.0
subroutine 5 24 20.8
pod 0 19 0.0
total 20 284 7.0


line stmt bran cond sub pod time code
1             package MogileFS::Worker;
2 21     21   136 use strict;
  21         39  
  21         1096  
3 21         233 use fields ('psock', # socket for parent/child communications
4             'last_bcast_state', # "{device|host}-$devid" => [$time, {alive|dead}]
5             'readbuf', # unparsed data from parent
6             'monitor_has_run', # true once we've heard of the monitor job being alive
7             'last_ping', # time we last said we're alive
8             'woken_up', # bool: if we've been woken up
9             'last_wake', # hashref: { $class -> time() } when we last woke up a certain job class
10             'queue_depth', # depth of a queue we queried
11             'queue_todo', # aref of hrefs of work sent from parent
12 21     21   119 );
  21         42  
13              
14 21     21   2440 use MogileFS::Util qw(error eurl decode_url_args apply_state_events);
  21         45  
  21         2689  
15 21     21   113 use MogileFS::Server;
  21         46  
  21         598  
16              
17             use vars (
18 21         58610 '$got_live_vs_die', # local'ized scalarref flag for whether we've
19             # gotten a live-vs-die instruction from parent
20 21     21   129 );
  21         40  
21              
22             sub new {
23 0     0 0   my ($self, $psock) = @_;
24 0 0         $self = fields::new($self) unless ref $self;
25              
26 0           $self->{psock} = $psock;
27 0           $self->{readbuf} = '';
28 0           $self->{last_bcast_state} = {};
29 0           $self->{monitor_has_run} = MogileFS::ProcManager->is_monitor_good;
30 0           $self->{last_ping} = 0;
31 0           $self->{last_wake} = {};
32 0           $self->{queue_depth} = {};
33 0           $self->{queue_todo} = {};
34              
35 0           IO::Handle::blocking($psock, 0);
36 0           return $self;
37             }
38              
39             sub psock_fd {
40 0     0 0   my $self = shift;
41 0           return fileno($self->{psock});
42             }
43              
44             sub psock {
45 0     0 0   my $self = shift;
46 0           return $self->{psock};
47             }
48              
49             sub validate_dbh {
50 0     0 0   return Mgd::validate_dbh();
51             }
52              
53             sub monitor_has_run {
54 0     0 0   my $self = shift;
55 0 0         return $self->{monitor_has_run} ? 1 : 0;
56             }
57              
58             sub forget_that_monitor_has_run {
59 0     0 0   my $self = shift;
60 0           $self->{monitor_has_run} = 0;
61             }
62              
63             sub wait_for_monitor {
64 0     0 0   my $self = shift;
65 0           while (! $self->monitor_has_run) {
66 0           $self->read_from_parent(1);
67 0           $self->still_alive;
68             }
69             }
70              
71             # method that workers can call just to write something to the parent, so worker
72             # doesn't get killed. (during idle/slow operation, say)
73             # returns current time, so caller can avoid a time() call as well, for its loop
74             sub still_alive {
75 0     0 0   my $self = shift;
76 0           my $now = time();
77 0 0         if ($now > $self->{last_ping} + ($self->watchdog_timeout / 4)) {
78 0           $self->send_to_parent(":still_alive"); # a no-op, just for the watchdog
79 0           $self->{last_ping} = $now;
80             }
81 0           return $now;
82             }
83              
84             sub send_to_parent {
85 0     0 0   my $self = shift;
86              
87             # can be called as package method: MogileFS::Worker->send_to_parent...
88 0 0         unless (ref $self) {
89 0 0         $self = MogileFS::ProcManager->is_child
90             or return;
91             }
92              
93 0           my $write = "$_[0]\r\n";
94 0           my $totallen = length $write;
95 0           my $rv = syswrite($self->{psock}, $write);
96 0 0 0       return 1 if defined $rv && $rv == $totallen;
97 0 0 0       die "Error writing to parent process: $!" if $! && ! $!{EAGAIN};
98              
99 0   0       $rv ||= 0; # could've been undef, if EAGAIN immediately.
100 0           my $remain = $totallen - $rv;
101 0           my $offset = $rv;
102 0           while ($remain > 0) {
103 0 0         MogileFS::Util::wait_for_writeability(fileno($self->{psock}), 30)
104             or die "Parent not writable in 30 seconds";
105              
106 0           $rv = syswrite($self->{psock}, $write, $remain, $offset);
107 0 0 0       die "Error writing to parent process (in loop): $!" if $! && ! $!{EAGAIN};
108 0 0         if ($rv) {
109 0           $remain -= $rv;
110 0           $offset += $rv;
111             }
112             }
113 0 0         die "remain is negative: $remain" if $remain < 0;
114 0           return 1;
115             }
116              
117             # override in children
118             sub watchdog_timeout {
119 0     0 0   return 10;
120             }
121              
122             # should be overridden by workers to process worker-specific directives
123             # from the parent process. return 1 if you recognize the command, 0 otherwise.
124             sub process_line {
125 0     0 0   my ($self, $lineref) = @_;
126 0           return 0;
127             }
128              
129             sub read_from_parent {
130 0     0 0   my $self = shift;
131 0   0       my $timeout = shift || 0;
132 0           my $psock = $self->{psock};
133              
134             # while things are immediately available,
135             # (or optionally sleep a bit)
136 0           while (MogileFS::Util::wait_for_readability(fileno($psock), $timeout)) {
137 0           $timeout = 0; # only wait on the timeout for the first read.
138 0           my $buf;
139 0           my $rv = sysread($psock, $buf, Mgd::UNIX_RCVBUF_SIZE());
140 0 0         if (!$rv) {
141 0 0         if (defined $rv) {
142 0           die "While reading pipe from parent, got EOF. Parent's gone. Quitting.\n";
143             } else {
144 0           die "Error reading pipe from parent: $!\n";
145             }
146             }
147              
148 0 0         if ($Mgd::POST_SLEEP_DEBUG) {
149 0           my $out = $buf;
150 0           $out =~ s/\s+$//;
151 0           warn "proc ${self}[$$] read: [$out]\n"
152             }
153 0           $self->{readbuf} .= $buf;
154              
155 0           while ($self->{readbuf} =~ s/^(.+?)\r?\n//) {
156 0           my $line = $1;
157              
158 0 0         next if $self->process_generic_command(\$line);
159 0           my $ok = $self->process_line(\$line);
160 0 0         unless ($ok) {
161 0           error("Unrecognized command from parent: $line");
162             }
163             }
164             }
165             }
166              
167             sub parent_ping {
168 0     0 0   my $self = shift;
169 0           my $psock = $self->{psock};
170 0           $self->send_to_parent(':ping');
171              
172 0           my $got_reply = 0;
173 0 0         die "recursive parent_ping!" if $got_live_vs_die;
174 0           local $got_live_vs_die = \$got_reply;
175              
176 0           my $loops = 0;
177              
178 0           while (!$got_reply) {
179 0           $self->read_from_parent;
180 0 0         return if $got_reply;
181              
182 0           $loops++;
183 0           select undef, undef, undef, 0.20;
184 0 0         if ($loops > 5) {
185 0           warn "No simple reply from parent to child $self [$$] in $loops 0.2second loops.\n";
186 0 0         die "No answer in 4 seconds from parent to child $self [$$], dying" if $loops > 20;
187             }
188             }
189             }
190              
191             # tries to parse generic (not job-specific) commands sent from parent
192             # to child. returns 1 on success, or 0 if command given isn't generic,
193             # and child should parse.
194             # lineref doesn't have \r\n at end.
195             sub process_generic_command {
196 0     0 0   my ($self, $lineref) = @_;
197 0 0         return 0 unless $$lineref =~ /^:/; # all generic commands start with colon
198              
199 0 0         if ($$lineref =~ /^:shutdown/) {
200 0 0         $$got_live_vs_die = 1 if $got_live_vs_die;
201 0           exit 0;
202             }
203              
204 0 0         if ($$lineref =~ /^:stay_alive/) {
205 0 0         $$got_live_vs_die = 1 if $got_live_vs_die;
206 0           return 1;
207             }
208              
209 0 0         if ($$lineref =~ /^:monitor_events/) {
210 0           apply_state_events($lineref);
211 0           return 1;
212             }
213              
214 0 0         if ($$lineref =~ /^:monitor_has_run/) {
215 0           $self->{monitor_has_run} = 1;
216 0           return 1;
217             }
218              
219 0 0         if ($$lineref =~ /^:wake_up/) {
220 0           $self->{woken_up} = 1;
221 0           return 1;
222             }
223              
224 0 0         if ($$lineref =~ /^:set_config_from_parent (\S+) (.+)/) {
225             # the 'no_broadcast' API keeps us from looping forever.
226 0           MogileFS::Config->set_config_no_broadcast($1, $2);
227 0           return 1;
228             }
229              
230             # queue_name depth
231 0 0         if ($$lineref =~ /^:queue_depth (\w+) (\d+)/) {
232 0           $self->queue_depth($1, $2);
233 0           return 1;
234             }
235              
236             # queue_name encoded_item
237 0 0         if ($$lineref =~ /^:queue_todo (\w+) (.+)/) {
238             # TODO: Use the accessor.
239 0           push(@{$self->{queue_todo}->{$1}}, decode_url_args(\$2));
  0            
240 0           return 1;
241             }
242              
243             # TODO: warn on unknown commands?
244              
245 0           return 0;
246             }
247              
248             sub queue_depth {
249 0     0 0   my MogileFS::Worker $self = shift;
250 0           my $type = shift;
251 0   0       $self->{queue_depth}->{$type} ||= 0;
252 0 0         return $self->{queue_depth}->{$type} unless @_;
253 0           return $self->{queue_depth}->{$type} = shift;
254             }
255              
256             sub queue_todo {
257 0     0 0   my MogileFS::Worker $self = shift;
258 0           my $type = shift;
259 0   0       $self->{queue_todo}->{$type} ||= [];
260 0 0         push(@{$self->{queue_todo}->{$type}}, @_) if @_;
  0            
261 0           return $self->{queue_todo}->{$type};
262             }
263              
264             sub was_woken_up {
265 0     0 0   my MogileFS::Worker $self = shift;
266 0           return $self->{woken_up};
267             }
268              
269             sub forget_woken_up {
270 0     0 0   my MogileFS::Worker $self = shift;
271 0           $self->{woken_up} = 0;
272             }
273              
274             # don't wake processes more than once a second... not necessary.
275             sub wake_a {
276 0     0 0   my ($self, $class) = @_;
277 0           my $now = time();
278 0 0 0       return if ($self->{last_wake}{$class}||0) == $now;
279 0           $self->{last_wake}{$class} = $now;
280 0           $self->send_to_parent(":wake_a $class");
281             }
282              
283             1;
284              
285             # Local Variables:
286             # mode: perl
287             # c-basic-indent: 4
288             # indent-tabs-mode: nil
289             # End:
290