1
package MogileFS::ProcManager;
4
use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK);
7
use MogileFS::Connection::Client;
8
use MogileFS::Connection::Worker;
10
# This class handles keeping lists of workers and clients and
11
# assigning them to each other when things happen. You don't actually
12
# instantiate a ProcManager; the class itself holds all state.
14
# Mappings: fd => [ clientref, jobstring, starttime ]
15
# queues are just lists of Client class objects
16
# ChildrenByJob: job => { pid => $client }
17
# ErrorsTo: fid => Client
18
# RecentQueries: [ string, string, string, ... ]
19
# Stats: element => number
20
our ($IsChild, @RecentQueries,
21
%Mappings, %ChildrenByJob, %ErrorsTo, %Stats);
23
our $starttime = time(); # time we got going
24
# Accessor for the epoch time stored in $starttime, which is captured with
# time() when this file's top-level code runs ("time we got going").
sub server_starttime {
    my $started_at = $starttime;
    return $started_at;
}
26
my @IdleQueryWorkers; # workers that are idle, able to process commands (MogileFS::Worker::Query, ...)
27
my @PendingQueries; # [ MogileFS::Connection::Client, "$ip $query" ]
29
my %idle_workers = (); # 'job' -> {href of idle workers}
30
my %pending_work = (); # 'job' -> [aref of pending work]
32
$IsChild = 0; # either false if we're the parent, or a MogileFS::Worker object
34
# keep track of what all child pids are doing, and what jobs are being
36
my %child = (); # pid -> MogileFS::Connection::Worker
37
my %todie = (); # pid -> 1 (lists pids that we've asked to die)
38
my %jobs = (); # jobname -> [ min, current ]
40
our $allkidsup = 0; # if true, all our kids are running. set to 0 when a kid dies.
42
my @prefork_cleanup; # subrefs to run to clean stuff up before we make a new child
44
*error = \&Mgd::error;
46
my %dev_util; # devid -> utilization
47
# Used to rate-limit the ":set_dev_utilization" rebroadcast to all children.
my $last_util_spray = 0; # time we last spread %dev_util to children
49
my $nowish; # updated approximately once per second
51
sub push_pre_fork_cleanup {
52
my ($class, $code) = @_;
53
push @prefork_cleanup, $code;
57
return @RecentQueries;
62
my $pidfile = MogileFS->config("pidfile")
65
unless (open($fh, ">$pidfile")) {
66
Mgd::log('err', "couldn't create pidfile '$pidfile': $!");
69
unless ((print $fh "$$\n") && close($fh)) {
70
Mgd::log('err', "couldn't write into pidfile '$pidfile': $!");
79
my $pidfile = MogileFS->config("pidfile")
86
my ($class, $job, $min) = @_;
87
$jobs{$job} ||= [undef, 0]; # [min, current]
88
$jobs{$job}->[0] = $min;
90
# TODO: set $allkidsup false, so the spawner re-checks?
93
sub job_to_class_suffix {
94
my ($class, $job) = @_;
97
queryworker => "Query",
99
replicate => "Replicate",
101
monitor => "Monitor",
102
job_master => "JobMaster",
107
my ($class, $job) = @_;
108
my $suffix = $class->job_to_class_suffix($job) or return "";
109
return "MogileFS::Worker::$suffix";
117
foreach my $pid (keys %child) {
118
my MogileFS::Connection::Worker $child = $child{$pid};
119
my $healthy = $child->watchdog_check;
122
# special $todie level of 2 means the watchdog tried to kill it.
123
# TODO: Should be a CONSTANT?
124
next if $todie{$pid} && $todie{$pid} == 2;
125
note_pending_death($child->job, $pid, 2);
127
error("Watchdog killing worker $pid (" . $child->job . ")");
132
# returns a sub that Danga::Socket calls after each event loop round.
133
# the sub must return 1 for the program to continue running.
134
sub PostEventLoopChecker {
135
my $lastspawntime = 0; # time we last ran spawn_children sub
138
# run only once per second
140
return 1 unless $nowish > $lastspawntime;
141
$lastspawntime = $nowish;
143
MogileFS::ProcManager->WatchDog;
145
# see if anybody has died, but don't hang up on doing so
146
while(my $pid = waitpid -1, WNOHANG) {
147
last unless $pid > 0;
148
$allkidsup = 0; # know something died
150
# when a child dies, figure out what it was doing
151
# and note that job has one less worker
153
if (($jobconn = delete $child{$pid})) {
154
my $job = $jobconn->job;
155
my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
156
error("Child $pid ($job) died: $? ($extra)");
157
MogileFS::ProcManager->NoteDeadChild($pid);
160
if (my $jobstat = $jobs{$job}) {
161
# if the pid is in %todie, then we have asked it to shut down
162
# and have already decremented the jobstat counter and don't
163
# want to do it again
164
unless (my $true = delete $todie{$pid}) {
165
# decrement the count of currently running jobs
172
return 1 if $allkidsup;
174
# foreach job, fork enough children
175
while (my ($job, $jobstat) = each %jobs) {
176
my $need = $jobstat->[0] - $jobstat->[1];
178
error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
180
my $jobconn = make_new_child($job)
181
or return 1; # basically bail: true value keeps event loop running
182
$child{$jobconn->pid} = $jobconn;
184
# now increase the count of processes currently doing this job
190
# if we got this far, all jobs have been re-created. note that
191
# so we avoid more CPU usage in this post-event-loop callback later
194
# true value keeps us running:
205
# Ensure our dbh is closed before we fork anything.
206
# Causes problems on some platforms (Solaris+Postgres)
209
# block signal for fork
210
$sigset = POSIX::SigSet->new(SIGINT);
211
sigprocmask(SIG_BLOCK, $sigset)
212
or return error("Can't block SIGINT for fork: $!");
214
socketpair(my $parents_ipc, my $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
215
or die( "Sockpair failed" );
217
return error("fork failed creating $job: $!")
218
unless defined ($pid = fork);
220
# enable auto-flush, so it's not pipe-buffered between parent/child
221
select((select( $parents_ipc ), $|++)[0]);
222
select((select( $childs_ipc ), $|++)[0]);
226
sigprocmask(SIG_UNBLOCK, $sigset)
227
or return error("Can't unblock SIGINT for fork: $!");
229
close($childs_ipc); # unnecessary but explicit
230
IO::Handle::blocking($parents_ipc, 0);
232
my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc);
233
$worker_conn->pid($pid);
234
$worker_conn->job($job);
235
MogileFS::ProcManager->RegisterWorkerConn($worker_conn);
239
# as a child, we want to close these and ignore them
240
$_->() foreach @prefork_cleanup;
244
$SIG{INT} = 'DEFAULT';
245
$SIG{TERM} = 'DEFAULT';
249
sigprocmask(SIG_UNBLOCK, $sigset)
250
or return error("Can't unblock SIGINT for fork: $!");
252
# now call our job function
253
my $class = MogileFS::ProcManager->job_to_class($job)
254
or die "No worker class defined for job '$job'\n";
255
my $worker = $class->new($childs_ipc);
257
# set our frontend into child mode
258
MogileFS::ProcManager->SetAsChild($worker);
264
sub PendingQueryCount {
265
return scalar @PendingQueries;
268
sub BoredQueryWorkerCount {
269
return scalar @IdleQueryWorkers;
272
sub QueriesInProgressCount {
273
return scalar keys %Mappings;
276
# Toss in any queue depths.
278
for my $job (keys %pending_work) {
279
$Stats{'work_queue_for_' . $job} = @{$pending_work{$job}};
285
my ($class, $cb) = @_;
286
foreach my $job (sort keys %ChildrenByJob) {
287
my $ct = scalar(keys %{$ChildrenByJob{$job}});
288
$cb->($job, $ct, $jobs{$job}->[0], [ join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}}) ]);
292
sub foreach_pending_query {
293
my ($class, $cb) = @_;
294
foreach my $clq (@PendingQueries) {
295
$cb->($clq->[0], # client object,
296
$clq->[1], # "$ip $query"
302
my ($class, $job) = @_;
303
return defined $jobs{$job};
307
return sort keys %jobs;
310
sub request_job_process {
311
my ($class, $job, $n) = @_;
312
return 0 unless $class->is_valid_job($job);
313
return 0 if $job eq 'job_master' && $n > 1; # ghetto special case
315
$jobs{$job}->[0] = $n;
318
# try to clean out the queryworkers (if that's what we're doing?)
319
MogileFS::ProcManager->CullQueryWorkers
320
if $job eq 'queryworker';
322
# other workers listening off of a queue should be pinging parent
323
# frequently. shouldn't explicitly kill them.
327
# when a child is spawned, they'll have copies of all the data from the
328
# parent, but they don't need it. this method is called when you want
329
# to indicate that this procmanager is running on a child and should clean.
331
my ($class, $worker) = @_;
333
@IdleQueryWorkers = ();
334
@PendingQueries = ();
341
# and now kill off our event loop so that we don't waste time
342
Danga::Socket->SetPostLoopCallback(sub { return 0; });
345
# called when a child has died. a child is someone doing a job for us,
346
# but it might be a queryworker or any other type of job. we just want
347
# to remove them from our list of children. they're actually respawned
348
# by the make_new_child function elsewhere in Mgd.
351
foreach my $job (keys %ChildrenByJob) {
352
return if # bail out if we actually delete one
353
delete $ChildrenByJob{$job}->{$pid};
357
# called when a client dies. clients are users, management or non.
358
# we just want to remove them from the error reporting interface, if
359
# they happen to be part of it.
362
delete $ErrorsTo{$client->{fd}};
365
# called when the error function in Mgd is called and we're in the parent,
366
# so we simply forward the message to every client registered as an error watcher
369
return unless %ErrorsTo;
371
my $msg = ":: ${$_[1]}\r\n";
372
foreach my $client (values %ErrorsTo) {
373
$client->write(\$msg);
377
sub RemoveErrorWatcher {
378
my ($class, $client) = @_;
379
return delete $ErrorsTo{$client->{fd}};
382
sub AddErrorWatcher {
383
my ($class, $client) = @_;
384
$ErrorsTo{$client->{fd}} = $client;
387
# one-time initialization of a new worker connection
388
sub RegisterWorkerConn {
389
my MogileFS::Connection::Worker $worker = $_[1];
390
$worker->watch_read(1);
392
#warn sprintf("Registering start-up of $worker (%s) [%d]\n", $worker->job, $worker->pid);
394
# now do any special case startup
395
if ($worker->job eq 'queryworker') {
396
MogileFS::ProcManager->NoteIdleQueryWorker($worker);
400
$ChildrenByJob{$worker->job}->{$worker->pid} = $worker;
404
sub EnqueueCommandRequest {
405
my ($class, $line, $client) = @_;
406
push @PendingQueries, [
408
($client->peer_ip_string || '0.0.0.0') . " $line"
410
MogileFS::ProcManager->ProcessQueues;
413
# puts a worker back in the queue, deleting any outstanding jobs in
414
# the mapping list for this fd.
415
sub NoteIdleQueryWorker {
416
# first arg is class, second is worker
417
my MogileFS::Connection::Worker $worker = $_[1];
418
delete $Mappings{$worker->{fd}};
420
# see if we need to kill off some workers
421
if (job_needs_reduction('queryworker')) {
422
Mgd::error("Reducing queryworker headcount by 1.");
423
MogileFS::ProcManager->AskWorkerToDie($worker);
427
# must be okay, so put it in the queue
428
push @IdleQueryWorkers, $worker;
429
MogileFS::ProcManager->ProcessQueues;
432
# if we need to kill off a worker, this function takes in the WorkerConn
433
# object, tells it to die, marks us as having requested its death, and decrements
434
# the count of running jobs.
436
my MogileFS::Connection::Worker $worker = $_[1];
437
note_pending_death($worker->job, $worker->pid);
438
$worker->write(":shutdown\r\n");
441
# kill bored query workers so we can get down to the level requested. this
442
# continues killing until we run out of folks to kill.
443
sub CullQueryWorkers {
444
while (@IdleQueryWorkers && job_needs_reduction('queryworker')) {
445
my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
446
MogileFS::ProcManager->AskWorkerToDie($worker);
450
# called when we get a response from a worker. this reenqueues the
451
# worker so it can handle another response as well as passes the answer
452
# back on to the client.
453
sub HandleQueryWorkerResponse {
454
# got a response from a worker
455
my MogileFS::Connection::Worker $worker;
457
(undef, $worker, $line) = @_;
459
return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line") if $IsChild;
460
return unless $worker && $Mappings{$worker->{fd}};
462
# get the client we're working with (if any)
463
my ($client, $jobstr, $starttime) = @{ $Mappings{$worker->{fd}} };
465
# if we have no client, then we just got a standard message from
466
# the queryworker and need to pass it up the line
467
return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client;
469
# at this point it was a command response, but if the client has gone
470
# away, just reenqueue this query worker
471
return MogileFS::ProcManager->NoteIdleQueryWorker($worker) if $client->{closed};
473
# <numeric id> [client-side time to complete] <response>
474
my ($time, $id, $res);
475
if ($line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/) {
476
# save time and response for use later
477
# Note the optional negative sign in the regexp. Somebody
478
# on the mailing list was getting a time of -0.0000, causing
479
# broken connections.
480
($id, $time, $res) = ($1, $2, $3);
483
# now, if it doesn't match
484
unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") {
485
$id = "<undef>" unless defined $id;
486
$line = "<undef>" unless defined $line;
489
Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing");
490
$client->close('worker_mismatch');
491
return MogileFS::ProcManager->AskWorkerToDie($worker);
494
# now time this interval and add to @RecentQueries
495
my $tinterval = Time::HiRes::time() - $starttime;
496
push @RecentQueries, sprintf("%s %.4f %s", $jobstr, $tinterval, $time);
497
shift @RecentQueries if scalar(@RecentQueries) > 50;
499
# send text to client, put worker back in queue
500
$client->write("$res\r\n");
501
MogileFS::ProcManager->NoteIdleQueryWorker($worker);
504
# new per-worker magic internal queue runner.
505
# TODO: Since this fires only when a master asks or a worker reports
506
# in bored, it should just operate on that *one* queue?
508
# new change: if worker in $job, but not in _bored, do not send work.
509
# if work is received, only delete from _bored
510
sub process_worker_queues {
513
JOB: while (my ($job, $queue) = each %pending_work) {
514
next JOB unless @$queue;
515
next JOB unless $idle_workers{$job} && keys %{$idle_workers{$job}};
516
WORKER: for my $worker_key (keys %{$idle_workers{$job}}) {
517
my MogileFS::Connection::Worker $worker =
518
delete $idle_workers{_bored}->{$worker_key};
519
if (!defined $worker || $worker->{closed}) {
520
delete $idle_workers{$job}->{$worker_key};
524
# allow workers to grab a linear range of work.
525
while (@$queue && $worker->wants_todo($job)) {
526
$worker->write(":queue_todo $job " . shift(@$queue) . "\r\n");
528
next JOB unless @$queue;
533
# called from various spots to empty the queues of available pairs.
537
# try to match up a client with a worker
538
while (@IdleQueryWorkers && @PendingQueries) {
539
# get client that isn't closed
541
while (!$clref && @PendingQueries) {
542
$clref = shift @PendingQueries
544
if ($clref->[0]->{closed}) {
551
# get worker and make sure it's not closed already
552
my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
553
if (!defined $worker || $worker->{closed}) {
554
unshift @PendingQueries, $clref;
558
# put in mapping and send data to worker
559
push @$clref, Time::HiRes::time();
560
$Mappings{$worker->{fd}} = $clref;
563
# increment our counter so we know what request counter this is going out
565
# so we're writing a string of the form:
566
# 123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n
567
$worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
571
# send short descriptions of commands we support to the user
575
# send general purpose help
576
$client->write(<<HELP);
577
Mogilefsd admin commands:
579
!version Server version
580
!recent Recently executed queries and how long they took.
581
!queue Queries that are pending execution.
582
!stats General stats on what we\'re up to.
583
!watch Observe errors/messages from children.
584
!jobs Outstanding job counts, desired level, and pids.
585
!shutdown Immediately kill all of mogilefsd.
587
!to <job class> <message>
588
Send <message> to all workers of <job class>.
589
Mostly used for debugging.
591
!want <count> <job class>
592
Alter the level of workers of this class desired.
593
Example: !want 20 queryworker, !want 3 replicate.
594
See !jobs for what jobs are available.
600
# a child has contacted us with some command/status/something.
601
sub HandleChildRequest {
603
Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children");
606
# if they have no job set, then their first line is what job they are
607
# and not a command. they also specify their pid, just so we know what
608
# connection goes with what pid, in case it's ever useful information.
609
my MogileFS::Connection::Worker $child = $_[1];
612
die "Child $child with no pid?" unless $child->job;
614
# at this point we've got a command of some sort
615
if ($cmd =~ /^error (.+)$/i) {
616
# pass it on to our error handler, prefaced with the child's job
617
Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");
619
} elsif ($cmd =~ /^debug (.+)$/i) {
620
# pass it on to our error handler, prefaced with the child's job
621
Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1");
623
} elsif ($cmd =~ /^:state_change (\w+) (\d+) (\w+)/) {
624
my ($what, $whatid, $state) = ($1, $2, $3);
625
state_change($what, $whatid, $state, $child);
627
} elsif ($cmd =~ /^queue_depth (\w+)/) {
630
for my $qname (keys %pending_work) {
631
my $depth = @{$pending_work{$qname}};
632
$child->write(":queue_depth $qname $depth\r\n");
636
if ($pending_work{$job}) {
637
$depth = @{$pending_work{$job}};
639
$child->write(":queue_depth $job $depth\r\n");
641
MogileFS::ProcManager->process_worker_queues;
642
} elsif ($cmd =~ /^queue_todo (\w+) (.+)/) {
644
$pending_work{$job} ||= [];
645
push(@{$pending_work{$job}}, $2);
646
# Don't process queues immediately, to allow batch processing.
647
} elsif ($cmd =~ /^worker_bored (\d+) (.+)/) {
650
if (job_needs_reduction($child->job)) {
651
MogileFS::ProcManager->AskWorkerToDie($child);
653
unless (exists $idle_workers{$child->job}) {
654
$idle_workers{$child->job} = {};
656
$idle_workers{_bored} ||= {};
657
$idle_workers{_bored}->{$child} = $child;
658
for my $type (split(/\s+/, $types)) {
659
$idle_workers{$type} ||= {};
660
$idle_workers{$type}->{$child}++;
661
$child->wants_todo($type, $batch);
663
MogileFS::ProcManager->process_worker_queues;
665
} elsif ($cmd eq ":ping") {
667
# warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time());
669
# this command expects a reply, either to die or stay alive. beginning of worker's loops
670
if (job_needs_reduction($child->job)) {
671
MogileFS::ProcManager->AskWorkerToDie($child);
673
$child->write(":stay_alive\r\n");
676
} elsif ($cmd eq ":still_alive") {
679
} elsif ($cmd eq ":monitor_just_ran") {
680
send_monitor_has_run($child);
682
} elsif ($cmd =~ /^:wake_a (\w+)$/) {
684
MogileFS::ProcManager->wake_a($1, $child);
686
} elsif ($cmd =~ /^:invalidate_meta (\w+)/) {
689
MogileFS::ProcManager->send_to_all_children(":invalidate_meta_once $what", $child);
691
} elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) {
692
# and this will rebroadcast it to all other children
693
# (including the one that just set it to us, but eh)
694
MogileFS::Config->set_config($1, $2);
695
} elsif (my ($devid, $util) = $cmd =~ /^:set_dev_utilization (\d+) (.+)/) {
696
$dev_util{$devid} = $util;
698
# time to rebroadcast dev utilization messages to all children?
699
if ($nowish > $last_util_spray + 3) {
700
$last_util_spray = $nowish;
701
MogileFS::ProcManager->send_to_all_children(":set_dev_utilization " . join(" ", %dev_util));
706
$show = substr($show, 0, 80) . "..." if length $cmd > 80;
707
Mgd::error("Unknown command [$show] from child; job=" . $child->job);
712
# ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child ])
713
# given a job class, and a message, send it to all children of that job. returns
714
# the number of children the message was sent to.
716
# if child is specified, the message will be sent to members of the job class that
717
# aren't that child. so you can exclude the one that originated the message.
719
# doesn't add to queue of things child gets on next interactive command: writes immediately
720
# (won't get in middle of partial write, though, as danga::socket queues things up)
722
# if $just_one is specified, only a single process is notified, then we stop.
723
sub ImmediateSendToChildrenByJob {
724
my ($pkg, $class, $msg, $exclude_child, $just_one) = @_;
726
my $childref = $ChildrenByJob{$class};
727
return 0 unless defined $childref && %$childref;
729
foreach my $child (values %$childref) {
730
# ignore the child specified as the third arg if one is sent
731
next if $exclude_child && $exclude_child == $child;
733
# send the message to this child
734
$child->write("$msg\r\n");
735
return 1 if $just_one;
737
return scalar(keys %$childref);
740
# called when we notice that a worker has bit it. we might have to restart a
741
# job that they had been working on.
742
sub NoteDeadWorkerConn {
745
# get parms and error check
746
my MogileFS::Connection::Worker $worker = $_[1];
747
return unless $worker;
749
my $fd = $worker->{fd};
750
return unless defined($fd);
752
# if there's a mapping for this worker's fd, they had a job that didn't get done
753
if ($Mappings{$fd}) {
754
# unshift, since this one already went through the queue once
755
unshift @PendingQueries, $Mappings{$worker->{fd}};
756
delete $Mappings{$worker->{fd}};
758
# now try to get it processing again
759
MogileFS::ProcManager->ProcessQueues;
763
# given (job, pid), record that this worker is about to die
764
# $level is so we can tell if watchdog requested the death.
765
sub note_pending_death {
766
my ($job, $pid, $level) = @_;
768
die "$job not defined in call to note_pending_death.\n"
769
unless defined $jobs{$job};
772
# don't double decrement.
773
$jobs{$job}->[1]-- unless $todie{$pid};
774
$todie{$pid} = $level;
777
# see if we should reduce the number of active children
778
sub job_needs_reduction {
780
return $jobs{$job}->[0] < $jobs{$job}->[1];
788
my ($what, $whatid, $state, $exclude) = @_;
789
my $key = "$what-$whatid";
791
foreach my $child (values %child) {
792
my $old = $child->{known_state}{$key} || "";
793
if (!$old || $old->[1] ne $state || $old->[0] < $now - 300) {
794
$child->{known_state}{$key} = [$now, $state];
796
$child->write(":state_change $what $whatid $state\r\n")
797
unless $exclude && $child == $exclude;
803
my ($pkg, $class, $fromchild) = @_; # from arg is optional (which child sent it)
804
my $child = MogileFS::ProcManager->is_child;
806
$child->wake_a($class);
808
MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one");
812
sub send_to_all_children {
813
my ($pkg, $msg, $exclude) = @_;
814
foreach my $child (values %child) {
815
next if $exclude && $child == $exclude;
816
$child->write("$msg\r\n");
820
sub send_monitor_has_run {
822
for my $type (qw(replicate fsck queryworker delete)) {
823
MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child);
832
# indent-tabs-mode: nil