30
from eventlet import event
30
31
import logging as std_logging
31
32
from oslo.config import cfg
33
34
from heat.openstack.common import eventlet_backdoor
34
from heat.openstack.common.gettextutils import _
35
from heat.openstack.common.gettextutils import _ # noqa
35
36
from heat.openstack.common import importutils
36
37
from heat.openstack.common import log as logging
37
38
from heat.openstack.common import threadgroup
54
self._services = threadgroup.ThreadGroup()
55
self.services = Services()
55
56
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
58
def run_service(service):
59
"""Start and wait for a service to finish.
61
:param service: service to run and wait for.
68
58
def launch_service(self, service):
69
59
"""Load and start the given service.
75
65
service.backdoor_port = self.backdoor_port
76
self._services.add_thread(self.run_service, service)
66
self.services.add(service)
79
69
"""Stop all services which are currently running.
103
102
# Allow the process to be killed again and die from natural causes
104
103
signal.signal(signal.SIGTERM, signal.SIG_DFL)
105
104
signal.signal(signal.SIGINT, signal.SIG_DFL)
105
signal.signal(signal.SIGHUP, signal.SIG_DFL)
107
107
raise SignalExit(signo)
109
def handle_signal(self):
110
110
signal.signal(signal.SIGTERM, self._handle_signal)
111
111
signal.signal(signal.SIGINT, self._handle_signal)
112
signal.signal(signal.SIGHUP, self._handle_signal)
114
def _wait_for_exit_or_signal(self):
113
118
LOG.debug(_('Full set of CONF:'))
114
119
CONF.log_opt_values(LOG, std_logging.DEBUG)
118
122
super(ServiceLauncher, self).wait()
119
123
except SignalExit as exc:
120
124
signame = {signal.SIGTERM: 'SIGTERM',
121
signal.SIGINT: 'SIGINT'}[exc.signo]
125
signal.SIGINT: 'SIGINT',
126
signal.SIGHUP: 'SIGHUP'}[exc.signo]
122
127
LOG.info(_('Caught %s, exiting'), signame)
123
128
status = exc.code
124
130
except SystemExit as exc:
125
131
status = exc.code
138
# We're shutting down, so it doesn't matter at this point.
139
LOG.exception(_('Exception during rpc cleanup.'))
146
status, signo = self._wait_for_exit_or_signal()
147
if signo != signal.SIGHUP:
133
152
class ServiceWrapper(object):
145
164
self.running = True
146
165
rfd, self.writepipe = os.pipe()
147
166
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
169
def handle_signal(self):
149
170
signal.signal(signal.SIGTERM, self._handle_signal)
150
171
signal.signal(signal.SIGINT, self._handle_signal)
172
signal.signal(signal.SIGHUP, self._handle_signal)
152
174
def _handle_signal(self, signo, frame):
153
175
self.sigcaught = signo
156
178
# Allow the process to be killed again and die from natural causes
157
179
signal.signal(signal.SIGTERM, signal.SIG_DFL)
158
180
signal.signal(signal.SIGINT, signal.SIG_DFL)
181
signal.signal(signal.SIGHUP, signal.SIG_DFL)
160
183
def _pipe_watcher(self):
161
184
# This will block until the write end is closed when the parent
169
def _child_process(self, service):
192
def _child_process_handle_signal(self):
170
193
# Setup child signal handlers differently
171
194
def _sigterm(*args):
172
195
signal.signal(signal.SIGTERM, signal.SIG_DFL)
173
196
raise SignalExit(signal.SIGTERM)
199
signal.signal(signal.SIGHUP, signal.SIG_DFL)
200
raise SignalExit(signal.SIGHUP)
175
202
signal.signal(signal.SIGTERM, _sigterm)
203
signal.signal(signal.SIGHUP, _sighup)
176
204
# Block SIGINT and let the parent send us a SIGTERM
177
205
signal.signal(signal.SIGINT, signal.SIG_IGN)
207
def _child_wait_for_exit_or_signal(self, launcher):
213
except SignalExit as exc:
214
signame = {signal.SIGTERM: 'SIGTERM',
215
signal.SIGINT: 'SIGINT',
216
signal.SIGHUP: 'SIGHUP'}[exc.signo]
217
LOG.info(_('Caught %s, exiting'), signame)
220
except SystemExit as exc:
222
except BaseException:
223
LOG.exception(_('Unhandled exception'))
230
def _child_process(self, service):
231
self._child_process_handle_signal()
179
233
# Reopen the eventlet hub to make sure we don't share an epoll
180
234
# fd with parent and/or siblings, which would be bad
181
235
eventlet.hubs.use_hub()
191
245
launcher = Launcher()
192
launcher.run_service(service)
246
launcher.launch_service(service)
194
249
def _start_child(self, wrap):
195
250
if len(wrap.forktimes) > wrap.workers:
210
265
# NOTE(johannes): All exceptions are caught to ensure this
211
266
# doesn't fallback into the loop spawning children. It would
212
267
# be bad for a child to spawn more children.
215
self._child_process(wrap.service)
216
except SignalExit as exc:
217
signame = {signal.SIGTERM: 'SIGTERM',
218
signal.SIGINT: 'SIGINT'}[exc.signo]
219
LOG.info(_('Caught %s, exiting'), signame)
221
except SystemExit as exc:
223
except BaseException:
224
LOG.exception(_('Unhandled exception'))
268
launcher = self._child_process(wrap.service)
270
self._child_process_handle_signal()
271
status, signo = self._child_wait_for_exit_or_signal(launcher)
272
if signo != signal.SIGHUP:
270
317
wrap.children.remove(pid)
274
"""Loop waiting on children to die and respawning as necessary."""
276
LOG.debug(_('Full set of CONF:'))
277
CONF.log_opt_values(LOG, std_logging.DEBUG)
320
def _respawn_children(self):
279
321
while self.running:
280
322
wrap = self._wait_child()
284
326
# (see bug #1095346)
285
327
eventlet.greenthread.sleep(.01)
288
329
while self.running and len(wrap.children) < wrap.workers:
289
330
self._start_child(wrap)
292
signame = {signal.SIGTERM: 'SIGTERM',
293
signal.SIGINT: 'SIGINT'}[self.sigcaught]
294
LOG.info(_('Caught %s, stopping children'), signame)
333
"""Loop waiting on children to die and respawning as necessary."""
335
LOG.debug(_('Full set of CONF:'))
336
CONF.log_opt_values(LOG, std_logging.DEBUG)
340
self._respawn_children()
342
signame = {signal.SIGTERM: 'SIGTERM',
343
signal.SIGINT: 'SIGINT',
344
signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
345
LOG.info(_('Caught %s, stopping children'), signame)
346
if self.sigcaught != signal.SIGHUP:
349
for pid in self.children:
350
os.kill(pid, signal.SIGHUP)
352
self.sigcaught = None
296
354
for pid in self.children:
313
371
def __init__(self, threads=1000):
314
372
self.tg = threadgroup.ThreadGroup(threads)
374
# signal that the service is done shutting itself down:
375
self._done = event.Event()
378
# NOTE(Fengqian): docs for Event.reset() recommend against using it
379
self._done = event.Event()
387
# Signal that service cleanup is done:
388
if not self._done.ready():
395
class Services(object):
399
self.tg = threadgroup.ThreadGroup()
400
self.done = event.Event()
402
def add(self, service):
403
self.services.append(service)
404
self.tg.add_thread(self.run_service, service, self.done)
407
# wait for graceful shutdown of services:
408
for service in self.services:
412
# Each service has performed cleanup, now signal that the run_service
413
# wrapper threads can now die:
414
if not self.done.ready():
425
self.done = event.Event()
426
for restart_service in self.services:
427
restart_service.reset()
428
self.tg.add_thread(self.run_service, restart_service, self.done)
431
def run_service(service, done):
432
"""Service start wrapper.
434
:param service: service to run
435
:param done: event to wait on until a shutdown is triggered
326
443
def launch(service, workers=None):