~sebastian-scalr/openstack-platform-php/trunk

« back to all changes in this revision

Viewing changes to src/Scalr/System/Ipc/ProcessPool.php

  • Committer: DicsyDel
  • Date: 2010-01-14 09:49:24 UTC
  • Revision ID: svn-v4:ea022c65-b648-0410-8974-0d324702c27d:trunk:247
1.2.0 Stable Release
Events system improvements:
* Added new events: OnDNSZoneUpdated, OnEBSVolumeAttached

Scripting engine improvements:
* Added the %zone_name% variable for scripts executed on DNSZoneUpdate event
* Added the %new_ip_address% variable for scripts executed on IPAddressChanged event
* Added the %volume_id% and %mountpoint% variables for scripts executed on EBSVolumeMounted event
* The ability to execute scripts from the Scripts view page.

Snapshots manager improvements:
* Remove multiple snapshots in one time
* Share snapshots!

API improvements:
* Added methods: LaunchInstance, TerminateInstance, GetFarmDetails, GetScriptDetails, RebootInstance, GetEvents, GetLogs
* Added methods for working with DNS zones: ListDNSZones, ListDNSZoneRecords, AddDNSZoneRecord and RemoveDNSZoneRecord
* Improved the ExecuteScript method.
* Added LA for each instance to the GetFarmDetails method
* Added the ability to execute scripts with specified revision and parameters

Core improvements:
* Amazon RDS support
* Amazon Spot instances support.
* Amazon CloudWatch support (fine grained monitoring)
* Amazon VPC support (enterprise feature, deploy on non-shared servers)
* Rewrote the “Synchronize to all” feature. Cleaner and more reliable now.
* Same goes for AutoEBS and AutoEIP. Cleaner and more reliable.
* When you create a new AMI for a custom role, you can now switch over to it immediately.
* And the long awaited “Keep me logged in” checkbox on the login page!
* Filters for instances on the Servers view page.
* CloudFront distributions for domains not managed by Scalr.
* The ability to remove Elastic Load Balancers.
* Support for new region: us-west-1 (deploy on the west coast!)
* A new page with more details on the instance.
* Increased page load speed. Optimized js code. (faster, better interface!)
* An improved MySQL status page for your Farms
* The ability to set whether Scalr should terminate or reboot instances that fail to respond to SNMP calls.
* The ability to slowdown the scaling process
* Support for new instance types, the high memory instances (32 and 68GB of memory)
* The ability to add Google Apps MX records in the Zone Edit page in a single click.
* The ability to edit system DNS records. For advanced clients.
* The ability to set both size and snapshot for Role auto EBS (previously just one)
* Added ability to edit farm role specified security group
* Added Hide terminated instances checkbox on instances list.
* Added ability to view all instances (include non-scalr ones)
* Added ability to set system timezone for clients (Logs, Events, API logs)
* Fixed bug in garbage.php with “select all” checkbox
* Fixed bug with default SSH port (see thread)
* More than 200 bugs was fixed and tons of other internal improvements.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
<?php
 
2
 
 
3
/**
 
4
 * Process pool for executing work queue
 
5
 * 
 
6
 * @author Marat Komarov
 
7
 */
 
8
 
 
9
class Scalr_System_Ipc_ProcessPool extends Scalr_Util_Observable {
 
10
        
 
11
        public $name;
 
12
        
 
13
        public $size;
 
14
        
 
15
        public $workQueue;
 
16
        
 
17
        public $worker;
 
18
        
 
19
        public $daemonize;
 
20
        
 
21
        public $workTimeout;
 
22
        
 
23
        public $workerTimeout;
 
24
        
 
25
        public $workerMemoryLimit;
 
26
        
 
27
        public $workerMemoryLimitTick = 10000; // 10 seconds
 
28
        
 
29
        public $startupTimeout = 5000; // 5 seconds
 
30
        
 
31
        public $termTimeout = 5000; // 5 seconds
 
32
        
 
33
        public $preventParalleling;
 
34
        
 
35
        private $nowWorkingSet;
 
36
        
 
37
        protected $poolPid;
 
38
        
 
39
        protected $childs = array();
 
40
        
 
41
        protected $isChild = false;
 
42
        
 
43
        protected $childEventQueue;     
 
44
 
 
45
        protected $ready = false;       
 
46
        
 
47
        private static $termExitCode = 9;
 
48
        
 
49
        private $logger;
 
50
        
 
51
        private $stopForking = false;
 
52
        
 
53
        private $timeoutFly;
 
54
        
 
55
        private $inWaitLoop;
 
56
        
 
57
        protected $slippageLimit = 10;
 
58
        
 
59
        private $slippage = 0;
 
60
        
 
61
        private $cleanupComplete = false;
 
62
        
 
63
        /**
 
64
         * @var Scalr_System_Ipc_Shm
 
65
         */
 
66
        private $shm;
 
67
        
 
68
        const SHM_STARTUP_BARRIER = 0;
 
69
        
 
70
        /**
 
71
         * 
 
72
         * @param array $config
 
73
         * @key int [size]*
 
74
         * @key Scalr_DataQueue [workQueue]* Work queue. Must be multi-process safe (ex impl: Scalr_System_Ipc_ShmQueue)
 
75
         * @key Scalr_System_Ipc_DefaultWorker [worker]*
 
76
         * @key string [name] Pool name. Will be used instead of posix_getpid() as a ipc resources suffix
 
77
         * @key int [startupTimeout] Time to wait when 'start' event will be received from all childs 
 
78
         * @key int [workTimeout] Max execution time for $worker->handleWork() (default infinity)
 
79
         * @key int [workerTimeout] Max execution time for worker process (default infinity)
 
80
         * @key int [termTimeout] Time to wait after sending SIGTERM to worker process (default 5 seconds)
 
81
         * @key bool [daemonize] daemonize process pool (default false)
 
82
         * @key bool [preventParalleling] Prevents same work parallel processing
 
83
         * @key Scalr_Util_Set [nowWorkingSet] Set of currently blocked item values. Use with [preventParalleling] option
 
84
         * @key int [slippageLimit] Maximum number of childs crash without processing messages from workQueue
 
85
         * @key int [workerMemoryLimit] Memory limit for worker process
 
86
         * @key int [workerMemoryLimitTick] Tick time for worker memory limit check   
 
87
         * 
 
88
         * @event ready
 
89
         * @event shutdown
 
90
         * @event signal
 
91
         * 
 
92
         * @return Scalr_System_Ipc_ProcessPool
 
93
         */
 
94
        function __construct ($config) {
 
95
                // Check system requirements
 
96
        if (substr(PHP_OS, 0, 3) === 'WIN') {
 
97
            throw new Scalr_System_Ipc_Exception('Cannot run on windows');
 
98
        } else if (!in_array(substr(PHP_SAPI, 0, 3), array('cli', 'cgi'))) {
 
99
                throw new Scalr_System_Ipc_Exception('Can only run on CLI or CGI enviroment');
 
100
        } else if (!function_exists('pcntl_fork')) {
 
101
                throw new Scalr_System_Ipc_Exception('pcntl_* functions are required');
 
102
        } else if (!function_exists('posix_kill')) {
 
103
                throw new Scalr_System_Ipc_Exception('posix_* functions are required');
 
104
        }
 
105
 
 
106
        // Apply configuration
 
107
        foreach ($config as $k => $v) {
 
108
                if (property_exists($this, $k)) {
 
109
                        $this->{$k} = $v;
 
110
                }
 
111
                }
 
112
                if ($this->size < 1) {
 
113
                        throw new Scalr_System_Ipc_Exception(sprintf(
 
114
                                        "'size' must be more then 1. '%s' is given", $this->size));
 
115
                }
 
116
                if ($this->workQueue && !is_object($this->workQueue)) {
 
117
                        $this->workQueue = new Scalr_System_Ipc_ShmQueue($this->workQueue);
 
118
                }
 
119
                if ($this->preventParalleling) {
 
120
                        if (!$this->nowWorkingSet) {
 
121
                                $this->nowWorkingSet = new Scalr_System_Ipc_ShmSet(array(
 
122
                                        "name" => "scalr.ipc.processPool.nowWorkingSet-" . ($this->name ? $this->name : posix_getpid())
 
123
                                ));
 
124
                        }
 
125
                }
 
126
                $this->shm = new Scalr_System_Ipc_Shm(array(
 
127
                        "name" => "scalr.ipc.processPool.shm-" . posix_getpid()
 
128
                ));
 
129
                
 
130
                $this->defineEvents(array(
 
131
                        /**
 
132
                         * Fires when pool process received Unix signal
 
133
                         * @event signal
 
134
                         * @param Scalr_System_Ipc_ProcessPool $ppool
 
135
                         * @param int $signal
 
136
                         */
 
137
                        "signal",
 
138
                
 
139
                        /**
 
140
                         * Fires when pool ready for processing tasks
 
141
                         * @event ready
 
142
                         * @param Scalr_System_Ipc_ProcessPool $ppool
 
143
                         */
 
144
                        "ready",
 
145
                
 
146
                        /**
 
147
                         * Fires when pool is going terminate 
 
148
                         * @event shutdown
 
149
                         * @param Scalr_System_Ipc_ProcessPool $ppool
 
150
                         */
 
151
                        "shutdown"
 
152
                ));
 
153
                
 
154
                $this->logger = LoggerManager::getLogger(__CLASS__);
 
155
                register_shutdown_function(array($this, "_cleanup"));
 
156
        }
 
157
        
 
158
        function start () {
 
159
                $msg = "Starting process pool (size: {$this->size}";
 
160
                if ($this->daemonize) $msg .= ", daemonize: true";
 
161
                if ($this->preventParalleling) $msg .= ", preventParalleling: true";
 
162
                $msg .= ")";
 
163
        $this->logger->info($msg);
 
164
                
 
165
                // @see http://www.php.net/manual/en/function.pcntl-fork.php#41150
 
166
        @ob_end_flush();                
 
167
                
 
168
                if ($this->daemonize) {
 
169
                        $this->logger->info("Going to daemonize process pool. Fork child process");
 
170
                        $pid = pcntl_fork();
 
171
                        if ($pid == -1) {
 
172
                                throw new Scalr_System_Ipc_Exception("Cannot daemonize process pool: cannot fork process");
 
173
                        } else if ($pid == 0) {
 
174
                                // Child
 
175
                                $this->logger->info("Detaching process from terminal");
 
176
                                if (posix_setsid() == -1) {
 
177
                                        throw new Scalr_System_Ipc_Exception("Cannot detach process from terminal");
 
178
                                }
 
179
                                $this->sleepMillis(200);
 
180
                        } else {
 
181
                                // Parent process
 
182
                                die();
 
183
                        }
 
184
                }
 
185
                
 
186
                
 
187
                $this->poolPid = posix_getpid();
 
188
                $this->initSignalHandler();
 
189
                $this->shm->put(self::SHM_STARTUP_BARRIER, 0);
 
190
                
 
191
                $this->childEventQueue = new Scalr_System_Ipc_ShmQueue(array(
 
192
                        // Important! suffix must be pool pid
 
193
                        "name" => "scalr.ipc.processPool.ev-" . $this->poolPid,
 
194
                        "autoInit" => true
 
195
                ));
 
196
                $this->timeoutFly = new Scalr_Util_Timeout(0);
 
197
                
 
198
                // Start forking
 
199
                try {
 
200
                        $userWorkQueue = $this->worker->startForking($this->workQueue);
 
201
                        if ($userWorkQueue) {
 
202
                                $this->workQueue = $userWorkQueue;
 
203
                        }
 
204
                } catch (Exception $e) {
 
205
                        $this->logger->error("Exception in worker->startForking(). "
 
206
                                        . "Caught: <".get_class($e)."> {$e->getMessage()}");
 
207
                        $this->shutdown();
 
208
                        throw $e;
 
209
                } 
 
210
 
 
211
                // Fork childs          
 
212
                for ($i=0; $i<$this->size; $i++) {
 
213
                        try {
 
214
                                $this->forkChild();
 
215
                        } catch (Exception $e) {
 
216
                                $this->logger->error("Exception during fork childs. "
 
217
                                                . "Caught: <".get_class($e)."> {$e->getMessage()}");
 
218
                                $this->shutdown();
 
219
                                throw $e;
 
220
                        }
 
221
                }
 
222
 
 
223
                
 
224
                // Wait when all childs enter startup barrier
 
225
        
 
226
                try {
 
227
                        $timeout = new Scalr_Util_Timeout($this->startupTimeout);
 
228
                        while (!$timeout->reached()) {
 
229
                                $this->logger->info("Barrier capacity: " . $this->shm->get(self::SHM_STARTUP_BARRIER));
 
230
                                if ($this->shm->get(self::SHM_STARTUP_BARRIER) == $this->size) {
 
231
                                        break;
 
232
                                }
 
233
                                $timeout->sleep(10);
 
234
                        }
 
235
                } catch (Scalr_Util_TimeoutException $e) {
 
236
                        $this->logger->error("Caught timeout exception");
 
237
                        
 
238
                        $this->shutdown();                      
 
239
                        
 
240
                        throw new Scalr_System_Ipc_Exception(sprintf("Timeout exceed (%d millis) "
 
241
                                        . "while waiting when all childs enter startup barrier", 
 
242
                                        $this->startupTimeout));
 
243
                }
 
244
                $this->logger->debug("All children (".count($this->childs).") have entered startup barrier");
 
245
                
 
246
                
 
247
                // Send to all childs SIGUSR2
 
248
                $this->logger->debug("Send SIGUSR2 to all workers");
 
249
                foreach ($this->childs as $i => $childInfo) {
 
250
                        $this->kill($childInfo["pid"], SIGUSR2); // Wakeup
 
251
                        $this->childs[$i]["startTime"] = microtime(true);
 
252
                }
 
253
                
 
254
                $this->logger->debug("Process pool is ready");
 
255
                $this->ready = true;
 
256
                $this->fireEvent("ready", $this);
 
257
 
 
258
        
 
259
                // Setup SIGALRM 
 
260
                pcntl_alarm(1);
 
261
                
 
262
                $this->wait();          
 
263
        }
 
264
        
 
265
        function shutdown () {
 
266
                $this->logger->info("Shutdown...");
 
267
                $this->fireEvent("shutdown", $this);
 
268
                
 
269
                $this->stopForking = true;
 
270
                foreach ($this->childs as $childInfo) {
 
271
                        $this->kill($childInfo["pid"], SIGTERM);
 
272
                }
 
273
                $this->wait();
 
274
        }
 
275
        
 
276
        function _cleanup () {
 
277
                if (!$this->cleanupComplete) {
 
278
                        try {
 
279
                                $this->shm->delete();
 
280
                        } catch (Exception $ignore) {
 
281
                        }
 
282
                        
 
283
                        try {
 
284
                                if ($this->childEventQueue) {
 
285
                                        $this->childEventQueue->delete();
 
286
                                }
 
287
                        } catch (Exception $ignore) {
 
288
                        }
 
289
                        
 
290
                        try {
 
291
                                if ($this->preventParalleling) {
 
292
                                        $this->nowWorkingSet->delete();
 
293
                                }
 
294
                        } catch (Exception $ignore) {
 
295
                        }
 
296
                        
 
297
                        $this->cleanupComplete = true;
 
298
 
 
299
                }
 
300
        }
 
301
        
 
302
        protected function postShutdown () {
 
303
                $this->cleanup();
 
304
                $this->worker->endForking();            
 
305
        }
 
306
        
 
307
        protected function wait () {
 
308
                if ($this->inWaitLoop) {
 
309
                        return;
 
310
                }
 
311
                $this->inWaitLoop = true;
 
312
                
 
313
                while ($this->childs) {
 
314
                        // When children die, this gets rid of the zombies
 
315
                        $pid = pcntl_wait($status, WNOHANG);
 
316
                        if ($pid > 0) {
 
317
                                $this->logger->info(sprintf("wait() from child %s. Status: %d", $pid, $status));
 
318
                                $this->onSIGCHLD($pid, $status);
 
319
                        }
 
320
                        
 
321
                        // Check for timeouts
 
322
                        foreach ($this->childs as $childInfo) {
 
323
                                if ($childInfo["termStartTime"]) {
 
324
                                        // Kill maybe
 
325
                                        
 
326
                                        if ($this->timeoutReached($this->termTimeout, $childInfo["termStartTime"])) {
 
327
                                                $this->logger->info(sprintf("Child %d reached termination timeout and will be KILLED", 
 
328
                                                                $childInfo["pid"]));
 
329
                                                $this->kill($childInfo["pid"], SIGKILL);
 
330
                                        }
 
331
                                        
 
332
                                } else {
 
333
                                        // Terminate maybe      
 
334
                
 
335
                                        $term = $this->workTimeout && $childInfo["workStartTime"] &&
 
336
                                                        $this->timeoutReached($this->workTimeout, $childInfo["workStartTime"]);
 
337
                                        if ($term) {
 
338
                                                $this->logger->info(sprintf("Child %d reached WORK max execution time", 
 
339
                                                                $childInfo["pid"]));
 
340
                                        } else {
 
341
                                                $term = $this->workerTimeout && $childInfo["startTime"] &&
 
342
                                                        $this->timeoutReached($this->workerTimeout, $childInfo["startTime"]);
 
343
                                                if ($term) {
 
344
                                                        $this->logger->info(sprintf("Child %d reached WORKER max execution time", 
 
345
                                                                        $childInfo["pid"]));
 
346
                                                }
 
347
                                        }
 
348
                                        
 
349
                                        if ($term) {
 
350
                                                $this->terminateChild($childInfo["pid"]);
 
351
                                        }
 
352
                                }
 
353
                        }
 
354
                        
 
355
                        $this->sleepMillis(10);
 
356
                }
 
357
                
 
358
                $this->inWaitLoop = false;
 
359
                
 
360
                $this->postShutdown();
 
361
        }
 
362
 
 
363
        protected function forkChild ($useBarrier=true) {
 
364
                $this->logger->info("Fork child process");
 
365
                $pid = pcntl_fork();
 
366
                
 
367
                if ($pid == -1) {
 
368
                        // Cannot fork child                            
 
369
                        throw new Scalr_System_Ipc_Exception("Cannot fork child process");
 
370
                        
 
371
                } else if ($pid) {
 
372
                        // Current process
 
373
                        $this->logger->debug(sprintf("Child %s was forked", $pid));
 
374
                        $this->childs[$pid] = array("pid" => $pid);
 
375
                        $this->worker->childForked($pid);
 
376
                        
 
377
                } else {
 
378
                        // Child process
 
379
                        try {
 
380
                                $this->isChild = true;
 
381
                                $mypid = posix_getpid();
 
382
                                $this->logger->info("Starting...");
 
383
                                $this->fireChildEvent("start");
 
384
                                
 
385
                                if (!$useBarrier) {
 
386
                                        $this->ready = true;
 
387
                                } else {
 
388
                                        // Wait for SIGUSR2                             
 
389
                                        while (!$this->ready) {
 
390
                                                $this->sleepMillis(10);
 
391
                                        }
 
392
                                }
 
393
                                
 
394
                                $this->worker->startChild();
 
395
                                
 
396
                                if ($this->workQueue) {
 
397
                                        $memoryTick = new Scalr_Util_Timeout($this->workerMemoryLimitTick);
 
398
                                        $os = Scalr_System_OS::getInstance();
 
399
                                        while ($message = $this->workQueue->peek()) {
 
400
                                                $this->logger->info(sprintf("Peek message from work queue (remaining capacity: %d)",
 
401
                                                                $this->workQueue->capacity()));
 
402
                                                                
 
403
                                                // Notify parent before message handler
 
404
                                                $this->fireChildEvent("beforeHandleWork", array(
 
405
                                                        "microtime" => microtime(true),
 
406
                                                        "message" => $message
 
407
                                                ));
 
408
                                                
 
409
                                                if ($this->preventParalleling) {
 
410
                                                        if ($this->nowWorkingSet->contains($message)) {
 
411
                                                                $this->logger->warn(sprintf("Skip message processing because same message "
 
412
                                                                                . "is currently processing by another process (message: '%s')", 
 
413
                                                                                serialize($message)));
 
414
                                                        }
 
415
                                                        
 
416
                                                        $this->nowWorkingSet->add($message);
 
417
                                                }
 
418
                                                
 
419
                                                $this->worker->handleWork($message);
 
420
                                                
 
421
                                                if ($this->preventParalleling) {
 
422
                                                        $this->nowWorkingSet->remove($message);
 
423
                                                }
 
424
                                                
 
425
                                                // Notify parent after message handler
 
426
                                                $this->fireChildEvent("afterHandleWork", array(
 
427
                                                        "message" => $message
 
428
                                                ));
 
429
                                                
 
430
                                                if ($this->workerMemoryLimit && $memoryTick->reached(false)) {
 
431
                                                        $this->fireChildEvent("memoryUsage", array(
 
432
                                                                "memory" => $os->getMemoryUsage(posix_getpid(), Scalr_System_OS::MEM_RES)
 
433
                                                        ));
 
434
                                                        $memoryTick->reset();
 
435
                                                }
 
436
                                        }
 
437
                                }
 
438
                                
 
439
                                $this->worker->endChild();
 
440
                                $this->logger->info("Done");
 
441
                                
 
442
                        } catch (Exception $e) {
 
443
                                // Raise fatal error
 
444
                                $this->logger->error(sprintf("Unhandled exception in worker process: <%s> '%s'", 
 
445
                                                get_class($e), $e->getMessage()));
 
446
                                $this->logger->info(sprintf("Worker process %d terminated (exit code: %d)", 
 
447
                                                $mypid, self::$termExitCode));
 
448
                                                
 
449
                                // Sometimes (in our tests when daemonize=true) parent process does'nt receive SIGCHLD
 
450
                                // Sending kill signal will force SIGCHLD
 
451
                                // TODO: Consider it deeper                                
 
452
                                posix_kill($mypid, SIGKILL);
 
453
                                
 
454
                                exit(self::$termExitCode);
 
455
                        }
 
456
                        
 
457
                        exit();
 
458
                }
 
459
        }
 
460
        
 
461
        private function timeoutReached ($timeout, $startTime) {
 
462
                $this->timeoutFly->start = $startTime;
 
463
                $this->timeoutFly->setTimeout($timeout);
 
464
                return $this->timeoutFly->reached(false);
 
465
        }
 
466
        
 
467
        private function sleepMillis ($millis) {
 
468
                Scalr_Util_Timeout::sleep($millis);
 
469
        }
 
470
        
 
471
        protected function kill ($pid, $signal, $logPrefix="") {
 
472
                $this->logger->info(sprintf("%sSend %s -> %s", $logPrefix, self::$signames[$signal], $pid));
 
473
                return posix_kill($pid, $signal);
 
474
        }
 
475
        
 
476
        protected function terminateChild ($pid) {
 
477
                if (key_exists($pid, $this->childs)) {
 
478
                        $this->kill($pid, SIGTERM);
 
479
                        $this->childs[$pid]["termStartTime"] = microtime(true);
 
480
                }
 
481
        }
 
482
        
 
483
        protected function fireChildEvent ($evName, $evData=array()) {
 
484
                $evData["type"] = $evName;
 
485
                $evData["pid"] = posix_getpid();
 
486
                $this->childEventQueue->put(serialize($evData));
 
487
                if (!$this->kill($this->poolPid, SIGUSR1, "[".posix_getpid()."] ")) {
 
488
                        $this->logger->fatal("Cannot send signal to parent process");
 
489
                        posix_kill(posix_getpid(), SIGKILL);
 
490
                }               
 
491
        }
 
492
        
 
493
 
 
494
        protected static $signames = array(
 
495
                SIGHUP => "SIGHUP",
 
496
                SIGINT => "SIGINT",
 
497
                SIGQUIT => "SIGQUIT",
 
498
                SIGILL => "SIGILL",
 
499
                SIGTRAP => "SIGTRAP",
 
500
                SIGABRT => "SIGABRT",
 
501
                SIGBUS => "SIGBUS",
 
502
                SIGFPE => "SIGFPE",
 
503
                SIGKILL => "SIGKILL",
 
504
                SIGUSR1 => "SIGUSR1",
 
505
                SIGSEGV => "SIGSEGV",
 
506
                SIGUSR2 => "SIGUSR2",
 
507
                SIGPIPE => "SIGPIPE",
 
508
                SIGALRM => "SIGALRM",
 
509
                SIGTERM => "SIGTERM",
 
510
                SIGSTKFLT => "SIGSTKFLT",
 
511
                SIGCHLD => "SIGCHLD",
 
512
                SIGCONT => "SIGCONT",
 
513
                SIGSTOP => "SIGSTOP",
 
514
                SIGTSTP => "SIGTSTP",
 
515
                SIGTTIN => "SIGTTIN",
 
516
                SIGTTOU => "SIGTTOU",
 
517
                SIGURG => "SIGURG",
 
518
                SIGXCPU => "SIGXCPU",
 
519
                SIGXFSZ => "SIGXFSZ",
 
520
                SIGVTALRM => "SIGVTALRM",
 
521
                SIGPROF => "SIGPROF",
 
522
                SIGWINCH => "SIGWINCH",
 
523
                SIGPOLL => "SIGPOLL",
 
524
                SIGIO => "SIGIO",
 
525
                SIGPWR => "SIGPWR",
 
526
                SIGSYS => "SIGSYS",
 
527
                SIGBABY => "SIGBABY"
 
528
        );
 
529
        
 
530
        protected function initSignalHandler () {
 
531
                $fn = array($this, "signalHandler");
 
532
                $signals = array(SIGCHLD, SIGTERM, SIGABRT, SIGALRM, SIGUSR1, SIGUSR2);
 
533
                if ($this->daemonize) {
 
534
                        $signals[] = SIGHUP;
 
535
                }
 
536
                foreach ($signals as $sig) {
 
537
                        $this->logger->debug("Install ".self::$signames[$sig]." handler");
 
538
                        if (!pcntl_signal($sig, $fn)) {
 
539
                                $this->logger->warn(sprintf("Cannot install signal handler on signal %s in process %d", 
 
540
                                                self::$signames[$sig], posix_getpid()));
 
541
                        }
 
542
                }
 
543
        }
 
544
        
 
545
        function signalHandler ($sig) {
 
546
                $mypid = posix_getpid();
 
547
                
 
548
                switch ($sig) {
 
549
                        // Child terminated
 
550
                        case SIGCHLD: 
 
551
                                // In parent
 
552
                                $pid = pcntl_waitpid(0, $status, WNOHANG);                              
 
553
                                $this->logger->info(sprintf("Received %s from %d. Status: %d", 
 
554
                                                self::$signames[$sig], $pid, $status));                         
 
555
                                $this->onSIGCHLD($pid, $status);
 
556
                                break;
 
557
                                
 
558
                        // Startup barrier ready
 
559
                        case SIGUSR2: 
 
560
                                // In child
 
561
                                $this->logger->debug(sprintf("Received %s", self::$signames[$sig]));
 
562
                                $this->ready = true;
 
563
                                break;
 
564
                                
 
565
                        // Event notification from child
 
566
                        case SIGUSR1:
 
567
                                // In parent
 
568
                                $this->logger->info(sprintf("Received %s", self::$signames[$sig]));
 
569
                                $this->onSIGUSR1();
 
570
                                break;
 
571
                                
 
572
                        // Timer alarm 
 
573
                        case SIGALRM:
 
574
                                //$this->logger->debug(sprintf("Received %s", self::$signames[$sig]));
 
575
                                // Check zomby child processes
 
576
                                foreach (array_keys($this->childs) as $pid) {
 
577
                                        if (!posix_kill($pid, 0)) {
 
578
                                                unset($this->childs[$pid]);
 
579
                                        }
 
580
                                }
 
581
                                
 
582
                                pcntl_alarm(1);
 
583
                                break;
 
584
                                
 
585
                        case SIGHUP:
 
586
                                // Works when $this->daemonize = true
 
587
                                $this->logger->info(sprintf("Received %s", self::$signames[$sig]));
 
588
                                // Restart workers 
 
589
                                foreach ($this->childs as $childInfo) {
 
590
                                        $this->terminateChild($childInfo["pid"]);
 
591
                                }
 
592
                                break;
 
593
                                
 
594
                        // Terminate process
 
595
                        case SIGTERM:
 
596
                        case SIGABRT:
 
597
                                
 
598
                                if ($mypid == $this->poolPid) {
 
599
                                        // In parent
 
600
                                        $this->logger->info(sprintf("Received %s in parent", self::$signames[$sig]));           
 
601
                                        $this->fireEvent("signal", $this, $sig);
 
602
                                        $this->shutdown();
 
603
                                        return;
 
604
                                } else {
 
605
                                        // In child
 
606
                                        $this->logger->info(sprintf("Received %s in child", self::$signames[$sig]));                                    
 
607
                                        if ($this->isChild) {
 
608
                                                $this->logger->debug("Worker terminating...");
 
609
                                                try {
 
610
                                                        $this->worker->terminate();
 
611
                                                } catch (Exception $e) {
 
612
                                                        $this->logger->error("Exception in worker->terminate(). Caught: {$e->getMessage()}");
 
613
                                                }
 
614
                                                $this->logger->info(sprintf("Worker process %d terminated (exit code: %d)", 
 
615
                                                                $mypid, self::$termExitCode));
 
616
 
 
617
                                                // Sometimes (in our tests when daemonize=true) parent process does'nt receive SIGCHLD
 
618
                                                // Sending kill signal will force SIGCHLD
 
619
                                                // TODO: Consider it deeper  
 
620
                                                posix_kill($mypid, SIGKILL);
 
621
                                                exit(self::$termExitCode);                                      
 
622
                                        }
 
623
                                }
 
624
                                break;
 
625
                        
 
626
                        default:
 
627
                                $this->logger->info(sprintf("Received %s", self::$signames[$sig]));
 
628
                                break;
 
629
                }
 
630
                
 
631
                $this->fireEvent("signal", $this, $sig);
 
632
        }
 
633
        
 
634
        protected function onSIGCHLD ($pid, $status) {
 
635
                if ($pid <= 0) { 
 
636
                        return;
 
637
                }
 
638
 
 
639
                if (key_exists($pid, $this->childs)) {
 
640
                        // Remove work from nowWorking set 
 
641
                        if ($this->childs[$pid]["message"] && $this->preventParalleling) {
 
642
                                $this->nowWorkingSet->remove($this->childs[$pid]["message"]);
 
643
                        }
 
644
                        unset($this->childs[$pid]);
 
645
                        
 
646
                        $this->logger->debug(sprintf("Child termination options. "
 
647
                                        . "wifexited: %d, wifsignaled: %d, wifstopped: %d, stopForking: %d", 
 
648
                                        pcntl_wifexited($status), pcntl_wifsignaled($status), 
 
649
                                        pcntl_wifstopped($status), $this->stopForking));
 
650
                        
 
651
                        // In case of unnormal exit fork new child process
 
652
                        // 1. status=65280 when child process died with fatal error (set by PHP)
 
653
                        // 2. exit=9 when child was terminated by parent or by unhandled exception (set by ProcessPool)
 
654
                        // 3. stopeed by signal
 
655
                        if ((pcntl_wifexited($status) && $status == 65280) ||
 
656
                                (pcntl_wifexited($status) && pcntl_wexitstatus($status) == self::$termExitCode) ||
 
657
                                (pcntl_wifsignaled($status))) {
 
658
                                try {
 
659
                                        if (!$this->stopForking) {
 
660
                                                // Increase slippage
 
661
                                                $this->slippage++;
 
662
                                                
 
663
                                                if ($this->slippage < $this->slippageLimit) {
 
664
                                                        $this->forkChild(false);
 
665
                                                } else {
 
666
                                                        $this->logger->info(sprintf("Slippage limit: %d exceed. No new childs will be forked", 
 
667
                                                                        $this->slippage));
 
668
                                                        $this->stopForking = true;
 
669
                                                }
 
670
                                        } else {
 
671
                                                $this->logger->debug("'stopForking' flag prevents new process forking");
 
672
                                        }
 
673
                                } catch (Scalr_System_Ipc_Exception $e) {
 
674
                                        $this->logger->error(sprintf("Cannot fork child. Caught: <%s> %s", 
 
675
                                                        get_class($e), $e->getMessage()));
 
676
                                }
 
677
                        }
 
678
                }
 
679
        }
 
680
 
 
681
        protected function onSIGUSR1 () {
 
682
                while ($message0 = $this->childEventQueue->peek()) {
 
683
                        $message = unserialize($message0);
 
684
                        $this->logger->info(sprintf("Peeked '%s' from event queue", $message["type"]));
 
685
                        
 
686
                        switch ($message["type"]) {
 
687
                                case "beforeHandleWork":
 
688
                                        $this->childs[$message["pid"]]["workStartTime"] = $message["microtime"];
 
689
                                        $this->childs[$message["pid"]]["message"] = $message["message"];
 
690
                                        break;
 
691
                                        
 
692
                                case "afterHandleWork":
 
693
                                        unset($this->childs[$message["pid"]]["workStartTime"]);
 
694
                                        unset($this->childs[$message["pid"]]["message"]);
 
695
 
 
696
                                        // Reset slippage counter
 
697
                                        $this->slippage = 0;
 
698
                                        break;
 
699
                                        
 
700
                                case "start":
 
701
                                        if (!$this->ready) {
 
702
                                                $this->shm->put(self::SHM_STARTUP_BARRIER, $this->shm->get(self::SHM_STARTUP_BARRIER) + 1);
 
703
                                        } else {
 
704
                                                $this->childs[$message["pid"]]["startTime"] = microtime(true);
 
705
                                        }
 
706
                                        break;
 
707
                                
 
708
                                case "memoryUsage":
 
709
                                        if ($this->workerMemoryLimit && $message["memory"] > $this->workerMemoryLimit) {
 
710
                                                $this->logger->warn(sprintf(
 
711
                                                                "Worker %d allocates %d Kb. Maximum %d Kb is allowed by configuration", 
 
712
                                                                $message["pid"], $message["memory"], $this->workerMemoryLimit));
 
713
                                                $this->terminateChild($message["pid"]);
 
714
                                        }
 
715
                                        break;
 
716
                                        
 
717
                                default:
 
718
                                        $this->logger->warn("Peeked unknown message from child event queue. "
 
719
                                                        . "Serialized message: {$message0}");
 
720
                        }
 
721
                }
 
722
        }
 
723
        
 
724
        function getPid () {
 
725
                return $this->poolPid;
 
726
        }
 
727
}