~vcs-imports/mogilefs/trunk

« back to all changes in this revision

Viewing changes to server/lib/MogileFS/ProcManager.pm

  • Committer: hachi
  • Date: 2011-05-27 23:40:03 UTC
  • Revision ID: hachi-20110527234003-hioplx58nt6zb2mx
This has been moved to http://github.com/mogilefs/MogileFS-Server/

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
package MogileFS::ProcManager;
2
 
use strict;
3
 
use warnings;
4
 
use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK);
5
 
use Symbol;
6
 
use Socket;
7
 
use MogileFS::Connection::Client;
8
 
use MogileFS::Connection::Worker;
9
 
 
10
 
# This class handles keeping lists of workers and clients and
11
 
# assigning them to each other when things happen.  You don't actually
12
 
# instantiate a procmanager.  the class itself holds all state.
13
 
 
14
 
# Mappings: fd => [ clientref, jobstring, starttime ]
15
 
# queues are just lists of Client class objects
16
 
# ChildrenByJob: job => { pid => $client }
17
 
# ErrorsTo: fid => Client
18
 
# RecentQueries: [ string, string, string, ... ]
19
 
# Stats: element => number
20
 
our ($IsChild, @RecentQueries,
21
 
     %Mappings, %ChildrenByJob, %ErrorsTo, %Stats);
22
 
 
23
 
our $starttime = time(); # time we got going
24
 
sub server_starttime { return $starttime }
25
 
 
26
 
my @IdleQueryWorkers;  # workers that are idle, able to process commands  (MogileFS::Worker::Query, ...)
27
 
my @PendingQueries;    # [ MogileFS::Connection::Client, "$ip $query" ]
28
 
 
29
 
my %idle_workers = (); # 'job' -> {href of idle workers}
30
 
my %pending_work = (); # 'job' -> [aref of pending work]
31
 
 
32
 
$IsChild = 0;  # either false if we're the parent, or a MogileFS::Worker object
33
 
 
34
 
# keep track of what all child pids are doing, and what jobs are being
35
 
# satisifed.
36
 
my %child  = ();    # pid -> MogileFS::Connection::Worker
37
 
my %todie  = ();    # pid -> 1 (lists pids that we've asked to die)
38
 
my %jobs   = ();    # jobname -> [ min, current ]
39
 
 
40
 
our $allkidsup = 0;  # if true, all our kids are running. set to 0 when a kid dies.
41
 
 
42
 
my @prefork_cleanup;  # subrefs to run to clean stuff up before we make a new child
43
 
 
44
 
*error = \&Mgd::error;
45
 
 
46
 
my %dev_util;         # devid -> utilization
47
 
my $last_util_spray = 0;  # time we lost spread %dev_util to children
48
 
 
49
 
my $nowish;  # updated approximately once per second
50
 
 
51
 
sub push_pre_fork_cleanup {
52
 
    my ($class, $code) = @_;
53
 
    push @prefork_cleanup, $code;
54
 
}
55
 
 
56
 
sub RecentQueries {
57
 
    return @RecentQueries;
58
 
}
59
 
 
60
 
sub write_pidfile {
61
 
    my $class = shift;
62
 
    my $pidfile = MogileFS->config("pidfile")
63
 
        or return 1;
64
 
    my $fh;
65
 
    unless (open($fh, ">$pidfile")) {
66
 
        Mgd::log('err', "couldn't create pidfile '$pidfile': $!");
67
 
        return 0;
68
 
    }
69
 
    unless ((print $fh "$$\n") && close($fh)) {
70
 
        Mgd::log('err', "couldn't write into pidfile '$pidfile': $!");
71
 
        remove_pidfile();
72
 
        return 0;
73
 
    }
74
 
    return 1;
75
 
}
76
 
 
77
 
sub remove_pidfile {
78
 
    my $class = shift;
79
 
    my $pidfile = MogileFS->config("pidfile")
80
 
        or return;
81
 
    unlink $pidfile;
82
 
    return 1;
83
 
}
84
 
 
85
 
sub set_min_workers {
86
 
    my ($class, $job, $min) = @_;
87
 
    $jobs{$job} ||= [undef, 0];   # [min, current]
88
 
    $jobs{$job}->[0] = $min;
89
 
 
90
 
    # TODO: set allkipsup false, so spawner re-checks?
91
 
}
92
 
 
93
 
sub job_to_class_suffix {
94
 
    my ($class, $job) = @_;
95
 
    return {
96
 
        fsck        => "Fsck",
97
 
        queryworker => "Query",
98
 
        delete      => "Delete",
99
 
        replicate   => "Replicate",
100
 
        reaper      => "Reaper",
101
 
        monitor     => "Monitor",
102
 
        job_master  => "JobMaster",
103
 
    }->{$job};
104
 
}
105
 
 
106
 
sub job_to_class {
107
 
    my ($class, $job) = @_;
108
 
    my $suffix = $class->job_to_class_suffix($job) or return "";
109
 
    return "MogileFS::Worker::$suffix";
110
 
}
111
 
 
112
 
sub child_pids {
113
 
    return keys %child;
114
 
}
115
 
 
116
 
sub WatchDog {
117
 
    foreach my $pid (keys %child) {
118
 
        my MogileFS::Connection::Worker $child = $child{$pid};
119
 
        my $healthy = $child->watchdog_check;
120
 
        next if $healthy;
121
 
 
122
 
        # special $todie level of 2 means the watchdog tried to kill it.
123
 
        # TODO: Should be a CONSTANT?
124
 
        next if $todie{$pid} && $todie{$pid} == 2;
125
 
        note_pending_death($child->job, $pid, 2);
126
 
 
127
 
        error("Watchdog killing worker $pid (" . $child->job . ")");
128
 
        kill 9, $pid;
129
 
    }
130
 
}
131
 
 
132
 
# returns a sub that Danga::Socket calls after each event loop round.
133
 
# the sub must return 1 for the program to continue running.
134
 
sub PostEventLoopChecker {
135
 
    my $lastspawntime = 0; # time we last ran spawn_children sub
136
 
 
137
 
    return sub {
138
 
        # run only once per second
139
 
        $nowish = time();
140
 
        return 1 unless $nowish > $lastspawntime;
141
 
        $lastspawntime = $nowish;
142
 
 
143
 
        MogileFS::ProcManager->WatchDog;
144
 
 
145
 
        # see if anybody has died, but don't hang up on doing so
146
 
        while(my $pid = waitpid -1, WNOHANG) {
147
 
            last unless $pid > 0;
148
 
            $allkidsup = 0; # know something died
149
 
 
150
 
            # when a child dies, figure out what it was doing
151
 
            # and note that job has one less worker
152
 
            my $jobconn;
153
 
            if (($jobconn = delete $child{$pid})) {
154
 
                my $job = $jobconn->job;
155
 
                my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
156
 
                error("Child $pid ($job) died: $? ($extra)");
157
 
                MogileFS::ProcManager->NoteDeadChild($pid);
158
 
                $jobconn->close;
159
 
 
160
 
                if (my $jobstat = $jobs{$job}) {
161
 
                    # if the pid is in %todie, then we have asked it to shut down
162
 
                    # and have already decremented the jobstat counter and don't
163
 
                    # want to do it again
164
 
                    unless (my $true = delete $todie{$pid}) {
165
 
                        # decrement the count of currently running jobs
166
 
                        $jobstat->[1]--;
167
 
                    }
168
 
                }
169
 
            }
170
 
        }
171
 
 
172
 
        return 1 if $allkidsup;
173
 
 
174
 
        # foreach job, fork enough children
175
 
        while (my ($job, $jobstat) = each %jobs) {
176
 
            my $need = $jobstat->[0] - $jobstat->[1];
177
 
            if ($need > 0) {
178
 
                error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
179
 
                for (1..$need) {
180
 
                    my $jobconn = make_new_child($job)
181
 
                        or return 1;  # basically bail: true value keeps event loop running
182
 
                    $child{$jobconn->pid} = $jobconn;
183
 
 
184
 
                    # now increase the count of processes currently doing this job
185
 
                    $jobstat->[1]++;
186
 
                }
187
 
            }
188
 
        }
189
 
 
190
 
        # if we got this far, all jobs have been re-created.  note that
191
 
        # so we avoid more CPU usage in this post-event-loop callback later
192
 
        $allkidsup = 1;
193
 
 
194
 
        # true value keeps us running:
195
 
        return 1;
196
 
    };
197
 
}
198
 
 
199
 
sub make_new_child {
200
 
    my $job = shift;
201
 
 
202
 
    my $pid;
203
 
    my $sigset;
204
 
 
205
 
    # Ensure our dbh is closed before we fork anything.
206
 
    # Causes problems on some platforms (Solaris+Postgres)
207
 
    Mgd::close_store();
208
 
 
209
 
    # block signal for fork
210
 
    $sigset = POSIX::SigSet->new(SIGINT);
211
 
    sigprocmask(SIG_BLOCK, $sigset)
212
 
        or return error("Can't block SIGINT for fork: $!");
213
 
 
214
 
    socketpair(my $parents_ipc, my $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
215
 
        or die( "Sockpair failed" );
216
 
 
217
 
    return error("fork failed creating $job: $!")
218
 
        unless defined ($pid = fork);
219
 
 
220
 
    # enable auto-flush, so it's not pipe-buffered between parent/child
221
 
    select((select( $parents_ipc ), $|++)[0]);
222
 
    select((select( $childs_ipc  ), $|++)[0]);
223
 
 
224
 
    # if i'm the parent
225
 
    if ($pid) {
226
 
        sigprocmask(SIG_UNBLOCK, $sigset)
227
 
            or return error("Can't unblock SIGINT for fork: $!");
228
 
 
229
 
        close($childs_ipc);  # unnecessary but explicit
230
 
        IO::Handle::blocking($parents_ipc, 0);
231
 
 
232
 
        my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc);
233
 
        $worker_conn->pid($pid);
234
 
        $worker_conn->job($job);
235
 
        MogileFS::ProcManager->RegisterWorkerConn($worker_conn);
236
 
        return $worker_conn;
237
 
    }
238
 
 
239
 
    # as a child, we want to close these and ignore them
240
 
    $_->() foreach @prefork_cleanup;
241
 
    close($parents_ipc);
242
 
    undef $parents_ipc;
243
 
 
244
 
    $SIG{INT} = 'DEFAULT';
245
 
    $SIG{TERM} = 'DEFAULT';
246
 
    $0 .= " [$job]";
247
 
 
248
 
    # unblock signals
249
 
    sigprocmask(SIG_UNBLOCK, $sigset)
250
 
        or return error("Can't unblock SIGINT for fork: $!");
251
 
 
252
 
    # now call our job function
253
 
    my $class = MogileFS::ProcManager->job_to_class($job)
254
 
        or die "No worker class defined for job '$job'\n";
255
 
    my $worker = $class->new($childs_ipc);
256
 
 
257
 
    # set our frontend into child mode
258
 
    MogileFS::ProcManager->SetAsChild($worker);
259
 
 
260
 
    $worker->work;
261
 
    exit 0;
262
 
}
263
 
 
264
 
sub PendingQueryCount {
265
 
    return scalar @PendingQueries;
266
 
}
267
 
 
268
 
sub BoredQueryWorkerCount {
269
 
    return scalar @IdleQueryWorkers;
270
 
}
271
 
 
272
 
sub QueriesInProgressCount {
273
 
    return scalar keys %Mappings;
274
 
}
275
 
 
276
 
# Toss in any queue depths.
277
 
sub StatsHash {
278
 
    for my $job (keys %pending_work) {
279
 
        $Stats{'work_queue_for_' . $job} = @{$pending_work{$job}};
280
 
    }
281
 
    return \%Stats;
282
 
}
283
 
 
284
 
sub foreach_job {
285
 
    my ($class, $cb) = @_;
286
 
    foreach my $job (sort keys %ChildrenByJob) {
287
 
        my $ct = scalar(keys %{$ChildrenByJob{$job}});
288
 
        $cb->($job, $ct, $jobs{$job}->[0], [ join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}}) ]);
289
 
    }
290
 
}
291
 
 
292
 
sub foreach_pending_query {
293
 
    my ($class, $cb) = @_;
294
 
    foreach my $clq (@PendingQueries) {
295
 
        $cb->($clq->[0],  # client object,
296
 
              $clq->[1],  # "$ip $query"
297
 
              );
298
 
    }
299
 
}
300
 
 
301
 
sub is_valid_job {
302
 
    my ($class, $job) = @_;
303
 
    return defined $jobs{$job};
304
 
}
305
 
 
306
 
sub valid_jobs {
307
 
    return sort keys %jobs;
308
 
}
309
 
 
310
 
sub request_job_process {
311
 
    my ($class, $job, $n) = @_;
312
 
    return 0 unless $class->is_valid_job($job);
313
 
    return 0 if $job eq 'job_master' && $n > 1; # ghetto special case
314
 
 
315
 
    $jobs{$job}->[0] = $n;
316
 
    $allkidsup = 0;
317
 
 
318
 
    # try to clean out the queryworkers (if that's what we're doing?)
319
 
    MogileFS::ProcManager->CullQueryWorkers
320
 
        if $job eq 'queryworker';
321
 
 
322
 
    # other workers listening off of a queue should be pinging parent
323
 
    # frequently. shouldn't explicitly kill them.
324
 
}
325
 
 
326
 
 
327
 
# when a child is spawned, they'll have copies of all the data from the
328
 
# parent, but they don't need it.  this method is called when you want
329
 
# to indicate that this procmanager is running on a child and should clean.
330
 
sub SetAsChild {
331
 
    my ($class, $worker) = @_;
332
 
 
333
 
    @IdleQueryWorkers = ();
334
 
    @PendingQueries = ();
335
 
    %Mappings = ();
336
 
    $IsChild = $worker;
337
 
    %ErrorsTo = ();
338
 
    %idle_workers = ();
339
 
    %pending_work = ();
340
 
 
341
 
    # and now kill off our event loop so that we don't waste time
342
 
    Danga::Socket->SetPostLoopCallback(sub { return 0; });
343
 
}
344
 
 
345
 
# called when a child has died.  a child is someone doing a job for us,
346
 
# but it might be a queryworker or any other type of job.  we just want
347
 
# to remove them from our list of children.  they're actually respawned
348
 
# by the make_new_child function elsewhere in Mgd.
349
 
sub NoteDeadChild {
350
 
    my $pid = $_[1];
351
 
    foreach my $job (keys %ChildrenByJob) {
352
 
        return if # bail out if we actually delete one
353
 
            delete $ChildrenByJob{$job}->{$pid};
354
 
    }
355
 
}
356
 
 
357
 
# called when a client dies.  clients are users, management or non.
358
 
# we just want to remove them from the error reporting interface, if
359
 
# they happen to be part of it.
360
 
sub NoteDeadClient {
361
 
    my $client = $_[1];
362
 
    delete $ErrorsTo{$client->{fd}};
363
 
}
364
 
 
365
 
# called when the error function in Mgd is called and we're in the parent,
366
 
# so it's pretty simple that basically we just spit it out to folks listening
367
 
# to errors
368
 
sub NoteError {
369
 
    return unless %ErrorsTo;
370
 
 
371
 
    my $msg = ":: ${$_[1]}\r\n";
372
 
    foreach my $client (values %ErrorsTo) {
373
 
        $client->write(\$msg);
374
 
    }
375
 
}
376
 
 
377
 
sub RemoveErrorWatcher {
378
 
    my ($class, $client) = @_;
379
 
    return delete $ErrorsTo{$client->{fd}};
380
 
}
381
 
 
382
 
sub AddErrorWatcher {
383
 
    my ($class, $client) = @_;
384
 
    $ErrorsTo{$client->{fd}} = $client;
385
 
}
386
 
 
387
 
# one-time initialization of a new worker connection
388
 
sub RegisterWorkerConn {
389
 
    my MogileFS::Connection::Worker $worker = $_[1];
390
 
    $worker->watch_read(1);
391
 
 
392
 
    #warn sprintf("Registering start-up of $worker (%s) [%d]\n", $worker->job, $worker->pid);
393
 
 
394
 
    # now do any special case startup
395
 
    if ($worker->job eq 'queryworker') {
396
 
        MogileFS::ProcManager->NoteIdleQueryWorker($worker);
397
 
    }
398
 
 
399
 
    # add to normal list
400
 
    $ChildrenByJob{$worker->job}->{$worker->pid} = $worker;
401
 
 
402
 
}
403
 
 
404
 
sub EnqueueCommandRequest {
405
 
    my ($class, $line, $client) = @_;
406
 
    push @PendingQueries, [
407
 
                           $client,
408
 
                           ($client->peer_ip_string || '0.0.0.0') . " $line"
409
 
                           ];
410
 
    MogileFS::ProcManager->ProcessQueues;
411
 
}
412
 
 
413
 
# puts a worker back in the queue, deleting any outstanding jobs in
414
 
# the mapping list for this fd.
415
 
sub NoteIdleQueryWorker {
416
 
    # first arg is class, second is worker
417
 
    my MogileFS::Connection::Worker $worker = $_[1];
418
 
    delete $Mappings{$worker->{fd}};
419
 
 
420
 
    # see if we need to kill off some workers
421
 
    if (job_needs_reduction('queryworker')) {
422
 
        Mgd::error("Reducing queryworker headcount by 1.");
423
 
        MogileFS::ProcManager->AskWorkerToDie($worker);
424
 
        return;
425
 
    }
426
 
 
427
 
    # must be okay, so put it in the queue
428
 
    push @IdleQueryWorkers, $worker;
429
 
    MogileFS::ProcManager->ProcessQueues;
430
 
}
431
 
 
432
 
# if we need to kill off a worker, this function takes in the WorkerConn
433
 
# object, tells it to die, marks us as having requested its death, and decrements
434
 
# the count of running jobs.
435
 
sub AskWorkerToDie {
436
 
    my MogileFS::Connection::Worker $worker = $_[1];
437
 
    note_pending_death($worker->job, $worker->pid);
438
 
    $worker->write(":shutdown\r\n");
439
 
}
440
 
 
441
 
# kill bored query workers so we can get down to the level requested.  this
442
 
# continues killing until we run out of folks to kill.
443
 
sub CullQueryWorkers {
444
 
    while (@IdleQueryWorkers && job_needs_reduction('queryworker')) {
445
 
        my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
446
 
        MogileFS::ProcManager->AskWorkerToDie($worker);
447
 
    }
448
 
}
449
 
 
450
 
# called when we get a response from a worker.  this reenqueues the
451
 
# worker so it can handle another response as well as passes the answer
452
 
# back on to the client.
453
 
sub HandleQueryWorkerResponse {
454
 
    # got a response from a worker
455
 
    my MogileFS::Connection::Worker $worker;
456
 
    my $line;
457
 
    (undef, $worker, $line) = @_;
458
 
 
459
 
    return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line") if $IsChild;
460
 
    return unless $worker && $Mappings{$worker->{fd}};
461
 
 
462
 
    # get the client we're working with (if any)
463
 
    my ($client, $jobstr, $starttime) = @{ $Mappings{$worker->{fd}} };
464
 
 
465
 
    # if we have no client, then we just got a standard message from
466
 
    # the queryworker and need to pass it up the line
467
 
    return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client;
468
 
 
469
 
    # at this point it was a command response, but if the client has gone
470
 
    # away, just reenqueue this query worker
471
 
    return MogileFS::ProcManager->NoteIdleQueryWorker($worker) if $client->{closed};
472
 
 
473
 
    # <numeric id> [client-side time to complete] <response>
474
 
    my ($time, $id, $res);
475
 
    if ($line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/) {
476
 
        # save time and response for use later
477
 
        # Note the optional negative sign in the regexp.  Somebody
478
 
        # on the mailing list was getting a time of -0.0000, causing
479
 
        # broken connections.
480
 
        ($id, $time, $res) = ($1, $2, $3);
481
 
    }
482
 
 
483
 
    # now, if it doesn't match
484
 
    unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") {
485
 
        $id   = "<undef>" unless defined $id;
486
 
        $line = "<undef>" unless defined $line;
487
 
        $line =~ s/\n/\\n/g;
488
 
        $line =~ s/\r/\\r/g;
489
 
        Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing");
490
 
        $client->close('worker_mismatch');
491
 
        return MogileFS::ProcManager->AskWorkerToDie($worker);
492
 
    }
493
 
 
494
 
    # now time this interval and add to @RecentQueries
495
 
    my $tinterval = Time::HiRes::time() - $starttime;
496
 
    push @RecentQueries, sprintf("%s %.4f %s", $jobstr, $tinterval, $time);
497
 
    shift @RecentQueries if scalar(@RecentQueries) > 50;
498
 
 
499
 
    # send text to client, put worker back in queue
500
 
    $client->write("$res\r\n");
501
 
    MogileFS::ProcManager->NoteIdleQueryWorker($worker);
502
 
}
503
 
 
504
 
# new per-worker magic internal queue runner.
505
 
# TODO: Since this fires only when a master asks or a worker reports
506
 
# in bored, it should just operate on that *one* queue?
507
 
#
508
 
# new change: if worker in $job, but not in _bored, do not send work.
509
 
# if work is received, only delete from _bored
510
 
sub process_worker_queues {
511
 
    return if $IsChild;
512
 
 
513
 
    JOB: while (my ($job, $queue) = each %pending_work) {
514
 
        next JOB unless @$queue;
515
 
        next JOB unless $idle_workers{$job} && keys %{$idle_workers{$job}};
516
 
        WORKER: for my $worker_key (keys %{$idle_workers{$job}}) {
517
 
            my MogileFS::Connection::Worker $worker = 
518
 
                delete $idle_workers{_bored}->{$worker_key};
519
 
            if (!defined $worker || $worker->{closed}) {
520
 
                delete $idle_workers{$job}->{$worker_key};
521
 
                next WORKER;
522
 
            }
523
 
 
524
 
            # allow workers to grab a linear range of work.
525
 
            while (@$queue && $worker->wants_todo($job)) {
526
 
                $worker->write(":queue_todo $job " . shift(@$queue) . "\r\n");
527
 
            }
528
 
            next JOB unless @$queue;
529
 
        }
530
 
    }
531
 
}
532
 
 
533
 
# called from various spots to empty the queues of available pairs.
534
 
sub ProcessQueues {
535
 
    return if $IsChild;
536
 
 
537
 
    # try to match up a client with a worker
538
 
    while (@IdleQueryWorkers && @PendingQueries) {
539
 
        # get client that isn't closed
540
 
        my $clref;
541
 
        while (!$clref && @PendingQueries) {
542
 
            $clref = shift @PendingQueries
543
 
                or next;
544
 
            if ($clref->[0]->{closed}) {
545
 
                $clref = undef;
546
 
                next;
547
 
            }
548
 
        }
549
 
        next unless $clref;
550
 
 
551
 
        # get worker and make sure it's not closed already
552
 
        my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers;
553
 
        if (!defined $worker || $worker->{closed}) {
554
 
            unshift @PendingQueries, $clref;
555
 
            next;
556
 
        }
557
 
 
558
 
        # put in mapping and send data to worker
559
 
        push @$clref, Time::HiRes::time();
560
 
        $Mappings{$worker->{fd}} = $clref;
561
 
        $Stats{queries}++;
562
 
 
563
 
        # increment our counter so we know what request counter this is going out
564
 
        $worker->{reqid}++;
565
 
        # so we're writing a string of the form:
566
 
        #     123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n
567
 
        $worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
568
 
    }
569
 
}
570
 
 
571
 
# send short descriptions of commands we support to the user
572
 
sub SendHelp {
573
 
    my $client = $_[1];
574
 
 
575
 
    # send general purpose help
576
 
    $client->write(<<HELP);
577
 
Mogilefsd admin commands:
578
 
 
579
 
    !version    Server version
580
 
    !recent     Recently executed queries and how long they took.
581
 
    !queue      Queries that are pending execution.
582
 
    !stats      General stats on what we\'re up to.
583
 
    !watch      Observe errors/messages from children.
584
 
    !jobs       Outstanding job counts, desired level, and pids.
585
 
    !shutdown   Immediately kill all of mogilefsd.
586
 
 
587
 
    !to <job class> <message>
588
 
                Send <message> to all workers of <job class>.
589
 
                Mostly used for debugging.
590
 
 
591
 
    !want <count> <job class>
592
 
                Alter the level of workers of this class desired.
593
 
                Example: !want 20 queryworker, !want 3 replicate.
594
 
                See !jobs for what jobs are available.
595
 
 
596
 
HELP
597
 
 
598
 
}
599
 
 
600
 
# a child has contacted us with some command/status/something.
601
 
sub HandleChildRequest {
602
 
    if ($IsChild) {
603
 
        Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children");
604
 
    }
605
 
 
606
 
    # if they have no job set, then their first line is what job they are
607
 
    # and not a command.  they also specify their pid, just so we know what
608
 
    # connection goes with what pid, in case it's ever useful information.
609
 
    my MogileFS::Connection::Worker $child = $_[1];
610
 
    my $cmd = $_[2];
611
 
 
612
 
    die "Child $child with no pid?" unless $child->job;
613
 
 
614
 
    # at this point we've got a command of some sort
615
 
    if ($cmd =~ /^error (.+)$/i) {
616
 
        # pass it on to our error handler, prefaced with the child's job
617
 
        Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");
618
 
 
619
 
    } elsif ($cmd =~ /^debug (.+)$/i) {
620
 
        # pass it on to our error handler, prefaced with the child's job
621
 
        Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1");
622
 
 
623
 
    } elsif ($cmd =~ /^:state_change (\w+) (\d+) (\w+)/) {
624
 
        my ($what, $whatid, $state) = ($1, $2, $3);
625
 
        state_change($what, $whatid, $state, $child);
626
 
 
627
 
    } elsif ($cmd =~ /^queue_depth (\w+)/) {
628
 
        my $job   = $1;
629
 
        if ($job eq 'all') {
630
 
            for my $qname (keys %pending_work) {
631
 
                my $depth = @{$pending_work{$qname}};
632
 
                $child->write(":queue_depth $qname $depth\r\n");
633
 
            }
634
 
        } else {
635
 
            my $depth = 0;
636
 
            if ($pending_work{$job}) {
637
 
                $depth = @{$pending_work{$job}};
638
 
            }
639
 
            $child->write(":queue_depth $job $depth\r\n");
640
 
        }
641
 
        MogileFS::ProcManager->process_worker_queues;
642
 
    } elsif ($cmd =~ /^queue_todo (\w+) (.+)/) {
643
 
        my $job = $1;
644
 
        $pending_work{$job} ||= [];
645
 
        push(@{$pending_work{$job}}, $2);
646
 
        # Don't process queues immediately, to allow batch processing.
647
 
    } elsif ($cmd =~ /^worker_bored (\d+) (.+)/) {
648
 
        my $batch = $1;
649
 
        my $types = $2;
650
 
        if (job_needs_reduction($child->job)) {
651
 
            MogileFS::ProcManager->AskWorkerToDie($child);
652
 
        } else {
653
 
            unless (exists $idle_workers{$child->job}) {
654
 
                $idle_workers{$child->job} = {};
655
 
            }
656
 
            $idle_workers{_bored} ||= {};
657
 
            $idle_workers{_bored}->{$child} = $child;
658
 
            for my $type (split(/\s+/, $types)) {
659
 
                $idle_workers{$type} ||= {};
660
 
                $idle_workers{$type}->{$child}++;
661
 
                $child->wants_todo($type, $batch);
662
 
            }
663
 
            MogileFS::ProcManager->process_worker_queues;
664
 
        }
665
 
    } elsif ($cmd eq ":ping") {
666
 
 
667
 
        # warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time());
668
 
 
669
 
        # this command expects a reply, either to die or stay alive.  beginning of worker's loops
670
 
        if (job_needs_reduction($child->job)) {
671
 
            MogileFS::ProcManager->AskWorkerToDie($child);
672
 
        } else {
673
 
            $child->write(":stay_alive\r\n");
674
 
        }
675
 
 
676
 
    } elsif ($cmd eq ":still_alive") {
677
 
        # a no-op
678
 
 
679
 
    } elsif ($cmd eq ":monitor_just_ran") {
680
 
        send_monitor_has_run($child);
681
 
 
682
 
    } elsif ($cmd =~ /^:wake_a (\w+)$/) {
683
 
 
684
 
        MogileFS::ProcManager->wake_a($1, $child);
685
 
 
686
 
    } elsif ($cmd =~ /^:invalidate_meta (\w+)/) {
687
 
 
688
 
        my $what = $1;
689
 
        MogileFS::ProcManager->send_to_all_children(":invalidate_meta_once $what", $child);
690
 
 
691
 
    } elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) {
692
 
        # and this will rebroadcast it to all other children
693
 
        # (including the one that just set it to us, but eh)
694
 
        MogileFS::Config->set_config($1, $2);
695
 
    } elsif (my ($devid, $util) = $cmd =~ /^:set_dev_utilization (\d+) (.+)/) {
696
 
        $dev_util{$devid} = $util;
697
 
 
698
 
        # time to rebroadcast dev utilization messages to all children?
699
 
        if ($nowish > $last_util_spray + 3) {
700
 
            $last_util_spray = $nowish;
701
 
            MogileFS::ProcManager->send_to_all_children(":set_dev_utilization " . join(" ", %dev_util));
702
 
        }
703
 
    } else {
704
 
        # unknown command
705
 
        my $show = $cmd;
706
 
        $show = substr($show, 0, 80) . "..." if length $cmd > 80;
707
 
        Mgd::error("Unknown command [$show] from child; job=" . $child->job);
708
 
    }
709
 
}
710
 
 
711
 
# Class method.
712
 
#   ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child ])
713
 
# given a job class, and a message, send it to all children of that job.  returns
714
 
# the number of children the message was sent to.
715
 
#
716
 
# if child is specified, the message will be sent to members of the job class that
717
 
# aren't that child.  so you can exclude the one that originated the message.
718
 
#
719
 
# doesn't add to queue of things child gets on next interactive command: writes immediately
720
 
# (won't get in middle of partial write, though, as danga::socket queues things up)
721
 
#
722
 
# if $just_one is specified, only a single process is notified, then we stop.
723
 
sub ImmediateSendToChildrenByJob {
724
 
    my ($pkg, $class, $msg, $exclude_child, $just_one) = @_;
725
 
 
726
 
    my $childref = $ChildrenByJob{$class};
727
 
    return 0 unless defined $childref && %$childref;
728
 
 
729
 
    foreach my $child (values %$childref) {
730
 
        # ignore the child specified as the third arg if one is sent
731
 
        next if $exclude_child && $exclude_child == $child;
732
 
 
733
 
        # send the message to this child
734
 
        $child->write("$msg\r\n");
735
 
        return 1 if $just_one;
736
 
    }
737
 
    return scalar(keys %$childref);
738
 
}
739
 
 
740
 
# called when we notice that a worker has bit it.  we might have to restart a
741
 
# job that they had been working on.
742
 
sub NoteDeadWorkerConn {
743
 
    return if $IsChild;
744
 
 
745
 
    # get parms and error check
746
 
    my MogileFS::Connection::Worker $worker = $_[1];
747
 
    return unless $worker;
748
 
 
749
 
    my $fd = $worker->{fd};
750
 
    return unless defined($fd);
751
 
 
752
 
    # if there's a mapping for this worker's fd, they had a job that didn't get done
753
 
    if ($Mappings{$fd}) {
754
 
        # unshift, since this one already went through the queue once
755
 
        unshift @PendingQueries, $Mappings{$worker->{fd}};
756
 
        delete $Mappings{$worker->{fd}};
757
 
 
758
 
        # now try to get it processing again
759
 
        MogileFS::ProcManager->ProcessQueues;
760
 
    }
761
 
}
762
 
 
763
 
# given (job, pid), record that this worker is about to die
764
 
# $level is so we can tell if watchdog requested the death.
765
 
sub note_pending_death {
766
 
    my ($job, $pid, $level) = @_;
767
 
 
768
 
    die "$job not defined in call to note_pending_death.\n"
769
 
        unless defined $jobs{$job};
770
 
 
771
 
    $level ||= 1;
772
 
    # don't double decrement.
773
 
    $jobs{$job}->[1]-- unless $todie{$pid};
774
 
    $todie{$pid} = $level;
775
 
}
776
 
 
777
 
# see if we should reduce the number of active children
778
 
sub job_needs_reduction {
779
 
    my $job = shift;
780
 
    return $jobs{$job}->[0] < $jobs{$job}->[1];
781
 
}
782
 
 
783
 
sub is_child {
784
 
    return $IsChild;
785
 
}
786
 
 
787
 
sub state_change {
788
 
    my ($what, $whatid, $state, $exclude) = @_;
789
 
    my $key = "$what-$whatid";
790
 
    my $now = time();
791
 
    foreach my $child (values %child) {
792
 
        my $old = $child->{known_state}{$key} || "";
793
 
        if (!$old || $old->[1] ne $state || $old->[0] < $now - 300) {
794
 
            $child->{known_state}{$key} = [$now, $state];
795
 
 
796
 
            $child->write(":state_change $what $whatid $state\r\n")
797
 
                unless $exclude && $child == $exclude;
798
 
        }
799
 
    }
800
 
}
801
 
 
802
 
sub wake_a {
803
 
    my ($pkg, $class, $fromchild) = @_;  # from arg is optional (which child sent it)
804
 
    my $child = MogileFS::ProcManager->is_child;
805
 
    if ($child) {
806
 
        $child->wake_a($class);
807
 
    } else {
808
 
        MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one");
809
 
    }
810
 
}
811
 
 
812
 
sub send_to_all_children {
813
 
    my ($pkg, $msg, $exclude) = @_;
814
 
    foreach my $child (values %child) {
815
 
        next if $exclude && $child == $exclude;
816
 
        $child->write("$msg\r\n");
817
 
    }
818
 
}
819
 
 
820
 
sub send_monitor_has_run {
821
 
    my $child = shift;
822
 
    for my $type (qw(replicate fsck queryworker delete)) {
823
 
        MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child);
824
 
    }
825
 
}
826
 
 
827
 
1;
828
 
 
829
 
# Local Variables:
830
 
# mode: perl
831
 
# c-basic-indent: 4
832
 
# indent-tabs-mode: nil
833
 
# End: