1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
1
# Copyright 2010 United States Government as represented by the
4
2
# Administrator of the National Aeronautics and Space Administration.
5
3
# Copyright 2011 Justin Santa Barbara
20
18
"""Generic Node base class for all workers that run on hosts."""
21
import logging as std_logging
29
# Importing just the symbol here because the io module does not
30
# exist in Python 2.6.
31
from io import UnsupportedOperation # noqa
34
UnsupportedOperation = None
30
37
from eventlet import event
31
import logging as std_logging
32
38
from oslo.config import cfg
34
40
from trove.openstack.common import eventlet_backdoor
35
from trove.openstack.common.gettextutils import _ # noqa
41
from trove.openstack.common.gettextutils import _LE, _LI, _LW
36
42
from trove.openstack.common import importutils
37
43
from trove.openstack.common import log as logging
44
from trove.openstack.common import systemd
38
45
from trove.openstack.common import threadgroup
43
50
LOG = logging.getLogger(__name__)
53
def _sighup_supported():
    """Return True when the ``signal`` module defines ``SIGHUP``.

    This is a plain ``hasattr`` probe; callers use it to guard every
    reference to ``signal.SIGHUP`` so the module keeps working where that
    attribute is absent.
    """
    return hasattr(signal, 'SIGHUP')
58
# The process group for a foreground process will match the
59
# process group of the controlling terminal. If those values do
60
# not match, or ioctl() fails on the stdout file handle, we assume
61
# the process is running in the background as a daemon.
62
# http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
64
is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
65
except OSError as err:
66
if err.errno == errno.ENOTTY:
67
# Assume we are a daemon because there is no terminal.
71
except UnsupportedOperation:
72
# Could not get the fileno for stdout, so we must be a daemon.
77
def _is_sighup_and_daemon(signo):
78
if not (_sighup_supported() and signo == signal.SIGHUP):
79
# Avoid checking if we are a daemon, because the signal isn't
85
def _signo_to_signame(signo):
86
signals = {signal.SIGTERM: 'SIGTERM',
87
signal.SIGINT: 'SIGINT'}
88
if _sighup_supported():
89
signals[signal.SIGHUP] = 'SIGHUP'
93
def _set_signals_handler(handler):
    """Install ``handler`` for SIGTERM, SIGINT and, when available, SIGHUP.

    :param handler: value passed straight to ``signal.signal``; callers in
                    this module pass a bound method or ``signal.SIG_DFL``.
    """
    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)
    # SIGHUP is only touched when _sighup_supported() says it exists.
    if _sighup_supported():
        signal.signal(signal.SIGHUP, handler)
46
100
class Launcher(object):
47
101
"""Launch one or more services and wait for them to complete."""
100
154
class ServiceLauncher(Launcher):
101
155
def _handle_signal(self, signo, frame):
    """Signal handler: restore default handlers, then raise SignalExit.

    :param signo: number of the delivered signal
    :param frame: stack frame supplied by the signal machinery (unused)
    :raises SignalExit: always, carrying ``signo``
    """
    # Allow the process to be killed again and die from natural causes.
    # Going through the helper keeps the SIGHUP reset guarded instead of
    # referencing signal.SIGHUP unconditionally.
    _set_signals_handler(signal.SIG_DFL)
    raise SignalExit(signo)
109
160
def handle_signal(self):
    """Register ``self._handle_signal`` for the signals this module manages.

    Registration is delegated to ``_set_signals_handler`` so SIGHUP is
    only installed when it is supported, and each signal is registered
    exactly once.
    """
    _set_signals_handler(self._handle_signal)
114
def _wait_for_exit_or_signal(self):
163
def _wait_for_exit_or_signal(self, ready_callback=None):
118
LOG.debug(_('Full set of CONF:'))
167
LOG.debug('Full set of CONF:')
119
168
CONF.log_opt_values(LOG, std_logging.DEBUG)
122
173
super(ServiceLauncher, self).wait()
123
174
except SignalExit as exc:
124
signame = {signal.SIGTERM: 'SIGTERM',
125
signal.SIGINT: 'SIGINT',
126
signal.SIGHUP: 'SIGHUP'}[exc.signo]
127
LOG.info(_('Caught %s, exiting'), signame)
175
signame = _signo_to_signame(exc.signo)
176
LOG.info(_LI('Caught %s, exiting'), signame)
128
177
status = exc.code
129
178
signo = exc.signo
130
179
except SystemExit as exc:
137
186
except Exception:
138
187
# We're shutting down, so it doesn't matter at this point.
139
LOG.exception(_('Exception during rpc cleanup.'))
188
LOG.exception(_LE('Exception during rpc cleanup.'))
141
190
return status, signo
192
def wait(self, ready_callback=None):
193
systemd.notify_once()
145
195
self.handle_signal()
146
status, signo = self._wait_for_exit_or_signal()
147
if signo != signal.SIGHUP:
196
status, signo = self._wait_for_exit_or_signal(ready_callback)
197
if not _is_sighup_and_daemon(signo):
160
210
class ProcessLauncher(object):
211
def __init__(self, wait_interval=0.01):
214
:param wait_interval: The interval to sleep for between checks
215
of child process exit.
162
217
self.children = {}
163
218
self.sigcaught = None
164
219
self.running = True
220
self.wait_interval = wait_interval
165
221
rfd, self.writepipe = os.pipe()
166
222
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
167
223
self.handle_signal()
169
225
def handle_signal(self):
    """Register ``self._handle_signal`` for the signals this module manages.

    Registration is delegated to ``_set_signals_handler`` so SIGHUP is
    only installed when it is supported, and each signal is registered
    exactly once.
    """
    _set_signals_handler(self._handle_signal)
174
228
def _handle_signal(self, signo, frame):
    """Signal handler: remember the signal and stop the launcher loop.

    :param signo: number of the delivered signal; stored in
                  ``self.sigcaught``
    :param frame: stack frame supplied by the signal machinery (unused)
    """
    self.sigcaught = signo
    self.running = False

    # Allow the process to be killed again and die from natural causes.
    # Going through the helper keeps the SIGHUP reset guarded instead of
    # referencing signal.SIGHUP unconditionally.
    _set_signals_handler(signal.SIG_DFL)
183
235
def _pipe_watcher(self):
184
236
# This will block until the write end is closed when the parent
185
237
# dies unexpectedly
186
238
self.readpipe.read()
188
LOG.info(_('Parent process has died unexpectedly, exiting'))
240
LOG.info(_LI('Parent process has died unexpectedly, exiting'))
200
252
raise SignalExit(signal.SIGHUP)
202
254
signal.signal(signal.SIGTERM, _sigterm)
203
signal.signal(signal.SIGHUP, _sighup)
255
if _sighup_supported():
256
signal.signal(signal.SIGHUP, _sighup)
204
257
# Block SIGINT and let the parent send us a SIGTERM
205
258
signal.signal(signal.SIGINT, signal.SIG_IGN)
207
260
def _child_wait_for_exit_or_signal(self, launcher):
264
# NOTE(johannes): All exceptions are caught to ensure this
265
# doesn't fallback into the loop spawning children. It would
266
# be bad for a child to spawn more children.
213
269
except SignalExit as exc:
214
signame = {signal.SIGTERM: 'SIGTERM',
215
signal.SIGINT: 'SIGINT',
216
signal.SIGHUP: 'SIGHUP'}[exc.signo]
217
LOG.info(_('Caught %s, exiting'), signame)
270
signame = _signo_to_signame(exc.signo)
271
LOG.info(_LI('Child caught %s, exiting'), signame)
218
272
status = exc.code
219
273
signo = exc.signo
220
274
except SystemExit as exc:
221
275
status = exc.code
222
276
except BaseException:
223
LOG.exception(_('Unhandled exception'))
277
LOG.exception(_LE('Unhandled exception'))
253
307
# start up quickly but ensure we don't fork off children that
254
308
# die instantly too quickly.
255
309
if time.time() - wrap.forktimes[0] < wrap.workers:
256
LOG.info(_('Forking too fast, sleeping'))
310
LOG.info(_LI('Forking too fast, sleeping'))
259
313
wrap.forktimes.pop(0)
265
# NOTE(johannes): All exceptions are caught to ensure this
266
# doesn't fallback into the loop spawning children. It would
267
# be bad for a child to spawn more children.
268
319
launcher = self._child_process(wrap.service)
270
321
self._child_process_handle_signal()
271
322
status, signo = self._child_wait_for_exit_or_signal(launcher)
272
if signo != signal.SIGHUP:
323
if not _is_sighup_and_daemon(signo):
274
325
launcher.restart()
278
LOG.info(_('Started child %d'), pid)
329
LOG.info(_LI('Started child %d'), pid)
280
331
wrap.children.add(pid)
281
332
self.children[pid] = wrap
285
336
def launch_service(self, service, workers=1):
    """Wrap ``service`` and start children until ``workers`` are running.

    :param service: service object handed to ``ServiceWrapper``
    :param workers: number of child processes to start (default 1)
    """
    wrap = ServiceWrapper(service, workers)

    # Logged once, with the info-level translation marker.
    LOG.info(_LI('Starting %d workers'), wrap.workers)
    # Stop early if a signal handler has cleared self.running.
    while self.running and len(wrap.children) < wrap.workers:
        self._start_child(wrap)
303
354
if os.WIFSIGNALED(status):
304
355
sig = os.WTERMSIG(status)
305
LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
356
LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
306
357
dict(pid=pid, sig=sig))
308
359
code = os.WEXITSTATUS(status)
309
LOG.info(_('Child %(pid)s exited with status %(code)d'),
360
LOG.info(_LI('Child %(pid)s exited with status %(code)d'),
310
361
dict(pid=pid, code=code))
312
363
if pid not in self.children:
313
LOG.warning(_('pid %d not in child list'), pid)
364
LOG.warning(_LW('pid %d not in child list'), pid)
316
367
wrap = self.children.pop(pid)
324
375
# Yield to other threads if no children have exited
325
376
# Sleep for a short time to avoid excessive CPU usage
326
377
# (see bug #1095346)
327
eventlet.greenthread.sleep(.01)
378
eventlet.greenthread.sleep(self.wait_interval)
329
380
while self.running and len(wrap.children) < wrap.workers:
330
381
self._start_child(wrap)
333
384
"""Loop waiting on children to die and respawning as necessary."""
335
LOG.debug(_('Full set of CONF:'))
386
systemd.notify_once()
387
LOG.debug('Full set of CONF:')
336
388
CONF.log_opt_values(LOG, std_logging.DEBUG)
340
self._respawn_children()
342
signame = {signal.SIGTERM: 'SIGTERM',
343
signal.SIGINT: 'SIGINT',
344
signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
345
LOG.info(_('Caught %s, stopping children'), signame)
346
if self.sigcaught != signal.SIGHUP:
349
for pid in self.children:
350
os.kill(pid, signal.SIGHUP)
352
self.sigcaught = None
393
self._respawn_children()
394
# No signal means that stop was called. Don't clean up here.
395
if not self.sigcaught:
398
signame = _signo_to_signame(self.sigcaught)
399
LOG.info(_LI('Caught %s, stopping children'), signame)
400
if not _is_sighup_and_daemon(self.sigcaught):
403
for pid in self.children:
404
os.kill(pid, signal.SIGHUP)
406
self.sigcaught = None
407
except eventlet.greenlet.GreenletExit:
408
LOG.info(_LI("Wait called after thread killed. Cleaning up."))
413
"""Terminate child processes and wait on each."""
354
415
for pid in self.children:
356
417
os.kill(pid, signal.SIGTERM)
443
def launch(service, workers=None):
504
def launch(service, workers=1):
505
if workers is None or workers == 1:
506
launcher = ServiceLauncher()
507
launcher.launch_service(service)
445
509
launcher = ProcessLauncher()
446
510
launcher.launch_service(service, workers=workers)
448
launcher = ServiceLauncher()
449
launcher.launch_service(service)