File Coverage

blib/lib/MogileFS/ProcManager.pm
Criterion Covered Total %
statement 28 362 7.7
branch 1 170 0.5
condition 0 72 0.0
subroutine 11 55 20.0
pod 0 46 0.0
total 40 705 5.6


line stmt bran cond sub pod time code
1             package MogileFS::ProcManager;
2 21     21   113 use strict;
  21         46  
  21         504  
3 21     21   81 use warnings;
  21         31  
  21         486  
4 21     21   92 use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK);
  21         29  
  21         100  
5 21     21   2846 use Symbol;
  21         43  
  21         967  
6 21     21   108 use Socket;
  21         35  
  21         7676  
7 21     21   7377 use MogileFS::Connection::Client;
  21         38  
  21         474  
8 21     21   6829 use MogileFS::Connection::Worker;
  21         46  
  21         495  
9 21     21   110 use MogileFS::Util qw(apply_state_events);
  21         33  
  21         85858  
10              
11             # This class handles keeping lists of workers and clients and
12             # assigning them to each other when things happen. You don't actually
13             # instantiate a procmanager. the class itself holds all state.
14              
15             # Mappings: fd => [ clientref, jobstring, starttime ]
16             # queues are just lists of Client class objects
17             # ChildrenByJob: job => { pid => $client }
18             # ErrorsTo: fid => Client
19             # RecentQueries: [ string, string, string, ... ]
20             # Stats: element => number
21             our ($IsChild, @RecentQueries,
22             %Mappings, %ChildrenByJob, %ErrorsTo, %Stats);
23              
24             our $starttime = time(); # time we got going
25 0     0 0 0 sub server_starttime { return $starttime }
26              
27             my @IdleQueryWorkers; # workers that are idle, able to process commands (MogileFS::Worker::Query, ...)
28             my @PendingQueries; # [ MogileFS::Connection::Client, "$ip $query" ]
29              
30             my %idle_workers = (); # 'job' -> {href of idle workers}
31             my %pending_work = (); # 'job' -> [aref of pending work]
32              
33             $IsChild = 0; # either false if we're the parent, or a MogileFS::Worker object
34              
35             # keep track of what all child pids are doing, and what jobs are being
36             # satisifed.
37             my %child = (); # pid -> MogileFS::Connection::Worker
38             my %todie = (); # pid -> 1 (lists pids that we've asked to die)
39             my %jobs = (); # jobname -> [ min, current ]
40              
41             # we start job_master after monitor has run, but this avoid undef warning
42             # in job_needs_reduction
43             $jobs{job_master} = [ 0, 0 ];
44              
45             our $allkidsup = 0; # if true, all our kids are running. set to 0 when a kid dies.
46              
47             my @prefork_cleanup; # subrefs to run to clean stuff up before we make a new child
48              
49             *error = \&Mgd::error;
50              
51             my $monitor_good = 0; # ticked after monitor executes once after startup
52              
53             my $nowish; # updated approximately once per second
54              
55             # it's pointless to spawn certain jobs without a job_master
56             my $want_job_master;
57             my %needs_job_master = (
58             delete => 1,
59             fsck => 1,
60             replicate => 1,
61             );
62              
63             sub push_pre_fork_cleanup {
64 0     0 0 0 my ($class, $code) = @_;
65 0         0 push @prefork_cleanup, $code;
66             }
67              
68             sub RecentQueries {
69 0     0 0 0 return @RecentQueries;
70             }
71              
72             sub write_pidfile {
73 0     0 0 0 my $class = shift;
74 0 0       0 my $pidfile = MogileFS->config("pidfile")
75             or return 1;
76 0         0 my $fh;
77 0 0       0 unless (open($fh, ">$pidfile")) {
78 0         0 Mgd::log('err', "couldn't create pidfile '$pidfile': $!");
79 0         0 return 0;
80             }
81 0 0 0     0 unless ((print $fh "$$\n") && close($fh)) {
82 0         0 Mgd::log('err', "couldn't write into pidfile '$pidfile': $!");
83 0         0 remove_pidfile();
84 0         0 return 0;
85             }
86 0         0 return 1;
87             }
88              
89             sub remove_pidfile {
90 0     0 0 0 my $class = shift;
91 0 0       0 my $pidfile = MogileFS->config("pidfile")
92             or return;
93 0         0 unlink $pidfile;
94 0         0 return 1;
95             }
96              
97             sub set_min_workers {
98 0     0 0 0 my ($class, $job, $min) = @_;
99 0   0     0 $jobs{$job} ||= [undef, 0]; # [min, current]
100 0         0 $jobs{$job}->[0] = $min;
101              
102             # TODO: set allkipsup false, so spawner re-checks?
103             }
104              
105             sub job_to_class_suffix {
106 0     0 0 0 my ($class, $job) = @_;
107             return {
108             fsck => "Fsck",
109             queryworker => "Query",
110             delete => "Delete",
111             replicate => "Replicate",
112             reaper => "Reaper",
113             monitor => "Monitor",
114             job_master => "JobMaster",
115 0         0 }->{$job};
116             }
117              
118             sub job_to_class {
119 0     0 0 0 my ($class, $job) = @_;
120 0 0       0 my $suffix = $class->job_to_class_suffix($job) or return "";
121 0         0 return "MogileFS::Worker::$suffix";
122             }
123              
124             sub child_pids {
125 0     0 0 0 return keys %child;
126             }
127              
128             sub WatchDog {
129 0     0 0 0 foreach my $pid (keys %child) {
130 0         0 my MogileFS::Connection::Worker $child = $child{$pid};
131 0         0 my $healthy = $child->watchdog_check;
132 0 0       0 next if $healthy;
133              
134             # special $todie level of 2 means the watchdog tried to kill it.
135             # TODO: Should be a CONSTANT?
136 0 0 0     0 next if $todie{$pid} && $todie{$pid} == 2;
137 0         0 note_pending_death($child->job, $pid, 2);
138              
139 0         0 error("Watchdog killing worker $pid (" . $child->job . ")");
140 0         0 kill 9, $pid;
141             }
142             }
143              
144             # returns a sub that Danga::Socket calls after each event loop round.
145             # the sub must return 1 for the program to continue running.
146             sub PostEventLoopChecker {
147 0     0 0 0 my $lastspawntime = 0; # time we last ran spawn_children sub
148              
149             return sub {
150 0     0   0 MogileFS::Connection::Client->ProcessPipelined;
151             # run only once per second
152 0         0 $nowish = time();
153 0 0       0 return 1 unless $nowish > $lastspawntime;
154 0         0 $lastspawntime = $nowish;
155              
156 0         0 MogileFS::ProcManager->WatchDog;
157 0         0 MogileFS::Connection::Client->WriterWatchDog;
158              
159             # see if anybody has died, but don't hang up on doing so
160 0         0 while(my $pid = waitpid -1, WNOHANG) {
161 0 0       0 last unless $pid > 0;
162 0         0 $allkidsup = 0; # know something died
163              
164             # when a child dies, figure out what it was doing
165             # and note that job has one less worker
166 0         0 my $jobconn;
167 0 0       0 if (($jobconn = delete $child{$pid})) {
168 0         0 my $job = $jobconn->job;
169 0 0       0 my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
170 0         0 error("Child $pid ($job) died: $? ($extra)");
171 0         0 MogileFS::ProcManager->NoteDeadChild($pid);
172 0         0 $jobconn->close;
173              
174 0 0       0 if (my $jobstat = $jobs{$job}) {
175             # if the pid is in %todie, then we have asked it to shut down
176             # and have already decremented the jobstat counter and don't
177             # want to do it again
178 0 0       0 unless (my $true = delete $todie{$pid}) {
179             # decrement the count of currently running jobs
180 0         0 $jobstat->[1]--;
181             }
182             }
183             }
184             }
185              
186 0 0       0 return 1 if $allkidsup;
187              
188             # foreach job, fork enough children
189 0         0 while (my ($job, $jobstat) = each %jobs) {
190              
191             # do not spawn job_master-dependent workers if we have no job_master
192 0 0 0     0 next if (! $want_job_master && $needs_job_master{$job});
193              
194 0         0 my $need = $jobstat->[0] - $jobstat->[1];
195 0 0       0 if ($need > 0) {
196 0         0 error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
197 0         0 for (1..$need) {
198 0 0       0 my $jobconn = make_new_child($job)
199             or return 1; # basically bail: true value keeps event loop running
200 0         0 $child{$jobconn->pid} = $jobconn;
201              
202             # now increase the count of processes currently doing this job
203 0         0 $jobstat->[1]++;
204             }
205             }
206             }
207              
208             # if we got this far, all jobs have been re-created. note that
209             # so we avoid more CPU usage in this post-event-loop callback later
210 0         0 $allkidsup = 1;
211              
212             # true value keeps us running:
213 0         0 return 1;
214 0         0 };
215             }
216              
217             sub make_new_child {
218 0     0 0 0 my $job = shift;
219              
220 0         0 my $pid;
221             my $sigset;
222              
223             # Ensure our dbh is closed before we fork anything.
224             # Causes problems on some platforms (Solaris+Postgres)
225 0         0 Mgd::close_store();
226              
227             # block signal for fork
228 0         0 $sigset = POSIX::SigSet->new(SIGINT);
229 0 0       0 sigprocmask(SIG_BLOCK, $sigset)
230             or return error("Can't block SIGINT for fork: $!");
231              
232 0 0       0 socketpair(my $parents_ipc, my $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
233             or die( "socketpair failed: $!" );
234              
235 0 0       0 return error("fork failed creating $job: $!")
236             unless defined ($pid = fork);
237              
238             # enable auto-flush, so it's not pipe-buffered between parent/child
239 0         0 select((select( $parents_ipc ), $|++)[0]);
240 0         0 select((select( $childs_ipc ), $|++)[0]);
241              
242             # if i'm the parent
243 0 0       0 if ($pid) {
244 0 0       0 sigprocmask(SIG_UNBLOCK, $sigset)
245             or return error("Can't unblock SIGINT for fork: $!");
246              
247 0         0 close($childs_ipc); # unnecessary but explicit
248 0         0 IO::Handle::blocking($parents_ipc, 0);
249              
250 0         0 my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc);
251 0         0 $worker_conn->pid($pid);
252 0         0 $worker_conn->job($job);
253 0         0 MogileFS::ProcManager->RegisterWorkerConn($worker_conn);
254 0         0 return $worker_conn;
255             }
256              
257             # let children have different random number seeds
258 0         0 srand();
259              
260             # as a child, we want to close these and ignore them
261 0         0 $_->() foreach @prefork_cleanup;
262 0         0 close($parents_ipc);
263 0         0 undef $parents_ipc;
264              
265 0         0 $SIG{INT} = 'DEFAULT';
266 0         0 $SIG{TERM} = 'DEFAULT';
267 0         0 $0 .= " [$job]";
268              
269             # unblock signals
270 0 0       0 sigprocmask(SIG_UNBLOCK, $sigset)
271             or return error("Can't unblock SIGINT for fork: $!");
272              
273             # now call our job function
274 0 0       0 my $class = MogileFS::ProcManager->job_to_class($job)
275             or die "No worker class defined for job '$job'\n";
276 0         0 my $worker = $class->new($childs_ipc);
277              
278             # set our frontend into child mode
279 0         0 MogileFS::ProcManager->SetAsChild($worker);
280              
281 0         0 $worker->work;
282 0         0 exit 0;
283             }
284              
285             sub PendingQueryCount {
286 0     0 0 0 return scalar @PendingQueries;
287             }
288              
289             sub BoredQueryWorkerCount {
290 0     0 0 0 return scalar @IdleQueryWorkers;
291             }
292              
293             sub QueriesInProgressCount {
294 0     0 0 0 return scalar keys %Mappings;
295             }
296              
297             # Toss in any queue depths.
298             sub StatsHash {
299 0     0 0 0 for my $job (keys %pending_work) {
300 0         0 $Stats{'work_queue_for_' . $job} = @{$pending_work{$job}};
  0         0  
301             }
302 0         0 return \%Stats;
303             }
304              
305             sub foreach_job {
306 0     0 0 0 my ($class, $cb) = @_;
307 0         0 foreach my $job (sort keys %ChildrenByJob) {
308 0         0 my $ct = scalar(keys %{$ChildrenByJob{$job}});
  0         0  
309 0         0 $cb->($job, $ct, $jobs{$job}->[0], [ join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}}) ]);
  0         0  
  0         0  
310             }
311             }
312              
313             sub foreach_pending_query {
314 0     0 0 0 my ($class, $cb) = @_;
315 0         0 foreach my $clq (@PendingQueries) {
316 0         0 $cb->($clq->[0], # client object,
317             $clq->[1], # "$ip $query"
318             );
319             }
320             }
321              
322             sub is_monitor_good {
323 0     0 0 0 return $monitor_good;
324             }
325              
326             sub is_valid_job {
327 0     0 0 0 my ($class, $job) = @_;
328 0         0 return defined $jobs{$job};
329             }
330              
331             sub valid_jobs {
332 0     0 0 0 return sort keys %jobs;
333             }
334              
335             sub request_job_process {
336 0     0 0 0 my ($class, $job, $n) = @_;
337 0 0       0 return 0 unless $class->is_valid_job($job);
338 0 0 0     0 return 0 if ($job =~ /^(?:job_master|monitor)$/i && $n > 1); # ghetto special case
339              
340 0 0       0 $want_job_master = $n if ($job eq "job_master");
341              
342 0         0 $jobs{$job}->[0] = $n;
343 0         0 $allkidsup = 0;
344              
345             # try to clean out the queryworkers (if that's what we're doing?)
346 0 0       0 MogileFS::ProcManager->CullQueryWorkers
347             if $job eq 'queryworker';
348              
349             # other workers listening off of a queue should be pinging parent
350             # frequently. shouldn't explicitly kill them.
351             }
352              
353              
354             # when a child is spawned, they'll have copies of all the data from the
355             # parent, but they don't need it. this method is called when you want
356             # to indicate that this procmanager is running on a child and should clean.
357             sub SetAsChild {
358 0     0 0 0 my ($class, $worker) = @_;
359              
360 0         0 @IdleQueryWorkers = ();
361 0         0 @PendingQueries = ();
362 0         0 %Mappings = ();
363 0         0 $IsChild = $worker;
364 0         0 %ErrorsTo = ();
365 0         0 %idle_workers = ();
366 0         0 %pending_work = ();
367 0         0 %ChildrenByJob = ();
368 0         0 %child = ();
369 0         0 %todie = ();
370 0         0 %jobs = ();
371              
372             # we just forked from our parent process, also using Danga::Socket,
373             # so we need to lose all that state and start afresh.
374 0         0 Danga::Socket->Reset;
375 0         0 MogileFS::Connection::Client->Reset;
376             }
377              
378             # called when a child has died. a child is someone doing a job for us,
379             # but it might be a queryworker or any other type of job. we just want
380             # to remove them from our list of children. they're actually respawned
381             # by the make_new_child function elsewhere in Mgd.
382             sub NoteDeadChild {
383 0     0 0 0 my $pid = $_[1];
384 0         0 foreach my $job (keys %ChildrenByJob) {
385             return if # bail out if we actually delete one
386 0 0       0 delete $ChildrenByJob{$job}->{$pid};
387             }
388             }
389              
390             # called when a client dies. clients are users, management or non.
391             # we just want to remove them from the error reporting interface, if
392             # they happen to be part of it.
393             sub NoteDeadClient {
394 0     0 0 0 my $client = $_[1];
395 0         0 delete $ErrorsTo{$client->{fd}};
396             }
397              
398             # called when the error function in Mgd is called and we're in the parent,
399             # so it's pretty simple that basically we just spit it out to folks listening
400             # to errors
401             sub NoteError {
402 4 50   4 0 19 return unless %ErrorsTo;
403              
404 0         0 my $msg = ":: ${$_[1]}\r\n";
  0         0  
405 0         0 foreach my $client (values %ErrorsTo) {
406 0         0 $client->write(\$msg);
407             }
408             }
409              
410             sub RemoveErrorWatcher {
411 0     0 0 0 my ($class, $client) = @_;
412 0         0 return delete $ErrorsTo{$client->{fd}};
413             }
414              
415             sub AddErrorWatcher {
416 0     0 0 0 my ($class, $client) = @_;
417 0         0 $ErrorsTo{$client->{fd}} = $client;
418             }
419              
420             # one-time initialization of a new worker connection
421             sub RegisterWorkerConn {
422 0     0 0 0 my MogileFS::Connection::Worker $worker = $_[1];
423 0         0 $worker->watch_read(1);
424              
425             #warn sprintf("Registering start-up of $worker (%s) [%d]\n", $worker->job, $worker->pid);
426              
427             # now do any special case startup
428 0 0       0 if ($worker->job eq 'queryworker') {
429 0         0 MogileFS::ProcManager->NoteIdleQueryWorker($worker);
430             }
431              
432             # add to normal list
433 0         0 $ChildrenByJob{$worker->job}->{$worker->pid} = $worker;
434              
435             }
436              
437             sub EnqueueCommandRequest {
438 0     0 0 0 my ($class, $line, $client) = @_;
439 0   0     0 push @PendingQueries, [
440             $client,
441             ($client->peer_ip_string || '0.0.0.0') . " $line"
442             ];
443 0         0 MogileFS::ProcManager->ProcessQueues;
444 0 0       0 if (@PendingQueries) {
445             # Don't like the name. Feel free to change if you find better.
446 0         0 $Stats{times_out_of_qworkers}++;
447             }
448             }
449              
450             # puts a worker back in the queue, deleting any outstanding jobs in
451             # the mapping list for this fd.
452             sub NoteIdleQueryWorker {
453             # first arg is class, second is worker
454 0     0 0 0 my MogileFS::Connection::Worker $worker = $_[1];
455 0         0 delete $Mappings{$worker->{fd}};
456              
457             # see if we need to kill off some workers
458 0 0       0 if (job_needs_reduction('queryworker')) {
459 0         0 Mgd::error("Reducing queryworker headcount by 1.");
460 0         0 MogileFS::ProcManager->AskWorkerToDie($worker);
461 0         0 return;
462             }
463              
464             # must be okay, so put it in the queue
465 0         0 push @IdleQueryWorkers, $worker;
466 0         0 MogileFS::ProcManager->ProcessQueues;
467             }
468              
469             # if we need to kill off a worker, this function takes in the WorkerConn
470             # object, tells it to die, marks us as having requested its death, and decrements
471             # the count of running jobs.
472             sub AskWorkerToDie {
473 0     0 0 0 my MogileFS::Connection::Worker $worker = $_[1];
474 0         0 note_pending_death($worker->job, $worker->pid);
475 0         0 $worker->write(":shutdown\r\n");
476             }
477              
478             # kill bored query workers so we can get down to the level requested. this
479             # continues killing until we run out of folks to kill.
480             sub CullQueryWorkers {
481 0   0 0 0 0 while (@IdleQueryWorkers && job_needs_reduction('queryworker')) {
482 0         0 my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
483 0         0 MogileFS::ProcManager->AskWorkerToDie($worker);
484             }
485             }
486              
487             # called when we get a response from a worker. this reenqueues the
488             # worker so it can handle another response as well as passes the answer
489             # back on to the client.
490             sub HandleQueryWorkerResponse {
491             # got a response from a worker
492 0     0 0 0 my MogileFS::Connection::Worker $worker;
493             my $line;
494 0         0 (undef, $worker, $line) = @_;
495              
496 0 0       0 return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line") if $IsChild;
497 0 0 0     0 return unless $worker && $Mappings{$worker->{fd}};
498              
499             # get the client we're working with (if any)
500 0         0 my ($client, $jobstr, $starttime) = @{ $Mappings{$worker->{fd}} };
  0         0  
501              
502             # if we have no client, then we just got a standard message from
503             # the queryworker and need to pass it up the line
504 0 0       0 return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client;
505              
506             # at this point it was a command response, but if the client has gone
507             # away, just reenqueue this query worker
508 0 0       0 return MogileFS::ProcManager->NoteIdleQueryWorker($worker) if $client->{closed};
509              
510             # [client-side time to complete]
511 0         0 my ($time, $id, $res);
512 0 0       0 if ($line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/) {
513             # save time and response for use later
514             # Note the optional negative sign in the regexp. Somebody
515             # on the mailing list was getting a time of -0.0000, causing
516             # broken connections.
517 0         0 ($id, $time, $res) = ($1, $2, $3);
518             }
519              
520             # now, if it doesn't match
521 0 0 0     0 unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") {
522 0 0       0 $id = "" unless defined $id;
523 0 0       0 $line = "" unless defined $line;
524 0         0 $line =~ s/\n/\\n/g;
525 0         0 $line =~ s/\r/\\r/g;
526 0         0 Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing");
527 0         0 $client->close('worker_mismatch');
528 0         0 return MogileFS::ProcManager->AskWorkerToDie($worker);
529             }
530              
531             # now time this interval and add to @RecentQueries
532 0         0 my $tinterval = Time::HiRes::time() - $starttime;
533 0         0 push @RecentQueries, sprintf("%s %.4f %s", $jobstr, $tinterval, $time);
534 0 0       0 shift @RecentQueries if scalar(@RecentQueries) > 50;
535              
536             # send text to client, put worker back in queue
537 0         0 $client->write("$res\r\n");
538 0         0 MogileFS::ProcManager->NoteIdleQueryWorker($worker);
539             }
540              
541             # new per-worker magic internal queue runner.
542             # TODO: Since this fires only when a master asks or a worker reports
543             # in bored, it should just operate on that *one* queue?
544             #
545             # new change: if worker in $job, but not in _bored, do not send work.
546             # if work is received, only delete from _bored
547             sub process_worker_queues {
548 0 0   0 0 0 return if $IsChild;
549              
550 0         0 JOB: while (my ($job, $queue) = each %pending_work) {
551 0 0       0 next JOB unless @$queue;
552 0 0 0     0 next JOB unless $idle_workers{$job} && keys %{$idle_workers{$job}};
  0         0  
553 0         0 WORKER: for my $worker_key (keys %{$idle_workers{$job}}) {
  0         0  
554             my MogileFS::Connection::Worker $worker =
555 0         0 delete $idle_workers{_bored}->{$worker_key};
556 0 0 0     0 if (!defined $worker || $worker->{closed}) {
557 0         0 delete $idle_workers{$job}->{$worker_key};
558 0         0 next WORKER;
559             }
560              
561             # allow workers to grab a linear range of work.
562 0   0     0 while (@$queue && $worker->wants_todo($job)) {
563 0         0 $worker->write(":queue_todo $job " . shift(@$queue) . "\r\n");
564 0         0 $Stats{'work_sent_to_' . $job}++;
565             }
566 0 0       0 next JOB unless @$queue;
567             }
568             }
569             }
570              
571             # called from various spots to empty the queues of available pairs.
572             sub ProcessQueues {
573 0 0   0 0 0 return if $IsChild;
574              
575             # try to match up a client with a worker
576 0   0     0 while (@IdleQueryWorkers && @PendingQueries) {
577             # get client that isn't closed
578 0         0 my $clref;
579 0   0     0 while (!$clref && @PendingQueries) {
580 0 0       0 $clref = shift @PendingQueries
581             or next;
582 0 0       0 if ($clref->[0]->{closed}) {
583 0         0 $clref = undef;
584 0         0 next;
585             }
586             }
587 0 0       0 next unless $clref;
588              
589             # get worker and make sure it's not closed already
590 0         0 my MogileFS::Connection::Worker $worker = pop @IdleQueryWorkers;
591 0 0 0     0 if (!defined $worker || $worker->{closed}) {
592 0         0 unshift @PendingQueries, $clref;
593 0         0 next;
594             }
595              
596             # put in mapping and send data to worker
597 0         0 push @$clref, Time::HiRes::time();
598 0         0 $Mappings{$worker->{fd}} = $clref;
599 0         0 $Stats{queries}++;
600              
601             # increment our counter so we know what request counter this is going out
602 0         0 $worker->{reqid}++;
603             # so we're writing a string of the form:
604             # 123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n
605 0         0 $worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
606             }
607             }
608              
609             # send short descriptions of commands we support to the user
610             sub SendHelp {
611 0     0 0 0 my $client = $_[1];
612              
613             # send general purpose help
614 0         0 $client->write(<
615             Mogilefsd admin commands:
616              
617             !version Server version
618             !recent Recently executed queries and how long they took.
619             !queue Queries that are pending execution.
620             !stats General stats on what we\'re up to.
621             !watch Observe errors/messages from children.
622             !jobs Outstanding job counts, desired level, and pids.
623             !shutdown Immediately kill all of mogilefsd.
624              
625             !to
626             Send to all workers of .
627             Mostly used for debugging.
628              
629             !want
630             Alter the level of workers of this class desired.
631             Example: !want 20 queryworker, !want 3 replicate.
632             See !jobs for what jobs are available.
633              
634             HELP
635              
636             }
637              
638             # a child has contacted us with some command/status/something.
639             sub HandleChildRequest {
640 0 0   0 0 0 if ($IsChild) {
641 0         0 Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children");
642             }
643              
644             # if they have no job set, then their first line is what job they are
645             # and not a command. they also specify their pid, just so we know what
646             # connection goes with what pid, in case it's ever useful information.
647 0         0 my MogileFS::Connection::Worker $child = $_[1];
648 0         0 my $cmd = $_[2];
649              
650 0 0       0 die "Child $child with no pid?" unless $child->job;
651              
652             # at this point we've got a command of some sort
653 0 0       0 if ($cmd =~ /^error (.+)$/i) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
654             # pass it on to our error handler, prefaced with the child's job
655 0         0 Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");
656              
657             } elsif ($cmd =~ /^debug (.+)$/i) {
658             # pass it on to our error handler, prefaced with the child's job
659 0         0 Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1");
660              
661             } elsif ($cmd =~ /^queue_depth (\w+)/) {
662 0         0 my $job = $1;
663 0 0       0 if ($job eq 'all') {
664 0         0 for my $qname (keys %pending_work) {
665 0         0 my $depth = @{$pending_work{$qname}};
  0         0  
666 0         0 $child->write(":queue_depth $qname $depth\r\n");
667             }
668             } else {
669 0         0 my $depth = 0;
670 0 0       0 if ($pending_work{$job}) {
671 0         0 $depth = @{$pending_work{$job}};
  0         0  
672             }
673 0         0 $child->write(":queue_depth $job $depth\r\n");
674             }
675 0         0 MogileFS::ProcManager->process_worker_queues;
676             } elsif ($cmd =~ /^queue_todo (\w+) (.+)/) {
677 0         0 my $job = $1;
678 0   0     0 $pending_work{$job} ||= [];
679 0         0 push(@{$pending_work{$job}}, $2);
  0         0  
680             # Don't process queues immediately, to allow batch processing.
681             } elsif ($cmd =~ /^worker_bored (\d+) (.+)/) {
682 0         0 my $batch = $1;
683 0         0 my $types = $2;
684 0 0       0 if (job_needs_reduction($child->job)) {
685 0         0 MogileFS::ProcManager->AskWorkerToDie($child);
686             } else {
687 0 0       0 unless (exists $idle_workers{$child->job}) {
688 0         0 $idle_workers{$child->job} = {};
689             }
690 0   0     0 $idle_workers{_bored} ||= {};
691 0         0 $idle_workers{_bored}->{$child} = $child;
692 0         0 for my $type (split(/\s+/, $types)) {
693 0   0     0 $idle_workers{$type} ||= {};
694 0         0 $idle_workers{$type}->{$child}++;
695 0         0 $child->wants_todo($type, $batch);
696             }
697 0         0 MogileFS::ProcManager->process_worker_queues;
698             }
699             } elsif ($cmd eq ":ping") {
700              
701             # warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time());
702              
703             # this command expects a reply, either to die or stay alive. beginning of worker's loops
704 0 0       0 if (job_needs_reduction($child->job)) {
705 0         0 MogileFS::ProcManager->AskWorkerToDie($child);
706             } else {
707 0         0 $child->write(":stay_alive\r\n");
708             }
709              
710             } elsif ($cmd eq ":still_alive") {
711             # a no-op
712              
713             } elsif ($cmd =~ /^:monitor_events/) {
714             # Apply the state locally, so when we fork children they have a
715             # pre-parsed factory.
716             # We do not replay the events back to where it came, since this
717             # severely impacts startup performance for instances with several
718             # thousand domains, classes, hosts or devices.
719 0         0 apply_state_events(\$cmd);
720 0         0 MogileFS::ProcManager->send_to_all_children($cmd, $child);
721              
722             } elsif ($cmd eq ":monitor_just_ran") {
723 0         0 send_monitor_has_run($child);
724              
725             } elsif ($cmd =~ /^:wake_a (\w+)$/) {
726              
727 0         0 MogileFS::ProcManager->wake_a($1, $child);
728             } elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) {
729             # and this will rebroadcast it to all other children
730             # (including the one that just set it to us, but eh)
731 0         0 MogileFS::Config->set_config($1, $2);
732             } elsif ($cmd =~ /^:refresh_monitor$/) {
733 0         0 MogileFS::ProcManager->ImmediateSendToChildrenByJob("monitor", $cmd);
734             } else {
735             # unknown command
736 0         0 my $show = $cmd;
737 0 0       0 $show = substr($show, 0, 80) . "..." if length $cmd > 80;
738 0         0 Mgd::error("Unknown command [$show] from child; job=" . $child->job);
739             }
740             }
741              
742             # Class method.
743             # ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child ])
744             # given a job class, and a message, send it to all children of that job. returns
745             # the number of children the message was sent to.
746             #
747             # if child is specified, the message will be sent to members of the job class that
748             # aren't that child. so you can exclude the one that originated the message.
749             #
750             # doesn't add to queue of things child gets on next interactive command: writes immediately
751             # (won't get in middle of partial write, though, as danga::socket queues things up)
752             #
753             # if $just_one is specified, only a single process is notified, then we stop.
754             sub ImmediateSendToChildrenByJob {
755 0     0 0 0 my ($pkg, $class, $msg, $exclude_child, $just_one) = @_;
756              
757 0         0 my $childref = $ChildrenByJob{$class};
758 0 0 0     0 return 0 unless defined $childref && %$childref;
759              
760 0         0 foreach my $child (values %$childref) {
761             # ignore the child specified as the third arg if one is sent
762 0 0 0     0 next if $exclude_child && $exclude_child == $child;
763              
764             # send the message to this child
765 0         0 $child->write("$msg\r\n");
766 0 0       0 return 1 if $just_one;
767             }
768 0         0 return scalar(keys %$childref);
769             }
770              
771             # called when we notice that a worker has bit it. we might have to restart a
772             # job that they had been working on.
773             sub NoteDeadWorkerConn {
774 0 0   0 0 0 return if $IsChild;
775              
776             # get parms and error check
777 0         0 my MogileFS::Connection::Worker $worker = $_[1];
778 0 0       0 return unless $worker;
779              
780 0         0 my $fd = $worker->{fd};
781 0 0       0 return unless defined($fd);
782              
783             # if there's a mapping for this worker's fd, they had a job that didn't get done
784 0 0       0 if ($Mappings{$fd}) {
785             # unshift, since this one already went through the queue once
786 0         0 unshift @PendingQueries, $Mappings{$worker->{fd}};
787 0         0 delete $Mappings{$worker->{fd}};
788              
789             # now try to get it processing again
790 0         0 MogileFS::ProcManager->ProcessQueues;
791             }
792             }
793              
794             # given (job, pid), record that this worker is about to die
795             # $level is so we can tell if watchdog requested the death.
796             sub note_pending_death {
797 0     0 0 0 my ($job, $pid, $level) = @_;
798              
799             die "$job not defined in call to note_pending_death.\n"
800 0 0       0 unless defined $jobs{$job};
801              
802 0   0     0 $level ||= 1;
803             # don't double decrement.
804 0 0       0 $jobs{$job}->[1]-- unless $todie{$pid};
805 0         0 $todie{$pid} = $level;
806             }
807              
808             # see if we should reduce the number of active children
809             sub job_needs_reduction {
810 0     0 0 0 my $job = shift;
811 0         0 my $q;
812              
813             # drop job_master-dependent workers if there is no job_master and no
814             # previously queued work
815 0 0 0     0 if (!$want_job_master && $needs_job_master{$job}
      0        
      0        
      0        
816             && $jobs{job_master}->[1] == 0 # check if job_master is really dead
817             && (($q = $pending_work{$job}) && !@$q || !$q)) {
818 0         0 return 1;
819             }
820              
821 0         0 return $jobs{$job}->[0] < $jobs{$job}->[1];
822             }
823              
824             sub is_child {
825 52     52 0 260 return $IsChild;
826             }
827              
828             sub wake_a {
829 0     0 0 0 my ($pkg, $class, $fromchild) = @_; # from arg is optional (which child sent it)
830 0         0 my $child = MogileFS::ProcManager->is_child;
831 0 0       0 if ($child) {
832 0         0 $child->wake_a($class);
833             } else {
834 0         0 MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one");
835             }
836             }
837              
838             sub send_to_all_children {
839 48     48 0 109 my ($pkg, $msg, $exclude) = @_;
840 48         175 foreach my $child (values %child) {
841 0 0 0       next if $exclude && $child == $exclude;
842 0           $child->write($msg . "\r\n");
843             }
844             }
845              
846             sub send_monitor_has_run {
847 0     0 0   my $child = shift;
848             # Gas up other workers if monitor's completed for the first time.
849 0 0         if (! $monitor_good) {
850 0           MogileFS::ProcManager->set_min_workers('queryworker' => MogileFS->config('query_jobs'));
851 0           MogileFS::ProcManager->set_min_workers('delete' => MogileFS->config('delete_jobs'));
852 0           MogileFS::ProcManager->set_min_workers('replicate' => MogileFS->config('replicate_jobs'));
853 0           MogileFS::ProcManager->set_min_workers('reaper' => MogileFS->config('reaper_jobs'));
854 0           MogileFS::ProcManager->set_min_workers('fsck' => MogileFS->config('fsck_jobs'));
855              
856             # only one job_master at most
857 0           $want_job_master = !!MogileFS->config('job_master');
858 0           MogileFS::ProcManager->set_min_workers('job_master' => $want_job_master);
859              
860 0           $monitor_good = 1;
861 0           $allkidsup = 0;
862             }
863 0           for my $type (qw(queryworker)) {
864 0           MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child);
865             }
866             }
867              
868             1;
869              
870             # Local Variables:
871             # mode: perl
872             # c-basic-indent: 4
873             # indent-tabs-mode: nil
874             # End: