4
* Process pool for executing work queue
6
* @author Marat Komarov
9
class Scalr_System_Ipc_ProcessPool extends Scalr_Util_Observable {
23
public $workerTimeout;
25
public $workerMemoryLimit;
27
public $workerMemoryLimitTick = 10000; // 10 seconds
29
public $startupTimeout = 5000; // 5 seconds
31
public $termTimeout = 5000; // 5 seconds
33
public $preventParalleling;
35
private $nowWorkingSet;
39
protected $childs = array();
41
protected $isChild = false;
43
protected $childEventQueue;
45
protected $ready = false;
47
private static $termExitCode = 9;
51
private $stopForking = false;
57
protected $slippageLimit = 10;
59
private $slippage = 0;
61
private $cleanupComplete = false;
64
* @var Scalr_System_Ipc_Shm
68
const SHM_STARTUP_BARRIER = 0;
72
* @param array $config
74
* @key Scalr_DataQueue [workQueue]* Work queue. Must be multi-process safe (ex impl: Scalr_System_Ipc_ShmQueue)
75
* @key Scalr_System_Ipc_DefaultWorker [worker]*
76
* @key string [name] Pool name. Will be used instead of posix_getpid() as a ipc resources suffix
77
* @key int [startupTimeout] Time (in milliseconds) to wait for the 'start' event from all children
78
* @key int [workTimeout] Max execution time for $worker->handleWork() (default infinity)
79
* @key int [workerTimeout] Max execution time for worker process (default infinity)
80
* @key int [termTimeout] Time to wait after sending SIGTERM to worker process (default 5 seconds)
81
* @key bool [daemonize] daemonize process pool (default false)
82
* @key bool [preventParalleling] Prevents same work parallel processing
83
* @key Scalr_Util_Set [nowWorkingSet] Set of currently blocked item values. Use with [preventParalleling] option
84
* @key int [slippageLimit] Maximum number of child crashes tolerated without any message from workQueue being processed
85
* @key int [workerMemoryLimit] Memory limit for worker process
86
* @key int [workerMemoryLimitTick] Tick time for worker memory limit check
92
* @return Scalr_System_Ipc_ProcessPool
94
function __construct ($config) {
95
// Check system requirements
96
if (substr(PHP_OS, 0, 3) === 'WIN') {
97
throw new Scalr_System_Ipc_Exception('Cannot run on windows');
98
} else if (!in_array(substr(PHP_SAPI, 0, 3), array('cli', 'cgi'))) {
99
throw new Scalr_System_Ipc_Exception('Can only run on CLI or CGI enviroment');
100
} else if (!function_exists('pcntl_fork')) {
101
throw new Scalr_System_Ipc_Exception('pcntl_* functions are required');
102
} else if (!function_exists('posix_kill')) {
103
throw new Scalr_System_Ipc_Exception('posix_* functions are required');
106
// Apply configuration
107
foreach ($config as $k => $v) {
108
if (property_exists($this, $k)) {
112
if ($this->size < 1) {
113
throw new Scalr_System_Ipc_Exception(sprintf(
114
"'size' must be more then 1. '%s' is given", $this->size));
116
if ($this->workQueue && !is_object($this->workQueue)) {
117
$this->workQueue = new Scalr_System_Ipc_ShmQueue($this->workQueue);
119
if ($this->preventParalleling) {
120
if (!$this->nowWorkingSet) {
121
$this->nowWorkingSet = new Scalr_System_Ipc_ShmSet(array(
122
"name" => "scalr.ipc.processPool.nowWorkingSet-" . ($this->name ? $this->name : posix_getpid())
126
$this->shm = new Scalr_System_Ipc_Shm(array(
127
"name" => "scalr.ipc.processPool.shm-" . posix_getpid()
130
$this->defineEvents(array(
132
* Fires when pool process received Unix signal
134
* @param Scalr_System_Ipc_ProcessPool $ppool
140
* Fires when pool ready for processing tasks
142
* @param Scalr_System_Ipc_ProcessPool $ppool
147
* Fires when pool is about to terminate
149
* @param Scalr_System_Ipc_ProcessPool $ppool
154
$this->logger = LoggerManager::getLogger(__CLASS__);
155
register_shutdown_function(array($this, "_cleanup"));
159
$msg = "Starting process pool (size: {$this->size}";
160
if ($this->daemonize) $msg .= ", daemonize: true";
161
if ($this->preventParalleling) $msg .= ", preventParalleling: true";
163
$this->logger->info($msg);
165
// @see http://www.php.net/manual/en/function.pcntl-fork.php#41150
168
if ($this->daemonize) {
169
$this->logger->info("Going to daemonize process pool. Fork child process");
172
throw new Scalr_System_Ipc_Exception("Cannot daemonize process pool: cannot fork process");
173
} else if ($pid == 0) {
175
$this->logger->info("Detaching process from terminal");
176
if (posix_setsid() == -1) {
177
throw new Scalr_System_Ipc_Exception("Cannot detach process from terminal");
179
$this->sleepMillis(200);
187
$this->poolPid = posix_getpid();
188
$this->initSignalHandler();
189
$this->shm->put(self::SHM_STARTUP_BARRIER, 0);
191
$this->childEventQueue = new Scalr_System_Ipc_ShmQueue(array(
192
// Important! suffix must be pool pid
193
"name" => "scalr.ipc.processPool.ev-" . $this->poolPid,
196
$this->timeoutFly = new Scalr_Util_Timeout(0);
200
$userWorkQueue = $this->worker->startForking($this->workQueue);
201
if ($userWorkQueue) {
202
$this->workQueue = $userWorkQueue;
204
} catch (Exception $e) {
205
$this->logger->error("Exception in worker->startForking(). "
206
. "Caught: <".get_class($e)."> {$e->getMessage()}");
212
for ($i=0; $i<$this->size; $i++) {
215
} catch (Exception $e) {
216
$this->logger->error("Exception during fork childs. "
217
. "Caught: <".get_class($e)."> {$e->getMessage()}");
224
// Wait until all children have entered the startup barrier
227
$timeout = new Scalr_Util_Timeout($this->startupTimeout);
228
while (!$timeout->reached()) {
229
$this->logger->info("Barrier capacity: " . $this->shm->get(self::SHM_STARTUP_BARRIER));
230
if ($this->shm->get(self::SHM_STARTUP_BARRIER) == $this->size) {
235
} catch (Scalr_Util_TimeoutException $e) {
236
$this->logger->error("Caught timeout exception");
240
throw new Scalr_System_Ipc_Exception(sprintf("Timeout exceed (%d millis) "
241
. "while waiting when all childs enter startup barrier",
242
$this->startupTimeout));
244
$this->logger->debug("All children (".count($this->childs).") have entered startup barrier");
247
// Send SIGUSR2 to all children
248
$this->logger->debug("Send SIGUSR2 to all workers");
249
foreach ($this->childs as $i => $childInfo) {
250
$this->kill($childInfo["pid"], SIGUSR2); // Wakeup
251
$this->childs[$i]["startTime"] = microtime(true);
254
$this->logger->debug("Process pool is ready");
256
$this->fireEvent("ready", $this);
265
function shutdown () {
266
$this->logger->info("Shutdown...");
267
$this->fireEvent("shutdown", $this);
269
$this->stopForking = true;
270
foreach ($this->childs as $childInfo) {
271
$this->kill($childInfo["pid"], SIGTERM);
276
function _cleanup () {
277
if (!$this->cleanupComplete) {
279
$this->shm->delete();
280
} catch (Exception $ignore) {
284
if ($this->childEventQueue) {
285
$this->childEventQueue->delete();
287
} catch (Exception $ignore) {
291
if ($this->preventParalleling) {
292
$this->nowWorkingSet->delete();
294
} catch (Exception $ignore) {
297
$this->cleanupComplete = true;
302
protected function postShutdown () {
304
$this->worker->endForking();
307
protected function wait () {
308
if ($this->inWaitLoop) {
311
$this->inWaitLoop = true;
313
while ($this->childs) {
314
// When children die, this gets rid of the zombies
315
$pid = pcntl_wait($status, WNOHANG);
317
$this->logger->info(sprintf("wait() from child %s. Status: %d", $pid, $status));
318
$this->onSIGCHLD($pid, $status);
321
// Check for timeouts
322
foreach ($this->childs as $childInfo) {
323
if ($childInfo["termStartTime"]) {
326
if ($this->timeoutReached($this->termTimeout, $childInfo["termStartTime"])) {
327
$this->logger->info(sprintf("Child %d reached termination timeout and will be KILLED",
329
$this->kill($childInfo["pid"], SIGKILL);
335
$term = $this->workTimeout && $childInfo["workStartTime"] &&
336
$this->timeoutReached($this->workTimeout, $childInfo["workStartTime"]);
338
$this->logger->info(sprintf("Child %d reached WORK max execution time",
341
$term = $this->workerTimeout && $childInfo["startTime"] &&
342
$this->timeoutReached($this->workerTimeout, $childInfo["startTime"]);
344
$this->logger->info(sprintf("Child %d reached WORKER max execution time",
350
$this->terminateChild($childInfo["pid"]);
355
$this->sleepMillis(10);
358
$this->inWaitLoop = false;
360
$this->postShutdown();
363
protected function forkChild ($useBarrier=true) {
364
$this->logger->info("Fork child process");
369
throw new Scalr_System_Ipc_Exception("Cannot fork child process");
373
$this->logger->debug(sprintf("Child %s was forked", $pid));
374
$this->childs[$pid] = array("pid" => $pid);
375
$this->worker->childForked($pid);
380
$this->isChild = true;
381
$mypid = posix_getpid();
382
$this->logger->info("Starting...");
383
$this->fireChildEvent("start");
389
while (!$this->ready) {
390
$this->sleepMillis(10);
394
$this->worker->startChild();
396
if ($this->workQueue) {
397
$memoryTick = new Scalr_Util_Timeout($this->workerMemoryLimitTick);
398
$os = Scalr_System_OS::getInstance();
399
while ($message = $this->workQueue->peek()) {
400
$this->logger->info(sprintf("Peek message from work queue (remaining capacity: %d)",
401
$this->workQueue->capacity()));
403
// Notify parent before message handler
404
$this->fireChildEvent("beforeHandleWork", array(
405
"microtime" => microtime(true),
406
"message" => $message
409
if ($this->preventParalleling) {
410
if ($this->nowWorkingSet->contains($message)) {
411
$this->logger->warn(sprintf("Skip message processing because same message "
412
. "is currently processing by another process (message: '%s')",
413
serialize($message)));
416
$this->nowWorkingSet->add($message);
419
$this->worker->handleWork($message);
421
if ($this->preventParalleling) {
422
$this->nowWorkingSet->remove($message);
425
// Notify parent after message handler
426
$this->fireChildEvent("afterHandleWork", array(
427
"message" => $message
430
if ($this->workerMemoryLimit && $memoryTick->reached(false)) {
431
$this->fireChildEvent("memoryUsage", array(
432
"memory" => $os->getMemoryUsage(posix_getpid(), Scalr_System_OS::MEM_RES)
434
$memoryTick->reset();
439
$this->worker->endChild();
440
$this->logger->info("Done");
442
} catch (Exception $e) {
444
$this->logger->error(sprintf("Unhandled exception in worker process: <%s> '%s'",
445
get_class($e), $e->getMessage()));
446
$this->logger->info(sprintf("Worker process %d terminated (exit code: %d)",
447
$mypid, self::$termExitCode));
449
// Sometimes (in our tests when daemonize=true) the parent process doesn't receive SIGCHLD
450
// Sending kill signal will force SIGCHLD
451
// TODO: Consider it deeper
452
posix_kill($mypid, SIGKILL);
454
exit(self::$termExitCode);
461
/**
 * Checks whether a timeout has elapsed since the given start time.
 *
 * Reuses the shared $this->timeoutFly object rather than allocating a new
 * timeout per check: its start is overwritten with $startTime and its
 * duration with $timeout before it is queried.
 *
 * @param int   $timeout   Timeout duration (presumably milliseconds, like termTimeout -- confirm)
 * @param float $startTime Start timestamp; call sites pass values produced by microtime(true)
 * @return mixed Whatever Scalr_Util_Timeout::reached(false) returns (used as a boolean by callers)
 */
private function timeoutReached ($timeout, $startTime) {
462
$this->timeoutFly->start = $startTime;
463
$this->timeoutFly->setTimeout($timeout);
464
// NOTE(review): the false argument presumably makes reached() report the state instead of throwing -- confirm
return $this->timeoutFly->reached(false);
467
/**
 * Pauses the current process by delegating to Scalr_Util_Timeout::sleep().
 *
 * @param int $millis Sleep duration; milliseconds judging by the method name
 *                    -- TODO confirm the unit expected by Scalr_Util_Timeout::sleep()
 */
private function sleepMillis ($millis) {
468
Scalr_Util_Timeout::sleep($millis);
471
/**
 * Logs and sends a Unix signal to a process.
 *
 * @param int    $pid       Target process id
 * @param int    $signal    Signal number; looked up in self::$signames for the log line
 * @param string $logPrefix Text prepended to the log message
 * @return bool Result of posix_kill()
 */
protected function kill ($pid, $signal, $logPrefix="") {
472
$this->logger->info(sprintf("%sSend %s -> %s", $logPrefix, self::$signames[$signal], $pid));
473
return posix_kill($pid, $signal);
476
/**
 * Asks a known child process to terminate by sending it SIGTERM.
 *
 * Does nothing when $pid is not a key of $this->childs.
 *
 * @param int $pid Child process id
 */
protected function terminateChild ($pid) {
477
if (key_exists($pid, $this->childs)) {
478
$this->kill($pid, SIGTERM);
479
// Remember when termination was requested; wait() compares this against
// termTimeout and sends SIGKILL once it is exceeded
$this->childs[$pid]["termStartTime"] = microtime(true);
483
/**
 * Publishes an event from the current process to the pool process.
 *
 * The event data is tagged with the event name and the sender pid,
 * serialized, put on the child event queue, and the pool process is
 * then notified with SIGUSR1.
 *
 * @param string $evName Event type, stored under the "type" key
 * @param array  $evData Extra event payload; "type" and "pid" keys are overwritten
 */
protected function fireChildEvent ($evName, $evData=array()) {
484
$evData["type"] = $evName;
485
$evData["pid"] = posix_getpid();
486
$this->childEventQueue->put(serialize($evData));
487
// Notify the pool process that a new event is waiting in the queue
if (!$this->kill($this->poolPid, SIGUSR1, "[".posix_getpid()."] ")) {
488
$this->logger->fatal("Cannot send signal to parent process");
489
// The pool process could not be signalled, so this process kills itself
posix_kill(posix_getpid(), SIGKILL);
494
protected static $signames = array(
497
SIGQUIT => "SIGQUIT",
499
SIGTRAP => "SIGTRAP",
500
SIGABRT => "SIGABRT",
503
SIGKILL => "SIGKILL",
504
SIGUSR1 => "SIGUSR1",
505
SIGSEGV => "SIGSEGV",
506
SIGUSR2 => "SIGUSR2",
507
SIGPIPE => "SIGPIPE",
508
SIGALRM => "SIGALRM",
509
SIGTERM => "SIGTERM",
510
SIGSTKFLT => "SIGSTKFLT",
511
SIGCHLD => "SIGCHLD",
512
SIGCONT => "SIGCONT",
513
SIGSTOP => "SIGSTOP",
514
SIGTSTP => "SIGTSTP",
515
SIGTTIN => "SIGTTIN",
516
SIGTTOU => "SIGTTOU",
518
SIGXCPU => "SIGXCPU",
519
SIGXFSZ => "SIGXFSZ",
520
SIGVTALRM => "SIGVTALRM",
521
SIGPROF => "SIGPROF",
522
SIGWINCH => "SIGWINCH",
523
SIGPOLL => "SIGPOLL",
530
protected function initSignalHandler () {
531
$fn = array($this, "signalHandler");
532
$signals = array(SIGCHLD, SIGTERM, SIGABRT, SIGALRM, SIGUSR1, SIGUSR2);
533
if ($this->daemonize) {
536
foreach ($signals as $sig) {
537
$this->logger->debug("Install ".self::$signames[$sig]." handler");
538
if (!pcntl_signal($sig, $fn)) {
539
$this->logger->warn(sprintf("Cannot install signal handler on signal %s in process %d",
540
self::$signames[$sig], posix_getpid()));
545
function signalHandler ($sig) {
546
$mypid = posix_getpid();
552
$pid = pcntl_waitpid(0, $status, WNOHANG);
553
$this->logger->info(sprintf("Received %s from %d. Status: %d",
554
self::$signames[$sig], $pid, $status));
555
$this->onSIGCHLD($pid, $status);
558
// Startup barrier ready
561
$this->logger->debug(sprintf("Received %s", self::$signames[$sig]));
565
// Event notification from child
568
$this->logger->info(sprintf("Received %s", self::$signames[$sig]));
574
//$this->logger->debug(sprintf("Received %s", self::$signames[$sig]));
575
// Check for zombie child processes
576
foreach (array_keys($this->childs) as $pid) {
577
if (!posix_kill($pid, 0)) {
578
unset($this->childs[$pid]);
586
// Works when $this->daemonize = true
587
$this->logger->info(sprintf("Received %s", self::$signames[$sig]));
589
foreach ($this->childs as $childInfo) {
590
$this->terminateChild($childInfo["pid"]);
598
if ($mypid == $this->poolPid) {
600
$this->logger->info(sprintf("Received %s in parent", self::$signames[$sig]));
601
$this->fireEvent("signal", $this, $sig);
606
$this->logger->info(sprintf("Received %s in child", self::$signames[$sig]));
607
if ($this->isChild) {
608
$this->logger->debug("Worker terminating...");
610
$this->worker->terminate();
611
} catch (Exception $e) {
612
$this->logger->error("Exception in worker->terminate(). Caught: {$e->getMessage()}");
614
$this->logger->info(sprintf("Worker process %d terminated (exit code: %d)",
615
$mypid, self::$termExitCode));
617
// Sometimes (in our tests when daemonize=true) the parent process doesn't receive SIGCHLD
618
// Sending kill signal will force SIGCHLD
619
// TODO: Consider it deeper
620
posix_kill($mypid, SIGKILL);
621
exit(self::$termExitCode);
627
$this->logger->info(sprintf("Received %s", self::$signames[$sig]));
631
$this->fireEvent("signal", $this, $sig);
634
protected function onSIGCHLD ($pid, $status) {
639
if (key_exists($pid, $this->childs)) {
640
// Remove work from nowWorking set
641
if ($this->childs[$pid]["message"] && $this->preventParalleling) {
642
$this->nowWorkingSet->remove($this->childs[$pid]["message"]);
644
unset($this->childs[$pid]);
646
$this->logger->debug(sprintf("Child termination options. "
647
. "wifexited: %d, wifsignaled: %d, wifstopped: %d, stopForking: %d",
648
pcntl_wifexited($status), pcntl_wifsignaled($status),
649
pcntl_wifstopped($status), $this->stopForking));
651
// On abnormal exit, fork a new child process
652
// 1. status=65280 when child process died with fatal error (set by PHP)
653
// 2. exit=9 when child was terminated by parent or by unhandled exception (set by ProcessPool)
654
// 3. child was terminated by a signal (pcntl_wifsignaled)
655
if ((pcntl_wifexited($status) && $status == 65280) ||
656
(pcntl_wifexited($status) && pcntl_wexitstatus($status) == self::$termExitCode) ||
657
(pcntl_wifsignaled($status))) {
659
if (!$this->stopForking) {
663
if ($this->slippage < $this->slippageLimit) {
664
$this->forkChild(false);
666
$this->logger->info(sprintf("Slippage limit: %d exceed. No new childs will be forked",
668
$this->stopForking = true;
671
$this->logger->debug("'stopForking' flag prevents new process forking");
673
} catch (Scalr_System_Ipc_Exception $e) {
674
$this->logger->error(sprintf("Cannot fork child. Caught: <%s> %s",
675
get_class($e), $e->getMessage()));
681
protected function onSIGUSR1 () {
682
while ($message0 = $this->childEventQueue->peek()) {
683
$message = unserialize($message0);
684
$this->logger->info(sprintf("Peeked '%s' from event queue", $message["type"]));
686
switch ($message["type"]) {
687
case "beforeHandleWork":
688
$this->childs[$message["pid"]]["workStartTime"] = $message["microtime"];
689
$this->childs[$message["pid"]]["message"] = $message["message"];
692
case "afterHandleWork":
693
unset($this->childs[$message["pid"]]["workStartTime"]);
694
unset($this->childs[$message["pid"]]["message"]);
696
// Reset slippage counter
702
$this->shm->put(self::SHM_STARTUP_BARRIER, $this->shm->get(self::SHM_STARTUP_BARRIER) + 1);
704
$this->childs[$message["pid"]]["startTime"] = microtime(true);
709
if ($this->workerMemoryLimit && $message["memory"] > $this->workerMemoryLimit) {
710
$this->logger->warn(sprintf(
711
"Worker %d allocates %d Kb. Maximum %d Kb is allowed by configuration",
712
$message["pid"], $message["memory"], $this->workerMemoryLimit));
713
$this->terminateChild($message["pid"]);
718
$this->logger->warn("Peeked unknown message from child event queue. "
719
. "Serialized message: {$message0}");
725
return $this->poolPid;