~ubuntu-branches/ubuntu/saucy/heat/saucy

« back to all changes in this revision

Viewing changes to heat/openstack/common/service.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2013-09-08 21:51:19 UTC
  • mfrom: (1.1.4)
  • Revision ID: package-import@ubuntu.com-20130908215119-r939tu4aumqgdrkx
Tags: 2013.2~b3-0ubuntu1
[ Chuck Short ]
* New upstream release.
* debian/control: Add python-netaddr as build-dep.
* debian/heat-common.install: Remove heat-boto and associated man-page
* debian/heat-common.install: Remove heat-cfn and associated man-page
* debian/heat-common.install: Remove heat-watch and associated man-page
* debian/patches/fix-sqlalchemy-0.8.patch: Dropped

[ Adam Gandelman ]
* debian/patches/default-kombu.patch: Dropped.
* debian/patches/default-sqlite.patch: Refreshed.
* debian/*.install, rules: Install heat.conf.sample as common
  config file in heat-common. Drop other per-package configs, they
  are no longer used.
* debian/rules: Clean pbr .egg from build dir if it exists.

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
import time
28
28
 
29
29
import eventlet
 
30
from eventlet import event
30
31
import logging as std_logging
31
32
from oslo.config import cfg
32
33
 
33
34
from heat.openstack.common import eventlet_backdoor
34
 
from heat.openstack.common.gettextutils import _
 
35
from heat.openstack.common.gettextutils import _  # noqa
35
36
from heat.openstack.common import importutils
36
37
from heat.openstack.common import log as logging
37
38
from heat.openstack.common import threadgroup
51
52
        :returns: None
52
53
 
53
54
        """
54
 
        self._services = threadgroup.ThreadGroup()
 
55
        self.services = Services()
55
56
        self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
56
57
 
57
 
    @staticmethod
58
 
    def run_service(service):
59
 
        """Start and wait for a service to finish.
60
 
 
61
 
        :param service: service to run and wait for.
62
 
        :returns: None
63
 
 
64
 
        """
65
 
        service.start()
66
 
        service.wait()
67
 
 
68
58
    def launch_service(self, service):
69
59
        """Load and start the given service.
70
60
 
73
63
 
74
64
        """
75
65
        service.backdoor_port = self.backdoor_port
76
 
        self._services.add_thread(self.run_service, service)
 
66
        self.services.add(service)
77
67
 
78
68
    def stop(self):
79
69
        """Stop all services which are currently running.
81
71
        :returns: None
82
72
 
83
73
        """
84
 
        self._services.stop()
 
74
        self.services.stop()
85
75
 
86
76
    def wait(self):
87
77
        """Waits until all services have been stopped, and then returns.
89
79
        :returns: None
90
80
 
91
81
        """
92
 
        self._services.wait()
 
82
        self.services.wait()
 
83
 
 
84
    def restart(self):
 
85
        """Reload config files and restart service.
 
86
 
 
87
        :returns: None
 
88
 
 
89
        """
 
90
        cfg.CONF.reload_config_files()
 
91
        self.services.restart()
93
92
 
94
93
 
95
94
class SignalExit(SystemExit):
103
102
        # Allow the process to be killed again and die from natural causes
104
103
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
105
104
        signal.signal(signal.SIGINT, signal.SIG_DFL)
 
105
        signal.signal(signal.SIGHUP, signal.SIG_DFL)
106
106
 
107
107
        raise SignalExit(signo)
108
108
 
109
 
    def wait(self):
 
109
    def handle_signal(self):
110
110
        signal.signal(signal.SIGTERM, self._handle_signal)
111
111
        signal.signal(signal.SIGINT, self._handle_signal)
 
112
        signal.signal(signal.SIGHUP, self._handle_signal)
 
113
 
 
114
    def _wait_for_exit_or_signal(self):
 
115
        status = None
 
116
        signo = 0
112
117
 
113
118
        LOG.debug(_('Full set of CONF:'))
114
119
        CONF.log_opt_values(LOG, std_logging.DEBUG)
115
120
 
116
 
        status = None
117
121
        try:
118
122
            super(ServiceLauncher, self).wait()
119
123
        except SignalExit as exc:
120
124
            signame = {signal.SIGTERM: 'SIGTERM',
121
 
                       signal.SIGINT: 'SIGINT'}[exc.signo]
 
125
                       signal.SIGINT: 'SIGINT',
 
126
                       signal.SIGHUP: 'SIGHUP'}[exc.signo]
122
127
            LOG.info(_('Caught %s, exiting'), signame)
123
128
            status = exc.code
 
129
            signo = exc.signo
124
130
        except SystemExit as exc:
125
131
            status = exc.code
126
132
        finally:
 
133
            self.stop()
127
134
            if rpc:
128
 
                rpc.cleanup()
129
 
            self.stop()
130
 
        return status
 
135
                try:
 
136
                    rpc.cleanup()
 
137
                except Exception:
 
138
                    # We're shutting down, so it doesn't matter at this point.
 
139
                    LOG.exception(_('Exception during rpc cleanup.'))
 
140
 
 
141
        return status, signo
 
142
 
 
143
    def wait(self):
 
144
        while True:
 
145
            self.handle_signal()
 
146
            status, signo = self._wait_for_exit_or_signal()
 
147
            if signo != signal.SIGHUP:
 
148
                return status
 
149
            self.restart()
131
150
 
132
151
 
133
152
class ServiceWrapper(object):
145
164
        self.running = True
146
165
        rfd, self.writepipe = os.pipe()
147
166
        self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
 
167
        self.handle_signal()
148
168
 
 
169
    def handle_signal(self):
149
170
        signal.signal(signal.SIGTERM, self._handle_signal)
150
171
        signal.signal(signal.SIGINT, self._handle_signal)
 
172
        signal.signal(signal.SIGHUP, self._handle_signal)
151
173
 
152
174
    def _handle_signal(self, signo, frame):
153
175
        self.sigcaught = signo
156
178
        # Allow the process to be killed again and die from natural causes
157
179
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
158
180
        signal.signal(signal.SIGINT, signal.SIG_DFL)
 
181
        signal.signal(signal.SIGHUP, signal.SIG_DFL)
159
182
 
160
183
    def _pipe_watcher(self):
161
184
        # This will block until the write end is closed when the parent
166
189
 
167
190
        sys.exit(1)
168
191
 
169
 
    def _child_process(self, service):
 
192
    def _child_process_handle_signal(self):
170
193
        # Setup child signal handlers differently
171
194
        def _sigterm(*args):
172
195
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
173
196
            raise SignalExit(signal.SIGTERM)
174
197
 
 
198
        def _sighup(*args):
 
199
            signal.signal(signal.SIGHUP, signal.SIG_DFL)
 
200
            raise SignalExit(signal.SIGHUP)
 
201
 
175
202
        signal.signal(signal.SIGTERM, _sigterm)
 
203
        signal.signal(signal.SIGHUP, _sighup)
176
204
        # Block SIGINT and let the parent send us a SIGTERM
177
205
        signal.signal(signal.SIGINT, signal.SIG_IGN)
178
206
 
 
207
    def _child_wait_for_exit_or_signal(self, launcher):
 
208
        status = None
 
209
        signo = 0
 
210
 
 
211
        try:
 
212
            launcher.wait()
 
213
        except SignalExit as exc:
 
214
            signame = {signal.SIGTERM: 'SIGTERM',
 
215
                       signal.SIGINT: 'SIGINT',
 
216
                       signal.SIGHUP: 'SIGHUP'}[exc.signo]
 
217
            LOG.info(_('Caught %s, exiting'), signame)
 
218
            status = exc.code
 
219
            signo = exc.signo
 
220
        except SystemExit as exc:
 
221
            status = exc.code
 
222
        except BaseException:
 
223
            LOG.exception(_('Unhandled exception'))
 
224
            status = 2
 
225
        finally:
 
226
            launcher.stop()
 
227
 
 
228
        return status, signo
 
229
 
 
230
    def _child_process(self, service):
 
231
        self._child_process_handle_signal()
 
232
 
179
233
        # Reopen the eventlet hub to make sure we don't share an epoll
180
234
        # fd with parent and/or siblings, which would be bad
181
235
        eventlet.hubs.use_hub()
189
243
        random.seed()
190
244
 
191
245
        launcher = Launcher()
192
 
        launcher.run_service(service)
 
246
        launcher.launch_service(service)
 
247
        return launcher
193
248
 
194
249
    def _start_child(self, wrap):
195
250
        if len(wrap.forktimes) > wrap.workers:
210
265
            # NOTE(johannes): All exceptions are caught to ensure this
211
266
            # doesn't fallback into the loop spawning children. It would
212
267
            # be bad for a child to spawn more children.
213
 
            status = 0
214
 
            try:
215
 
                self._child_process(wrap.service)
216
 
            except SignalExit as exc:
217
 
                signame = {signal.SIGTERM: 'SIGTERM',
218
 
                           signal.SIGINT: 'SIGINT'}[exc.signo]
219
 
                LOG.info(_('Caught %s, exiting'), signame)
220
 
                status = exc.code
221
 
            except SystemExit as exc:
222
 
                status = exc.code
223
 
            except BaseException:
224
 
                LOG.exception(_('Unhandled exception'))
225
 
                status = 2
226
 
            finally:
227
 
                wrap.service.stop()
 
268
            launcher = self._child_process(wrap.service)
 
269
            while True:
 
270
                self._child_process_handle_signal()
 
271
                status, signo = self._child_wait_for_exit_or_signal(launcher)
 
272
                if signo != signal.SIGHUP:
 
273
                    break
 
274
                launcher.restart()
228
275
 
229
276
            os._exit(status)
230
277
 
270
317
        wrap.children.remove(pid)
271
318
        return wrap
272
319
 
273
 
    def wait(self):
274
 
        """Loop waiting on children to die and respawning as necessary."""
275
 
 
276
 
        LOG.debug(_('Full set of CONF:'))
277
 
        CONF.log_opt_values(LOG, std_logging.DEBUG)
278
 
 
 
320
    def _respawn_children(self):
279
321
        while self.running:
280
322
            wrap = self._wait_child()
281
323
            if not wrap:
284
326
                # (see bug #1095346)
285
327
                eventlet.greenthread.sleep(.01)
286
328
                continue
287
 
 
288
329
            while self.running and len(wrap.children) < wrap.workers:
289
330
                self._start_child(wrap)
290
331
 
291
 
        if self.sigcaught:
292
 
            signame = {signal.SIGTERM: 'SIGTERM',
293
 
                       signal.SIGINT: 'SIGINT'}[self.sigcaught]
294
 
            LOG.info(_('Caught %s, stopping children'), signame)
 
332
    def wait(self):
 
333
        """Loop waiting on children to die and respawning as necessary."""
 
334
 
 
335
        LOG.debug(_('Full set of CONF:'))
 
336
        CONF.log_opt_values(LOG, std_logging.DEBUG)
 
337
 
 
338
        while True:
 
339
            self.handle_signal()
 
340
            self._respawn_children()
 
341
            if self.sigcaught:
 
342
                signame = {signal.SIGTERM: 'SIGTERM',
 
343
                           signal.SIGINT: 'SIGINT',
 
344
                           signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
 
345
                LOG.info(_('Caught %s, stopping children'), signame)
 
346
            if self.sigcaught != signal.SIGHUP:
 
347
                break
 
348
 
 
349
            for pid in self.children:
 
350
                os.kill(pid, signal.SIGHUP)
 
351
            self.running = True
 
352
            self.sigcaught = None
295
353
 
296
354
        for pid in self.children:
297
355
            try:
313
371
    def __init__(self, threads=1000):
314
372
        self.tg = threadgroup.ThreadGroup(threads)
315
373
 
 
374
        # signal that the service is done shutting itself down:
 
375
        self._done = event.Event()
 
376
 
 
377
    def reset(self):
 
378
        # NOTE(Fengqian): docs for Event.reset() recommend against using it
 
379
        self._done = event.Event()
 
380
 
316
381
    def start(self):
317
382
        pass
318
383
 
319
384
    def stop(self):
320
385
        self.tg.stop()
321
 
 
322
 
    def wait(self):
323
 
        self.tg.wait()
 
386
        self.tg.wait()
 
387
        # Signal that service cleanup is done:
 
388
        if not self._done.ready():
 
389
            self._done.send()
 
390
 
 
391
    def wait(self):
 
392
        self._done.wait()
 
393
 
 
394
 
 
395
class Services(object):
 
396
 
 
397
    def __init__(self):
 
398
        self.services = []
 
399
        self.tg = threadgroup.ThreadGroup()
 
400
        self.done = event.Event()
 
401
 
 
402
    def add(self, service):
 
403
        self.services.append(service)
 
404
        self.tg.add_thread(self.run_service, service, self.done)
 
405
 
 
406
    def stop(self):
 
407
        # wait for graceful shutdown of services:
 
408
        for service in self.services:
 
409
            service.stop()
 
410
            service.wait()
 
411
 
 
412
        # Each service has performed cleanup, now signal that the run_service
 
413
        # wrapper threads can now die:
 
414
        if not self.done.ready():
 
415
            self.done.send()
 
416
 
 
417
        # reap threads:
 
418
        self.tg.stop()
 
419
 
 
420
    def wait(self):
 
421
        self.tg.wait()
 
422
 
 
423
    def restart(self):
 
424
        self.stop()
 
425
        self.done = event.Event()
 
426
        for restart_service in self.services:
 
427
            restart_service.reset()
 
428
            self.tg.add_thread(self.run_service, restart_service, self.done)
 
429
 
 
430
    @staticmethod
 
431
    def run_service(service, done):
 
432
        """Service start wrapper.
 
433
 
 
434
        :param service: service to run
 
435
        :param done: event to wait on until a shutdown is triggered
 
436
        :returns: None
 
437
 
 
438
        """
 
439
        service.start()
 
440
        done.wait()
324
441
 
325
442
 
326
443
def launch(service, workers=None):