File Coverage

blib/lib/MogileFS/ProcManager.pm
Criterion Covered Total %
statement 21 297 7.0
branch 0 142 0.0
condition 0 45 0.0
subroutine 7 54 12.9
pod 0 45 0.0
total 28 583 4.8


line stmt bran cond sub pod time code
package MogileFS::ProcManager;
use strict;
use warnings;
use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK);
use Symbol;
use Socket;
use IO::Handle ();   # IO::Handle::blocking() is called fully-qualified below
use Time::HiRes ();  # Time::HiRes::time() is called fully-qualified below
use MogileFS::Connection::Client;
use MogileFS::Connection::Worker;

# This class handles keeping lists of workers and clients and
# assigning them to each other when things happen. You don't actually
# instantiate a procmanager. The class itself holds all state.

# Mappings: fd => [ clientref, jobstring, starttime ]
# queues are just lists of Client class objects
# ChildrenByJob: job => { pid => $client }
# ErrorsTo: fd => Client
# RecentQueries: [ string, string, string, ... ]
# Stats: element => number
our ($IsChild, @RecentQueries,
     %Mappings, %ChildrenByJob, %ErrorsTo, %Stats);

our $starttime = time(); # time we got going

# Returns the epoch time at which this module was loaded.
sub server_starttime { return $starttime }

my @IdleQueryWorkers; # workers that are idle, able to process commands (MogileFS::Worker::Query, ...)
my @PendingQueries;   # [ MogileFS::Connection::Client, "$ip $query" ]

$IsChild = 0; # either false if we're the parent, or a MogileFS::Worker object

# keep track of what all child pids are doing, and what jobs are being
# satisfied.
my %child = ();  # pid -> MogileFS::Connection::Worker
my %todie = ();  # pid -> 1 (lists pids that we've asked to die)
my %jobs  = ();  # jobname -> [ min, current ]

our $allkidsup = 0; # if true, all our kids are running. set to 0 when a kid dies.

my @prefork_cleanup; # subrefs to run to clean stuff up before we make a new child

*error = \&Mgd::error;

my %dev_util;            # devid -> utilization
my $last_util_spray = 0; # time we last spread %dev_util to children

my $nowish; # updated approximately once per second (by PostEventLoopChecker)

# Register a code ref to be run in every newly forked child before it
# starts its worker (see make_new_child).
sub push_pre_fork_cleanup {
    my ($class, $code) = @_;
    push @prefork_cleanup, $code;
}

# Returns the list of recent query timing strings (at most 50 are kept).
sub RecentQueries {
    return @RecentQueries;
}

# Write our pid into the configured "pidfile".
# Returns 1 on success or when no pidfile is configured, 0 on failure
# (after logging; a partially written pidfile is removed).
sub write_pidfile {
    my $class = shift;
    my $pidfile = MogileFS->config("pidfile")
        or return 1;
    my $fh;
    unless (open($fh, '>', $pidfile)) {
        Mgd::log('err', "couldn't create pidfile '$pidfile': $!");
        return 0;
    }
    # close() is checked too: buffered write errors only surface there
    unless ((print $fh "$$\n") && close($fh)) {
        Mgd::log('err', "couldn't write into pidfile '$pidfile': $!");
        __PACKAGE__->remove_pidfile;
        return 0;
    }
    return 1;
}

# Unlink the configured pidfile, if any. Returns 1 if one was configured,
# otherwise an empty return.
sub remove_pidfile {
    my $class = shift;
    my $pidfile = MogileFS->config("pidfile")
        or return;
    unlink $pidfile;
    return 1;
}

# Set the desired (minimum) number of workers for $job, creating the
# job's [min, current] bookkeeping entry if needed.
sub set_min_workers {
    my ($class, $job, $min) = @_;
    $jobs{$job} ||= [undef, 0]; # [min, current]
    $jobs{$job}->[0] = $min;

    # TODO: set allkidsup false, so spawner re-checks?
}

# Map a job name to the suffix of its MogileFS::Worker:: class,
# or undef for an unknown job.
sub job_to_class_suffix {
    my ($class, $job) = @_;
    return {
        fsck        => "Fsck",
        queryworker => "Query",
        delete      => "Delete",
        replicate   => "Replicate",
        reaper      => "Reaper",
        monitor     => "Monitor",
    }->{$job};
}

# Map a job name to its full worker class name, or "" if unknown.
sub job_to_class {
    my ($class, $job) = @_;
    my $suffix = $class->job_to_class_suffix($job) or return "";
    return "MogileFS::Worker::$suffix";
}

# Returns the pids of all children we are tracking.
sub child_pids {
    return keys %child;
}

# SIGKILL every child whose connection fails its watchdog_check.
sub WatchDog {
    foreach my $pid (keys %child) {
        my MogileFS::Connection::Worker $child = $child{$pid};
        my $healthy = $child->watchdog_check;
        next if $healthy;

        error("Watchdog killing worker $pid (" . $child->job . ")");
        kill 9, $pid;
    }
}

# returns a sub that Danga::Socket calls after each event loop round.
# the sub must return 1 for the program to continue running.
#
# At most once per second the returned sub runs the watchdog, reaps one
# dead child (if any), and forks new children for every job that has
# fewer running than desired.
sub PostEventLoopChecker {
    my $lastspawntime = 0; # time we last ran spawn_children sub

    return sub {
        # run only once per second
        $nowish = time();
        return 1 unless $nowish > $lastspawntime;
        $lastspawntime = $nowish;

        MogileFS::ProcManager->WatchDog;

        # see if anybody has died, but don't hang up on doing so
        my $pid = waitpid -1, WNOHANG;
        return 1 if $pid <= 0 && $allkidsup;
        $allkidsup = 0; # know something died

        # when a child dies, figure out what it was doing
        # and note that job has one less worker
        my $jobconn;
        if ($pid > 0 && ($jobconn = delete $child{$pid})) {
            my $job = $jobconn->job;
            my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
            error("Child $pid ($job) died: $? ($extra)");
            MogileFS::ProcManager->NoteDeadChild($pid);
            $jobconn->close;

            if (my $jobstat = $jobs{$job}) {
                # if the pid is in %todie, then we have asked it to shut down
                # and have already decremented the jobstat counter and don't
                # want to do it again
                unless (delete $todie{$pid}) {
                    # decrement the count of currently running jobs
                    $jobstat->[1]--;
                }
            }
        }

        # foreach job, fork enough children.
        # Iterate a snapshot of the keys rather than using each(): we may
        # bail out of this loop early, and an abandoned each() iterator
        # would make the next round resume mid-hash and skip earlier jobs.
        foreach my $job (keys %jobs) {
            my $jobstat = $jobs{$job};
            my $need = $jobstat->[0] - $jobstat->[1];
            next unless $need > 0;

            error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
            for (1..$need) {
                my $jobconn = make_new_child($job)
                    or return 1; # basically bail: true value keeps event loop running
                $child{$jobconn->pid} = $jobconn;

                # now increase the count of processes currently doing this job
                $jobstat->[1]++;
            }
        }

        # if we got this far, all jobs have been re-created. note that
        # so we avoid more CPU usage in this post-event-loop callback later
        $allkidsup = 1;

        # true value keeps us running:
        return 1;
    };
}

# Fork a new child process to run $job.
#
# In the parent: returns the registered MogileFS::Connection::Worker for
# the child, or the result of error() if blocking SIGINT or fork fails.
# Dies if the IPC socketpair cannot be created.
# In the child: never returns; runs the job's worker class and exits.
sub make_new_child {
    my $job = shift;

    # block SIGINT across the fork
    my $sigset = POSIX::SigSet->new(SIGINT);
    sigprocmask(SIG_BLOCK, $sigset)
        or return error("Can't block SIGINT for fork: $!");

    my ($parents_ipc, $childs_ipc);
    unless (socketpair($parents_ipc, $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
        my $err = $!;
        # don't leave SIGINT blocked in the parent on the way out
        sigprocmask(SIG_UNBLOCK, $sigset);
        die "Sockpair failed: $err";
    }

    my $pid = fork;
    unless (defined $pid) {
        # capture $! before sigprocmask() gets a chance to clobber it
        my $err = $!;
        sigprocmask(SIG_UNBLOCK, $sigset);
        return error("fork failed creating $job: $err");
    }

    # enable auto-flush, so it's not pipe-buffered between parent/child
    select((select($parents_ipc), $| = 1)[0]);
    select((select($childs_ipc),  $| = 1)[0]);

    # if i'm the parent
    if ($pid) {
        sigprocmask(SIG_UNBLOCK, $sigset)
            or return error("Can't unblock SIGINT for fork: $!");

        close($childs_ipc); # unnecessary but explicit
        IO::Handle::blocking($parents_ipc, 0);

        my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc);
        $worker_conn->pid($pid);
        $worker_conn->job($job);
        MogileFS::ProcManager->RegisterWorkerConn($worker_conn);
        return $worker_conn;
    }

    # as a child, we want to close these and ignore them
    $_->() foreach @prefork_cleanup;
    close($parents_ipc);
    undef $parents_ipc;

    $SIG{INT}  = 'DEFAULT';
    $SIG{TERM} = 'DEFAULT';
    $0 .= " [$job]";

    # unblock signals. We are the child here: returning would drop this
    # process back into the parent's event loop, so failure is fatal.
    sigprocmask(SIG_UNBLOCK, $sigset)
        or die "Can't unblock SIGINT in child for job '$job': $!\n";

    # now call our job function
    my $class = MogileFS::ProcManager->job_to_class($job)
        or die "No worker class defined for job '$job'\n";
    my $worker = $class->new($childs_ipc);

    # set our frontend into child mode
    MogileFS::ProcManager->SetAsChild($worker);

    $worker->work;
    exit 0;
}

# Number of client queries waiting for an idle query worker.
sub PendingQueryCount {
    return scalar @PendingQueries;
}

# Number of query workers currently idle.
sub BoredQueryWorkerCount {
    return scalar @IdleQueryWorkers;
}

# Number of queries currently dispatched to a worker.
sub QueriesInProgressCount {
    return scalar keys %Mappings;
}

# Returns a reference to the live %Stats hash.
sub StatsHash {
    return \%Stats;
}

# Call $cb->($job, $running_count, $desired_count, [ "pid pid ..." ])
# for every job that has registered children, in job-name order.
sub foreach_job {
    my ($class, $cb) = @_;
    foreach my $job (sort keys %ChildrenByJob) {
        my $kids = $ChildrenByJob{$job};
        my $ct = scalar(keys %$kids);
        # look up the desired count without autovivifying $jobs{$job},
        # which would otherwise make an unknown job look valid
        my $desired = $jobs{$job} ? $jobs{$job}->[0] : undef;
        $cb->($job, $ct, $desired, [ join(' ', sort { $a <=> $b } keys %$kids) ]);
    }
}

# Call $cb->($client, "$ip $query") for every pending query, in queue order.
sub foreach_pending_query {
    my ($class, $cb) = @_;
    foreach my $clq (@PendingQueries) {
        $cb->($clq->[0], # client object,
              $clq->[1], # "$ip $query"
              );
    }
}

# True if $job has been configured (via set_min_workers).
sub is_valid_job {
    my ($class, $job) = @_;
    return defined $jobs{$job};
}

# Sorted list of configured job names.
sub valid_jobs {
    return sort keys %jobs;
}

# Change the desired worker count of a configured job to $n and make the
# spawner re-check. Returns 0 for an unknown job, 1 otherwise.
sub request_job_process {
    my ($class, $job, $n) = @_;
    return 0 unless $class->is_valid_job($job);

    $jobs{$job}->[0] = $n;
    $allkidsup = 0;

    # try to clean out the queryworkers (if that's what we're doing?)
    MogileFS::ProcManager->CullQueryWorkers
        if $job eq 'queryworker';

    return 1;
}


# when a child is spawned, they'll have copies of all the data from the
# parent, but they don't need it. this method is called when you want
# to indicate that this procmanager is running on a child and should clean.
sub SetAsChild {
    my ($class, $worker) = @_;

    @IdleQueryWorkers = ();
    @PendingQueries   = ();
    %Mappings         = ();
    $IsChild          = $worker;
    %ErrorsTo         = ();

    # and now kill off our event loop so that we don't waste time
    Danga::Socket->SetPostLoopCallback(sub { return 0; });
}

# called when a child has died. a child is someone doing a job for us,
# but it might be a queryworker or any other type of job. we just want
# to remove them from our list of children. they're actually respawned
# by PostEventLoopChecker via make_new_child.
sub NoteDeadChild {
    my $pid = $_[1];
    foreach my $job (keys %ChildrenByJob) {
        # bail out if we actually delete one
        return if delete $ChildrenByJob{$job}->{$pid};
    }
}

# called when a client dies. clients are users, management or non.
# we just want to remove them from the error reporting interface, if
# they happen to be part of it.
sub NoteDeadClient {
    my $client = $_[1];
    delete $ErrorsTo{$client->{fd}};
}

# called when the error function in Mgd is called and we're in the parent,
# so it's pretty simple that basically we just spit it out to folks listening
# to errors. The second argument is a reference to the message string.
sub NoteError {
    return unless %ErrorsTo;

    my $msg = ":: ${$_[1]}\r\n";
    foreach my $client (values %ErrorsTo) {
        $client->write(\$msg);
    }
}

# Stop sending error messages to $client; returns the removed client, if any.
sub RemoveErrorWatcher {
    my ($class, $client) = @_;
    return delete $ErrorsTo{$client->{fd}};
}

# Start sending error messages to $client (keyed by its fd).
sub AddErrorWatcher {
    my ($class, $client) = @_;
    $ErrorsTo{$client->{fd}} = $client;
}

# one-time initialization of a new worker connection
sub RegisterWorkerConn {
    my MogileFS::Connection::Worker $worker = $_[1];
    $worker->watch_read(1);

    #warn sprintf("Registering start-up of $worker (%s) [%d]\n", $worker->job, $worker->pid);

    # now do any special case startup
    if ($worker->job eq 'queryworker') {
        MogileFS::ProcManager->NoteIdleQueryWorker($worker);
    }

    # add to normal list
    $ChildrenByJob{$worker->job}->{$worker->pid} = $worker;
}

# Queue a client's command line (prefixed with the client's IP, or
# 0.0.0.0 if unknown) and try to dispatch it immediately.
sub EnqueueCommandRequest {
    my ($class, $line, $client) = @_;
    push @PendingQueries, [
                           $client,
                           ($client->peer_ip_string || '0.0.0.0') . " $line"
                           ];
    MogileFS::ProcManager->ProcessQueues;
}

# puts a worker back in the queue, deleting any outstanding jobs in
# the mapping list for this fd.
sub NoteIdleQueryWorker {
    # first arg is class, second is worker
    my MogileFS::Connection::Worker $worker = $_[1];
    delete $Mappings{$worker->{fd}};

    # see if we need to kill off some workers
    if (job_needs_reduction('queryworker')) {
        Mgd::error("Reducing queryworker headcount by 1.");
        MogileFS::ProcManager->AskWorkerToDie($worker);
        return;
    }

    # must be okay, so put it in the queue
    push @IdleQueryWorkers, $worker;
    MogileFS::ProcManager->ProcessQueues;
}

# if we need to kill off a worker, this function takes in the WorkerConn
# object, tells it to die, marks us as having requested its death, and decrements
# the count of running jobs.
sub AskWorkerToDie {
    my MogileFS::Connection::Worker $worker = $_[1];
    note_pending_death($worker->job, $worker->pid);
    $worker->write(":shutdown\r\n");
}

# kill bored query workers so we can get down to the level requested. this
# continues killing until we run out of folks to kill.
sub CullQueryWorkers {
    while (@IdleQueryWorkers && job_needs_reduction('queryworker')) {
        my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
        MogileFS::ProcManager->AskWorkerToDie($worker);
    }
}

# called when we get a response from a worker. this reenqueues the
# worker so it can handle another response as well as passes the answer
# back on to the client.
sub HandleQueryWorkerResponse {
    # got a response from a worker
    my MogileFS::Connection::Worker $worker;
    my $line;
    (undef, $worker, $line) = @_;

    return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line") if $IsChild;
    return unless $worker && $Mappings{$worker->{fd}};

    # get the client we're working with (if any)
    my ($client, $jobstr, $starttime) = @{ $Mappings{$worker->{fd}} };

    # if we have no client, then we just got a standard message from
    # the queryworker and need to pass it up the line
    return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client;

    # at this point it was a command response, but if the client has gone
    # away, just reenqueue this query worker
    return MogileFS::ProcManager->NoteIdleQueryWorker($worker) if $client->{closed};

    # [client-side time to complete]
    my ($time, $id, $res);
    if ($line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/) {
        # save time and response for use later
        # Note the optional negative sign in the regexp.  Somebody
        # on the mailing list was getting a time of -0.0000, causing
        # broken connections.
        ($id, $time, $res) = ($1, $2, $3);
    }

    # now, if it doesn't match
    unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") {
        $id   = "" unless defined $id;
        $line = "" unless defined $line;
        $line =~ s/\n/\\n/g;
        $line =~ s/\r/\\r/g;
        Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing");
        $client->close('worker_mismatch');
        return MogileFS::ProcManager->AskWorkerToDie($worker);
    }

    # now time this interval and add to @RecentQueries
    my $tinterval = Time::HiRes::time() - $starttime;
    push @RecentQueries, sprintf("%s %.4f %s", $jobstr, $tinterval, $time);
    shift @RecentQueries if scalar(@RecentQueries) > 50;

    # send text to client, put worker back in queue
    $client->write("$res\r\n");
    MogileFS::ProcManager->NoteIdleQueryWorker($worker);
}

# called from various spots to empty the queues of available pairs.
sub ProcessQueues {
    return if $IsChild;

    # try to match up a client with a worker
    while (@IdleQueryWorkers && @PendingQueries) {
        # get client that isn't closed
        my $clref;
        while (!$clref && @PendingQueries) {
            $clref = shift @PendingQueries
                or next;
            if ($clref->[0]->{closed}) {
                $clref = undef;
                next;
            }
        }
        next unless $clref;

        # get worker and make sure it's not closed already
        my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
        if (!defined $worker || $worker->{closed}) {
            unshift @PendingQueries, $clref;
            next;
        }

        # put in mapping and send data to worker.
        # Slot 2 is the start time read back by HandleQueryWorkerResponse.
        # Only set it on first dispatch: a query requeued by
        # NoteDeadWorkerConn already has one, and pushing again would just
        # grow the array while slot 2 kept the old value anyway.
        $clref->[2] = Time::HiRes::time() unless defined $clref->[2];
        $Mappings{$worker->{fd}} = $clref;
        $Stats{queries}++;

        # increment our counter so we know what request counter this is going out
        $worker->{reqid}++;
        # so we're writing a string of the form:
        #     123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n
        $worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
    }
}

# send short descriptions of commands we support to the user
sub SendHelp {
    my $client = $_[1];

    # send general purpose help
    $client->write(<<HELP);
Mogilefsd admin commands:

    !version    Server version
    !recent     Recently executed queries and how long they took.
    !queue      Queries that are pending execution.
    !stats      General stats on what we're up to.
    !watch      Observe errors/messages from children.
    !jobs       Outstanding job counts, desired level, and pids.
    !shutdown   Immediately kill all of mogilefsd.

    !replication
                (Deprecated/old)
                See the replication status for unreplicated files.
                Output format:
                <domain> <class> <devcount> <files>

    !to <job class> <message>
                Send <message> to all workers of <job class>.
                Mostly used for debugging.

    !want <count> <job class>
                Alter the level of workers of this class desired.
                Example: !want 20 queryworker, !want 3 replicate.
                See !jobs for what jobs are available.

HELP

}

# a child has contacted us with some command/status/something.
# Arguments: (class, MogileFS::Connection::Worker $child, $cmd line).
sub HandleChildRequest {
    if ($IsChild) {
        Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children");
    }

    # every registered worker connection has its job set by make_new_child;
    # a connection without one is a programming error.
    my MogileFS::Connection::Worker $child = $_[1];
    my $cmd = $_[2];

    die "Child $child with no job?" unless $child->job;

    # at this point we've got a command of some sort
    if ($cmd =~ /^error (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^debug (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^:state_change (\w+) (\d+) (\w+)/) {
        my ($what, $whatid, $state) = ($1, $2, $3);
        state_change($what, $whatid, $state, $child);

    } elsif ($cmd =~ /^:repl_unreachable (\d+)/) {
        # announce to the other replicators that this fid can't be reached, but note
        # that we don't actually drain the queue to the requestor, as the replicator
        # isn't in a place where it can accept a queue drain right now.
        MogileFS::ProcManager->ImmediateSendToChildrenByJob('replicate', "repl_unreachable $1", $child);

    } elsif ($cmd =~ /^repl_i_did (\d+)/) {
        my $fidid = $1;

        # announce to the other replicators that this fid was done and then drain the
        # queue to this person.
        MogileFS::ProcManager->ImmediateSendToChildrenByJob('replicate', "repl_was_done $fidid", $child);

    } elsif ($cmd =~ /^repl_starting (\d+)/) {
        my $fidid = $1;

        # announce to the other replicators that this fid is starting to be replicated
        MogileFS::ProcManager->ImmediateSendToChildrenByJob('replicate', "repl_starting $fidid", $child);

    } elsif ($cmd eq ":ping") {

        # warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time());

        # this command expects a reply, either to die or stay alive.  beginning of worker's loops
        if (job_needs_reduction($child->job)) {
            MogileFS::ProcManager->AskWorkerToDie($child);
        } else {
            $child->write(":stay_alive\r\n");
        }

    } elsif ($cmd eq ":still_alive") {
        # a no-op

    } elsif ($cmd eq ":monitor_just_ran") {
        send_monitor_has_run($child);

    } elsif ($cmd =~ /^:wake_a (\w+)$/) {

        MogileFS::ProcManager->wake_a($1, $child);

    } elsif ($cmd =~ /^:invalidate_meta (\w+)/) {

        my $what = $1;
        MogileFS::ProcManager->send_to_all_children(":invalidate_meta_once $what", $child);

    } elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) {
        # and this will rebroadcast it to all other children
        # (including the one that just set it to us, but eh)
        MogileFS::Config->set_config($1, $2);
    } elsif (my ($devid, $util) = $cmd =~ /^:set_dev_utilization (\d+) (.+)/) {
        $dev_util{$devid} = $util;

        # time to rebroadcast dev utilization messages to all children?
        # ($nowish is undef until the first post-event-loop tick)
        if (defined $nowish && $nowish > $last_util_spray + 3) {
            $last_util_spray = $nowish;
            MogileFS::ProcManager->send_to_all_children(":set_dev_utilization " . join(" ", %dev_util));
        }
    } else {
        # unknown command
        my $show = $cmd;
        $show = substr($show, 0, 80) . "..." if length $cmd > 80;
        Mgd::error("Unknown command [$show] from child; job=" . $child->job);
    }
}

# Class method.
#   ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child ])
# given a job class, and a message, send it to all children of that job.  returns
# the number of children the job class has (0 if it has none).
#
# if child is specified, the message will be sent to members of the job class that
# aren't that child.  so you can exclude the one that originated the message.
#
# doesn't add to queue of things child gets on next interactive command: writes immediately
# (won't get in middle of partial write, though, as danga::socket queues things up)
#
# if $just_one is specified, only a single process is notified, then we stop
# and return 1.
sub ImmediateSendToChildrenByJob {
    my ($pkg, $class, $msg, $exclude_child, $just_one) = @_;

    my $childref = $ChildrenByJob{$class};
    return 0 unless defined $childref && %$childref;

    foreach my $child (values %$childref) {
        # ignore the child specified as the third arg if one is sent
        next if $exclude_child && $exclude_child == $child;

        # send the message to this child
        $child->write("$msg\r\n");
        return 1 if $just_one;
    }
    return scalar(keys %$childref);
}

# called when we notice that a worker has bit it.  we might have to restart a
# job that they had been working on.
sub NoteDeadWorkerConn {
    return if $IsChild;

    # get parms and error check
    my MogileFS::Connection::Worker $worker = $_[1];
    return unless $worker;

    my $fd = $worker->{fd};
    return unless defined($fd);

    # if there's a mapping for this worker's fd, they had a job that didn't get done
    if (my $clref = delete $Mappings{$fd}) {
        # unshift, since this one already went through the queue once
        unshift @PendingQueries, $clref;

        # now try to get it processing again
        MogileFS::ProcManager->ProcessQueues;
    }
}

# given (job, pid), record that this worker is about to die and take it
# out of the job's running count. Dies if $job was never configured.
sub note_pending_death {
    my ($job, $pid) = @_;

    die "$job not defined in call to note_pending_death.\n"
        unless defined $jobs{$job};

    $todie{$pid} = 1;
    $jobs{$job}->[1]--;
}

# see if we should reduce the number of active children.
# False for a job we have no bookkeeping for (and does not create any).
sub job_needs_reduction {
    my $job = shift;
    my $jobstat = $jobs{$job} or return 0;
    return $jobstat->[0] < $jobstat->[1];
}

# False in the parent; the MogileFS::Worker object in a child.
sub is_child {
    return $IsChild;
}

# Tell every child (except $exclude) that "$what $whatid" is now $state,
# skipping children that were already told that exact state.
sub state_change {
    my ($what, $whatid, $state, $exclude) = @_;
    my $key = "$what-$whatid";
    foreach my $child (values %child) {
        next if $exclude && $child == $exclude;
        my $old = $child->{known_state}{$key} || "";
        if ($old ne $state) {
            $child->{known_state}{$key} = $state;
            $child->write(":state_change $what $whatid $state\r\n");
        }
    }
}

# Wake one worker of job $class. In a child this is delegated to the
# worker object; in the parent one child of that job (other than
# $fromchild, if given) is sent ":wake_up".
sub wake_a {
    my ($pkg, $class, $fromchild) = @_; # from arg is optional (which child sent it)
    my $child = MogileFS::ProcManager->is_child;
    if ($child) {
        $child->wake_a($class);
    } else {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one");
    }
}

# Write "$msg\r\n" to every child except $exclude (if given).
sub send_to_all_children {
    my ($pkg, $msg, $exclude) = @_;
    foreach my $child (values %child) {
        next if $exclude && $child == $exclude;
        $child->write("$msg\r\n");
    }
}

# Broadcast ":monitor_has_run" to the replicate, fsck, queryworker and
# delete workers, excluding the monitor child that reported it.
sub send_monitor_has_run {
    my $child = shift;
    for my $type (qw(replicate fsck queryworker delete)) {
        MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child);
    }
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: