~ubuntu-branches/ubuntu/utopic/openstack-trove/utopic-proposed

« back to all changes in this revision

Viewing changes to trove/openstack/common/service.py

  • Committer: Package Import Robot
  • Author(s): James Page, Corey Bryant, James Page
  • Date: 2014-09-25 15:57:16 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20140925155716-5elwewljjkfa0abg
Tags: 2014.2~b3-0ubuntu1
[ Corey Bryant ]
* Enable unit test execution (LP: #1347567):
  - d/rules: Execute tests during package build.
  - d/p/skip-tests.patch: Add patch to skip broken tests.

[ James Page ]
* New upstream milestone release.
* d/rules: Tidy surplus overrides.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
1
# Copyright 2010 United States Government as represented by the
4
2
# Administrator of the National Aeronautics and Space Administration.
5
3
# Copyright 2011 Justin Santa Barbara
20
18
"""Generic Node base class for all workers that run on hosts."""
21
19
 
22
20
import errno
 
21
import logging as std_logging
23
22
import os
24
23
import random
25
24
import signal
26
25
import sys
27
26
import time
28
27
 
 
28
try:
 
29
    # Importing just the symbol here because the io module does not
 
30
    # exist in Python 2.6.
 
31
    from io import UnsupportedOperation  # noqa
 
32
except ImportError:
 
33
    # Python 2.6
 
34
    UnsupportedOperation = None
 
35
 
29
36
import eventlet
30
37
from eventlet import event
31
 
import logging as std_logging
32
38
from oslo.config import cfg
33
39
 
34
40
from trove.openstack.common import eventlet_backdoor
35
 
from trove.openstack.common.gettextutils import _  # noqa
 
41
from trove.openstack.common.gettextutils import _LE, _LI, _LW
36
42
from trove.openstack.common import importutils
37
43
from trove.openstack.common import log as logging
 
44
from trove.openstack.common import systemd
38
45
from trove.openstack.common import threadgroup
39
46
 
40
47
 
43
50
LOG = logging.getLogger(__name__)
44
51
 
45
52
 
 
53
def _sighup_supported():
    """Report whether the ``signal`` module defines SIGHUP on this platform."""
    try:
        signal.SIGHUP
    except AttributeError:
        return False
    return True
 
55
 
 
56
 
 
57
def _is_daemon():
    """Return True if this process appears to be running as a daemon.

    A foreground process shares its process group with the controlling
    terminal attached to stdout.  If the groups differ, or stdout has no
    usable terminal behind it, the process is assumed to be a background
    daemon.
    http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics

    :returns: True for a daemon, False for a foreground process.
    :raises OSError: if querying the terminal's process group fails for a
        reason other than stdout not being a terminal (ENOTTY).
    """
    try:
        stdout_fd = sys.stdout.fileno()
    except (AttributeError, ValueError):
        # stdout is None (AttributeError), closed (ValueError), or not backed
        # by a real descriptor (io.UnsupportedOperation, a ValueError
        # subclass on both Python 2 and 3).  Resolving the descriptor in its
        # own try matters on Python 3, where UnsupportedOperation also
        # subclasses OSError: a shared try would let the OSError handler
        # below catch it (errno None) and re-raise it.
        return True

    try:
        return os.getpgrp() != os.tcgetpgrp(stdout_fd)
    except OSError as err:
        if err.errno == errno.ENOTTY:
            # Assume we are a daemon because there is no terminal.
            return True
        raise
 
75
 
 
76
 
 
77
def _is_sighup_and_daemon(signo):
    """Tell whether *signo* is a SIGHUP received while running as a daemon.

    Launchers restart their services only when this returns True; any other
    signal (or a SIGHUP in a foreground process) makes them exit.
    """
    if _sighup_supported() and signo == signal.SIGHUP:
        # Only pay for the terminal/process-group check when the signal
        # actually is SIGHUP.
        return _is_daemon()
    return False
 
83
 
 
84
 
 
85
def _signo_to_signame(signo):
    """Map a handled signal number to its symbolic name for logging.

    Only SIGTERM, SIGINT and, where the platform defines it, SIGHUP are
    known; any other number raises KeyError.
    """
    known = [(signal.SIGTERM, 'SIGTERM'), (signal.SIGINT, 'SIGINT')]
    if _sighup_supported():
        known.append((signal.SIGHUP, 'SIGHUP'))
    return dict(known)[signo]
 
91
 
 
92
 
 
93
def _set_signals_handler(handler):
    """Install *handler* for SIGTERM, SIGINT and, if available, SIGHUP.

    :param handler: any value accepted by ``signal.signal`` (a callable,
        ``signal.SIG_DFL`` or ``signal.SIG_IGN``).
    """
    signums = [signal.SIGTERM, signal.SIGINT]
    if _sighup_supported():
        signums.append(signal.SIGHUP)
    for signum in signums:
        signal.signal(signum, handler)
 
98
 
 
99
 
46
100
class Launcher(object):
47
101
    """Launch one or more services and wait for them to complete."""
48
102
 
100
154
class ServiceLauncher(Launcher):
101
155
    def _handle_signal(self, signo, frame):
102
156
        # Allow the process to be killed again and die from natural causes
103
 
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
104
 
        signal.signal(signal.SIGINT, signal.SIG_DFL)
105
 
        signal.signal(signal.SIGHUP, signal.SIG_DFL)
106
 
 
 
157
        _set_signals_handler(signal.SIG_DFL)
107
158
        raise SignalExit(signo)
108
159
 
109
160
    def handle_signal(self):
110
 
        signal.signal(signal.SIGTERM, self._handle_signal)
111
 
        signal.signal(signal.SIGINT, self._handle_signal)
112
 
        signal.signal(signal.SIGHUP, self._handle_signal)
 
161
        _set_signals_handler(self._handle_signal)
113
162
 
114
 
    def _wait_for_exit_or_signal(self):
 
163
    def _wait_for_exit_or_signal(self, ready_callback=None):
115
164
        status = None
116
165
        signo = 0
117
166
 
118
 
        LOG.debug(_('Full set of CONF:'))
 
167
        LOG.debug('Full set of CONF:')
119
168
        CONF.log_opt_values(LOG, std_logging.DEBUG)
120
169
 
121
170
        try:
 
171
            if ready_callback:
 
172
                ready_callback()
122
173
            super(ServiceLauncher, self).wait()
123
174
        except SignalExit as exc:
124
 
            signame = {signal.SIGTERM: 'SIGTERM',
125
 
                       signal.SIGINT: 'SIGINT',
126
 
                       signal.SIGHUP: 'SIGHUP'}[exc.signo]
127
 
            LOG.info(_('Caught %s, exiting'), signame)
 
175
            signame = _signo_to_signame(exc.signo)
 
176
            LOG.info(_LI('Caught %s, exiting'), signame)
128
177
            status = exc.code
129
178
            signo = exc.signo
130
179
        except SystemExit as exc:
136
185
                    rpc.cleanup()
137
186
                except Exception:
138
187
                    # We're shutting down, so it doesn't matter at this point.
139
 
                    LOG.exception(_('Exception during rpc cleanup.'))
 
188
                    LOG.exception(_LE('Exception during rpc cleanup.'))
140
189
 
141
190
        return status, signo
142
191
 
143
 
    def wait(self):
 
192
    def wait(self, ready_callback=None):
 
193
        systemd.notify_once()
144
194
        while True:
145
195
            self.handle_signal()
146
 
            status, signo = self._wait_for_exit_or_signal()
147
 
            if signo != signal.SIGHUP:
 
196
            status, signo = self._wait_for_exit_or_signal(ready_callback)
 
197
            if not _is_sighup_and_daemon(signo):
148
198
                return status
149
199
            self.restart()
150
200
 
158
208
 
159
209
 
160
210
class ProcessLauncher(object):
161
 
    def __init__(self):
 
211
    def __init__(self, wait_interval=0.01):
 
212
        """Constructor.
 
213
 
 
214
        :param wait_interval: The interval to sleep for between checks
 
215
                              of child process exit.
 
216
        """
162
217
        self.children = {}
163
218
        self.sigcaught = None
164
219
        self.running = True
 
220
        self.wait_interval = wait_interval
165
221
        rfd, self.writepipe = os.pipe()
166
222
        self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
167
223
        self.handle_signal()
168
224
 
169
225
    def handle_signal(self):
170
 
        signal.signal(signal.SIGTERM, self._handle_signal)
171
 
        signal.signal(signal.SIGINT, self._handle_signal)
172
 
        signal.signal(signal.SIGHUP, self._handle_signal)
 
226
        _set_signals_handler(self._handle_signal)
173
227
 
174
228
    def _handle_signal(self, signo, frame):
175
229
        self.sigcaught = signo
176
230
        self.running = False
177
231
 
178
232
        # Allow the process to be killed again and die from natural causes
179
 
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
180
 
        signal.signal(signal.SIGINT, signal.SIG_DFL)
181
 
        signal.signal(signal.SIGHUP, signal.SIG_DFL)
 
233
        _set_signals_handler(signal.SIG_DFL)
182
234
 
183
235
    def _pipe_watcher(self):
184
236
        # This will block until the write end is closed when the parent
185
237
        # dies unexpectedly
186
238
        self.readpipe.read()
187
239
 
188
 
        LOG.info(_('Parent process has died unexpectedly, exiting'))
 
240
        LOG.info(_LI('Parent process has died unexpectedly, exiting'))
189
241
 
190
242
        sys.exit(1)
191
243
 
200
252
            raise SignalExit(signal.SIGHUP)
201
253
 
202
254
        signal.signal(signal.SIGTERM, _sigterm)
203
 
        signal.signal(signal.SIGHUP, _sighup)
 
255
        if _sighup_supported():
 
256
            signal.signal(signal.SIGHUP, _sighup)
204
257
        # Block SIGINT and let the parent send us a SIGTERM
205
258
        signal.signal(signal.SIGINT, signal.SIG_IGN)
206
259
 
207
260
    def _child_wait_for_exit_or_signal(self, launcher):
208
 
        status = None
 
261
        status = 0
209
262
        signo = 0
210
263
 
 
264
        # NOTE(johannes): All exceptions are caught to ensure this
 
265
        # doesn't fallback into the loop spawning children. It would
 
266
        # be bad for a child to spawn more children.
211
267
        try:
212
268
            launcher.wait()
213
269
        except SignalExit as exc:
214
 
            signame = {signal.SIGTERM: 'SIGTERM',
215
 
                       signal.SIGINT: 'SIGINT',
216
 
                       signal.SIGHUP: 'SIGHUP'}[exc.signo]
217
 
            LOG.info(_('Caught %s, exiting'), signame)
 
270
            signame = _signo_to_signame(exc.signo)
 
271
            LOG.info(_LI('Child caught %s, exiting'), signame)
218
272
            status = exc.code
219
273
            signo = exc.signo
220
274
        except SystemExit as exc:
221
275
            status = exc.code
222
276
        except BaseException:
223
 
            LOG.exception(_('Unhandled exception'))
 
277
            LOG.exception(_LE('Unhandled exception'))
224
278
            status = 2
225
279
        finally:
226
280
            launcher.stop()
253
307
            # start up quickly but ensure we don't fork off children that
254
308
            # die instantly too quickly.
255
309
            if time.time() - wrap.forktimes[0] < wrap.workers:
256
 
                LOG.info(_('Forking too fast, sleeping'))
 
310
                LOG.info(_LI('Forking too fast, sleeping'))
257
311
                time.sleep(1)
258
312
 
259
313
            wrap.forktimes.pop(0)
262
316
 
263
317
        pid = os.fork()
264
318
        if pid == 0:
265
 
            # NOTE(johannes): All exceptions are caught to ensure this
266
 
            # doesn't fallback into the loop spawning children. It would
267
 
            # be bad for a child to spawn more children.
268
319
            launcher = self._child_process(wrap.service)
269
320
            while True:
270
321
                self._child_process_handle_signal()
271
322
                status, signo = self._child_wait_for_exit_or_signal(launcher)
272
 
                if signo != signal.SIGHUP:
 
323
                if not _is_sighup_and_daemon(signo):
273
324
                    break
274
325
                launcher.restart()
275
326
 
276
327
            os._exit(status)
277
328
 
278
 
        LOG.info(_('Started child %d'), pid)
 
329
        LOG.info(_LI('Started child %d'), pid)
279
330
 
280
331
        wrap.children.add(pid)
281
332
        self.children[pid] = wrap
285
336
    def launch_service(self, service, workers=1):
286
337
        wrap = ServiceWrapper(service, workers)
287
338
 
288
 
        LOG.info(_('Starting %d workers'), wrap.workers)
 
339
        LOG.info(_LI('Starting %d workers'), wrap.workers)
289
340
        while self.running and len(wrap.children) < wrap.workers:
290
341
            self._start_child(wrap)
291
342
 
302
353
 
303
354
        if os.WIFSIGNALED(status):
304
355
            sig = os.WTERMSIG(status)
305
 
            LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
 
356
            LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
306
357
                     dict(pid=pid, sig=sig))
307
358
        else:
308
359
            code = os.WEXITSTATUS(status)
309
 
            LOG.info(_('Child %(pid)s exited with status %(code)d'),
 
360
            LOG.info(_LI('Child %(pid)s exited with status %(code)d'),
310
361
                     dict(pid=pid, code=code))
311
362
 
312
363
        if pid not in self.children:
313
 
            LOG.warning(_('pid %d not in child list'), pid)
 
364
            LOG.warning(_LW('pid %d not in child list'), pid)
314
365
            return None
315
366
 
316
367
        wrap = self.children.pop(pid)
324
375
                # Yield to other threads if no children have exited
325
376
                # Sleep for a short time to avoid excessive CPU usage
326
377
                # (see bug #1095346)
327
 
                eventlet.greenthread.sleep(.01)
 
378
                eventlet.greenthread.sleep(self.wait_interval)
328
379
                continue
329
380
            while self.running and len(wrap.children) < wrap.workers:
330
381
                self._start_child(wrap)
332
383
    def wait(self):
333
384
        """Loop waiting on children to die and respawning as necessary."""
334
385
 
335
 
        LOG.debug(_('Full set of CONF:'))
 
386
        systemd.notify_once()
 
387
        LOG.debug('Full set of CONF:')
336
388
        CONF.log_opt_values(LOG, std_logging.DEBUG)
337
389
 
338
 
        while True:
339
 
            self.handle_signal()
340
 
            self._respawn_children()
341
 
            if self.sigcaught:
342
 
                signame = {signal.SIGTERM: 'SIGTERM',
343
 
                           signal.SIGINT: 'SIGINT',
344
 
                           signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
345
 
                LOG.info(_('Caught %s, stopping children'), signame)
346
 
            if self.sigcaught != signal.SIGHUP:
347
 
                break
348
 
 
349
 
            for pid in self.children:
350
 
                os.kill(pid, signal.SIGHUP)
351
 
            self.running = True
352
 
            self.sigcaught = None
353
 
 
 
390
        try:
 
391
            while True:
 
392
                self.handle_signal()
 
393
                self._respawn_children()
 
394
                # No signal means that stop was called.  Don't clean up here.
 
395
                if not self.sigcaught:
 
396
                    return
 
397
 
 
398
                signame = _signo_to_signame(self.sigcaught)
 
399
                LOG.info(_LI('Caught %s, stopping children'), signame)
 
400
                if not _is_sighup_and_daemon(self.sigcaught):
 
401
                    break
 
402
 
 
403
                for pid in self.children:
 
404
                    os.kill(pid, signal.SIGHUP)
 
405
                self.running = True
 
406
                self.sigcaught = None
 
407
        except eventlet.greenlet.GreenletExit:
 
408
            LOG.info(_LI("Wait called after thread killed.  Cleaning up."))
 
409
 
 
410
        self.stop()
 
411
 
 
412
    def stop(self):
 
413
        """Terminate child processes and wait on each."""
 
414
        self.running = False
354
415
        for pid in self.children:
355
416
            try:
356
417
                os.kill(pid, signal.SIGTERM)
360
421
 
361
422
        # Wait for children to die
362
423
        if self.children:
363
 
            LOG.info(_('Waiting on %d children to exit'), len(self.children))
 
424
            LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
364
425
            while self.children:
365
426
                self._wait_child()
366
427
 
440
501
        done.wait()
441
502
 
442
503
 
443
 
def launch(service, workers=1):
    """Start *service* and return the launcher that runs it.

    :param service: the service object, passed to the launcher's
        ``launch_service``.
    :param workers: number of worker processes.  ``None`` or any value of
        1 or less runs the service in this process with a ServiceLauncher;
        larger values fork that many children with a ProcessLauncher.
    :returns: the ServiceLauncher or ProcessLauncher instance.
    """
    # A ProcessLauncher asked for zero (or a negative number of) workers
    # never forks a child -- its launch_service loop only runs while
    # len(children) < workers -- so the service would silently serve
    # nothing.  Run such configurations in-process instead, as the earlier
    # ``if workers:`` form of this function already did for 0.
    if workers is None or workers <= 1:
        launcher = ServiceLauncher()
        launcher.launch_service(service)
    else:
        launcher = ProcessLauncher()
        launcher.launch_service(service, workers=workers)

    return launcher