~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/watchdog.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""See L{WatchDog}.
 
2
 
 
3
The WatchDog must run as root, because it spawns the Landscape Manager.
 
4
 
 
5
The main C{landscape-client} program uses this watchdog.
 
6
"""
 
7
 
 
8
import os
 
9
import errno
 
10
import sys
 
11
import pwd
 
12
import signal
 
13
import time
 
14
 
 
15
from logging import warning, info, error
 
16
 
 
17
from dbus import DBusException
 
18
import dbus.glib # Side-effects rule!
 
19
 
 
20
from twisted.internet import reactor
 
21
from twisted.internet.defer import Deferred, succeed
 
22
from twisted.internet.protocol import ProcessProtocol
 
23
from twisted.internet.error import ProcessExitedAlready
 
24
from twisted.application.service import Service, Application
 
25
from twisted.application.app import startApplication
 
26
 
 
27
from landscape.deployment import Configuration, init_logging
 
28
from landscape.lib.dbus_util import get_bus
 
29
from landscape.lib.twisted_util import gather_results
 
30
from landscape.lib.bootstrap import (BootstrapList, BootstrapFile,
 
31
                                     BootstrapDirectory)
 
32
from landscape.log import rotate_logs
 
33
 
 
34
GRACEFUL_WAIT_PERIOD = 10
 
35
MAXIMUM_CONSECUTIVE_RESTARTS = 5
 
36
RESTART_BURST_DELAY = 30 # seconds
 
37
SIGKILL_DELAY = 10
 
38
 
 
39
 
 
40
class DaemonError(Exception):
 
41
    """One of the daemons could not be started."""
 
42
 
 
43
 
 
44
class TimeoutError(Exception):
 
45
    """Something took too long."""
 
46
 
 
47
 
 
48
class ExecutableNotFoundError(Exception):
 
49
    """An executable was not found."""
 
50
 
 
51
 
 
52
class AlreadyRunningError(Exception):
 
53
    """
 
54
    A daemon was already running.
 
55
    """
 
56
 
 
57
 
 
58
class Daemon(object):
 
59
    """A Landscape daemon which can be started and tracked.
 
60
 
 
61
    This class should be subclassed to specify individual daemon.
 
62
 
 
63
    @cvar program: The name of the executable program that will start this
 
64
        daemon.
 
65
    @cvar username: The name of the user to switch to, by default.
 
66
    @cvar service: The DBus service name that the program will be expected to
 
67
        listen on.
 
68
    @cvar path: The DBus path that the program will be expected to listen on.
 
69
    """
 
70
 
 
71
    username = "landscape"
 
72
 
 
73
    def __init__(self, bus, reactor=reactor, verbose=False, config=None):
 
74
        """
 
75
        @param bus: The bus which this program will listen and respond to pings
 
76
            on.
 
77
        @param reactor: The reactor with which to spawn the process and
 
78
            schedule timed calls.
 
79
        @param verbose: Optionally, report more information when
 
80
            running this program.  Defaults to False.
 
81
        """
 
82
        self._bus = bus
 
83
        self._reactor = reactor
 
84
        if os.getuid() == 0:
 
85
            info = pwd.getpwnam(self.username)
 
86
            self._uid = info.pw_uid
 
87
            self._gid = info.pw_gid
 
88
        else:
 
89
            # We can only switch UIDs if we're root, so simply don't switch
 
90
            # UIDs if we're not.
 
91
            self._uid = None
 
92
            self._gid = None
 
93
        self._verbose = verbose
 
94
        self._config = config
 
95
        self._process = None
 
96
        self._last_started = 0
 
97
        self._quick_starts = 0
 
98
 
 
99
    def find_executable(self):
 
100
        """Find the fully-qualified path to the executable.
 
101
 
 
102
        If the executable can't be found, L{ExecutableNotFoundError} will be
 
103
        raised.
 
104
        """
 
105
        dirname = os.path.dirname(os.path.abspath(sys.argv[0]))
 
106
        executable = os.path.join(dirname, self.program)
 
107
        if not os.path.exists(executable):
 
108
            raise ExecutableNotFoundError("%s doesn't exist" % (executable,))
 
109
        return executable
 
110
 
 
111
    def start(self):
 
112
        """Start this daemon."""
 
113
        self._process = None
 
114
 
 
115
        now = time.time()
 
116
        if self._last_started + RESTART_BURST_DELAY > now:
 
117
            self._quick_starts += 1
 
118
            if self._quick_starts == MAXIMUM_CONSECUTIVE_RESTARTS:
 
119
                error("Can't keep %s running. Exiting." % self.program)
 
120
                self._reactor.stop()
 
121
                return
 
122
        else:
 
123
            self._quick_starts = 0
 
124
 
 
125
        self._last_started = now
 
126
 
 
127
        self._process = WatchedProcessProtocol(self)
 
128
        exe = self.find_executable()
 
129
        args = [exe, "--ignore-sigint"]
 
130
        if not self._verbose:
 
131
            args.append("--quiet")
 
132
        if self._config:
 
133
            args.extend(["-c", self._config])
 
134
        self._reactor.spawnProcess(self._process, exe, args=args,
 
135
                                   env=os.environ,uid=self._uid, gid=self._gid)
 
136
 
 
137
    def stop(self):
 
138
        """Stop this daemon."""
 
139
        if not self._process:
 
140
            return succeed(None)
 
141
        return self._process.kill()
 
142
 
 
143
    def request_exit(self):
 
144
        try:
 
145
            object = self._bus.get_object(self.bus_name, self.object_path,
 
146
                                          introspect=False)
 
147
            object.exit(dbus_interface=self.bus_name)
 
148
        except DBusException, e:
 
149
            return False
 
150
        return True
 
151
 
 
152
    def is_running(self):
 
153
        # FIXME Error cases may not be handled in the best possible way
 
154
        # here. We're basically return False if any error happens from the
 
155
        # dbus ping.
 
156
        result = Deferred()
 
157
        try:
 
158
            object = self._bus.get_object(self.bus_name, self.object_path,
 
159
                                          introspect=False)
 
160
            object.ping(reply_handler=result.callback,
 
161
                        error_handler=lambda f: result.callback(False),
 
162
                        dbus_interface=self.bus_name)
 
163
        except DBusException, e:
 
164
            result.callback(False)
 
165
        return result
 
166
 
 
167
    def wait(self):
 
168
        """
 
169
        Return a Deferred which will fire when the process has died.
 
170
        """
 
171
        if not self._process:
 
172
            return succeed(None)
 
173
        return self._process.wait()
 
174
 
 
175
    def wait_or_die(self):
 
176
        """
 
177
        Wait for the process to die for C{GRACEFUL_WAIT_PERIOD}. If it hasn't
 
178
        died by that point, send it a SIGTERM. If it doesn't die for
 
179
        C{SIGKILL_DELAY},
 
180
        """
 
181
        if not self._process:
 
182
            return succeed(None)
 
183
        return self._process.wait_or_die()
 
184
 
 
185
    def rotate_logs(self):
 
186
        self._process.rotate_logs()
 
187
 
 
188
 
 
189
class Broker(Daemon):
 
190
    program = "landscape-broker"
 
191
 
 
192
    from landscape.broker.broker import BUS_NAME as bus_name
 
193
    from landscape.broker.broker import OBJECT_PATH as object_path
 
194
 
 
195
 
 
196
class Monitor(Daemon):
 
197
    program = "landscape-monitor"
 
198
 
 
199
    from landscape.monitor.monitor import BUS_NAME as bus_name
 
200
    from landscape.monitor.monitor import OBJECT_PATH as object_path
 
201
 
 
202
 
 
203
class Manager(Daemon):
 
204
    program = "landscape-manager"
 
205
    username = "root"
 
206
 
 
207
    from landscape.manager.manager import BUS_NAME as bus_name
 
208
    from landscape.manager.manager import OBJECT_PATH as object_path
 
209
 
 
210
 
 
211
class WatchedProcessProtocol(ProcessProtocol):
 
212
    """
 
213
    A process-watching protocol which sends any of its output to the log file
 
214
    and restarts it when it dies.
 
215
    """
 
216
 
 
217
    _killed = False
 
218
 
 
219
    def __init__(self, daemon):
 
220
        self.daemon = daemon
 
221
        self._wait_result = None
 
222
        self._delayed_really_kill = None
 
223
        self._delayed_terminate = None
 
224
 
 
225
    def kill(self):
 
226
        self._terminate()
 
227
        return self.wait()
 
228
    
 
229
    def _terminate(self, warn=False):
 
230
        if self.transport is not None:
 
231
            if warn:
 
232
                warning("%s didn't exit. Sending SIGTERM"
 
233
                        % (self.daemon.program,))
 
234
            try:
 
235
                self.transport.signalProcess(signal.SIGTERM)
 
236
            except ProcessExitedAlready:
 
237
                pass
 
238
            else:
 
239
                # Give some time for the process, and then show who's the boss.
 
240
                delayed = reactor.callLater(SIGKILL_DELAY, self._really_kill)
 
241
                self._delayed_really_kill = delayed
 
242
 
 
243
    def _really_kill(self):
 
244
        try:
 
245
            self.transport.signalProcess(signal.SIGKILL)
 
246
        except ProcessExitedAlready:
 
247
            pass
 
248
        else:
 
249
            warning("%s didn't die.  Sending SIGKILL." % self.daemon.program)
 
250
        self._delayed_really_kill = None
 
251
 
 
252
    def rotate_logs(self):
 
253
        if self.transport is not None:
 
254
            try:
 
255
                self.transport.signalProcess(signal.SIGUSR1)
 
256
            except ProcessExitedAlready:
 
257
                pass
 
258
 
 
259
    def wait(self):
 
260
        self._wait_result = Deferred()
 
261
        return self._wait_result
 
262
    
 
263
    def wait_or_die(self):
 
264
        self._delayed_terminate = reactor.callLater(GRACEFUL_WAIT_PERIOD,
 
265
                                                    self._terminate, warn=True)
 
266
        return self.wait()
 
267
 
 
268
    def outReceived(self, data):
 
269
        # it's *probably* going to always be line buffered, by accident
 
270
        sys.stdout.write(data)
 
271
 
 
272
    def errReceived(self, data):
 
273
        sys.stderr.write(data)
 
274
 
 
275
    def processEnded(self, reason):
 
276
        """The process has ended; restart it."""
 
277
        if self._delayed_really_kill is not None:
 
278
            self._delayed_really_kill.cancel()
 
279
        if (self._delayed_terminate is not None
 
280
            and self._delayed_terminate.active()):
 
281
            self._delayed_terminate.cancel()
 
282
        if self._wait_result is not None:
 
283
            self._wait_result.callback(None)
 
284
        else:
 
285
            self.daemon.start()
 
286
 
 
287
 
 
288
class WatchDog(object):
 
289
    """
 
290
    The Landscape WatchDog starts all other landscape daemons and ensures that
 
291
    they are working.
 
292
    """
 
293
 
 
294
    def __init__(self, bus, reactor=reactor, verbose=False, config=None,
 
295
                 broker=None, monitor=None, manager=None):
 
296
        self.bus = bus
 
297
        if broker is None:
 
298
            broker = Broker(self.bus, verbose=verbose, config=config)
 
299
        if monitor is None:
 
300
            monitor = Monitor(self.bus, verbose=verbose, config=config)
 
301
        if manager is None:
 
302
            manager = Manager(self.bus, verbose=verbose, config=config)
 
303
 
 
304
        self.broker = broker
 
305
        self.monitor = monitor
 
306
        self.manager = manager
 
307
        self.daemons = [self.broker, self.monitor, self.manager]
 
308
        self.reactor = reactor
 
309
        self._checking = None
 
310
        self._stopping = False
 
311
        signal.signal(signal.SIGUSR1, self._notify_rotate_logs)
 
312
 
 
313
        self._ping_failures = {}
 
314
 
 
315
    def start(self):
 
316
        """
 
317
        Start all daemons. The broker will be started first, and no other
 
318
        daemons will be started before it is running and responding to DBUS
 
319
        messages.
 
320
 
 
321
        @return: A deferred which fires when all services have successfully
 
322
            started. If a daemon could not be started, the deferred will fail
 
323
            with L{DaemonError}.
 
324
        """
 
325
        results = []
 
326
        for daemon in self.daemons:
 
327
            result = daemon.is_running()
 
328
            result.addCallback(lambda is_running, d=daemon: (is_running, d))
 
329
            results.append(result)
 
330
        return gather_results(results).addCallback(self._start_if_not_running)
 
331
 
 
332
    def _start_if_not_running(self, results):
 
333
        for is_running, daemon in results:
 
334
            if is_running:
 
335
                raise AlreadyRunningError(daemon)
 
336
 
 
337
        self.broker.start()
 
338
        self.monitor.start()
 
339
        self.manager.start()
 
340
 
 
341
        self.start_monitoring()
 
342
 
 
343
    def start_monitoring(self):
 
344
        """Start monitoring processes which have already been started."""
 
345
        # Must wait before daemons actually start, otherwise check will
 
346
        # restart them *again*.
 
347
        self._checking = self.reactor.callLater(5, self._check)
 
348
 
 
349
    def _restart_if_not_running(self, is_running, daemon):
 
350
        if (not is_running) and (not self._stopping):
 
351
            warning("%s failed to respond to a ping."
 
352
                    % (daemon.program,))
 
353
            if daemon not in self._ping_failures:
 
354
                self._ping_failures[daemon] = 0
 
355
            self._ping_failures[daemon] += 1
 
356
            if self._ping_failures[daemon] == 5:
 
357
                warning("%s died! Restarting." % (daemon.program,))
 
358
                stopping = daemon.stop()
 
359
                def stopped(ignored):
 
360
                    daemon.start()
 
361
                    self._ping_failures[daemon] = 0
 
362
                stopping.addBoth(stopped)
 
363
                return stopping
 
364
        else:
 
365
            self._ping_failures[daemon] = 0
 
366
 
 
367
    def _check(self):
 
368
        all_running = []
 
369
        for daemon in self.daemons:
 
370
            is_running = daemon.is_running()
 
371
            is_running.addCallback(self._restart_if_not_running, daemon)
 
372
            all_running.append(is_running)
 
373
        def reschedule(ignored):
 
374
            self._checking = self.reactor.callLater(5, self._check)
 
375
        gather_results(all_running).addBoth(reschedule)
 
376
 
 
377
    def request_exit(self):
 
378
        if self._checking is not None and self._checking.active():
 
379
            self._checking.cancel()
 
380
        # Set a flag so that the pinger will avoid restarting the daemons if a
 
381
        # ping has already been sent but not yet responded to.
 
382
        self._stopping = True
 
383
 
 
384
        # If request_exit fails, we should just kill the daemons immediately.
 
385
        if self.broker.request_exit():
 
386
            results = [x.wait_or_die() for x in self.daemons]
 
387
        else:
 
388
            error("Couldn't request that broker gracefully shut down; "
 
389
                  "killing forcefully.")
 
390
            results = [x.stop() for x in self.daemons]
 
391
        return gather_results(results)
 
392
 
 
393
    def _notify_rotate_logs(self, signal, frame):
 
394
        for daemon in self.daemons:
 
395
            daemon.rotate_logs()
 
396
        rotate_logs()
 
397
 
 
398
 
 
399
class WatchDogConfiguration(Configuration):
 
400
 
 
401
    def make_parser(self):
 
402
        parser = super(WatchDogConfiguration, self).make_parser()
 
403
        parser.add_option("--daemon", action="store_true",
 
404
                          help="Fork and run in the background.")
 
405
        parser.add_option("--pid-file", type="str",
 
406
                          help="The file to write the PID to.")
 
407
        return parser
 
408
 
 
409
 
 
410
def daemonize():
 
411
    # See http://web.archive.org/web/20070410070022/www.erlenstar.demon.co.uk/unix/faq_2.html#SEC13
 
412
    if os.fork():   # launch child and...
 
413
        os._exit(0) # kill off parent
 
414
    os.setsid()
 
415
    if os.fork():   # launch child and...
 
416
        os._exit(0) # kill off parent again.
 
417
    # some argue that this umask should be 0, but that's annoying.
 
418
    os.umask(077)
 
419
    null=os.open('/dev/null', os.O_RDWR)
 
420
    for i in range(3):
 
421
        try:
 
422
            os.dup2(null, i)
 
423
        except OSError, e:
 
424
            if e.errno != errno.EBADF:
 
425
                raise
 
426
    os.close(null)
 
427
 
 
428
 
 
429
class WatchDogService(Service):
 
430
 
 
431
    def __init__(self, config):
 
432
        self._config = config
 
433
        self.bus = get_bus(config.bus)
 
434
        self.watchdog = WatchDog(self.bus,
 
435
                                 verbose=not config.daemon,
 
436
                                 config=config.config)
 
437
 
 
438
    def startService(self):
 
439
        info("Watchdog watching for daemons on %r bus." % self._config.bus)
 
440
        Service.startService(self)
 
441
 
 
442
        bootstrap_list.bootstrap(data_path=self._config.data_path,
 
443
                                 log_dir=self._config.log_dir)
 
444
 
 
445
        result = self.watchdog.start()
 
446
 
 
447
        def got_error(failure):
 
448
            if failure.check(AlreadyRunningError):
 
449
                daemon = failure.value.args[0]
 
450
                error("ERROR: %s is already running" % daemon.program)
 
451
            else:
 
452
                error("UNKNOWN ERROR: " + failure.getErrorMessage())
 
453
            os._exit(1)
 
454
 
 
455
        result.addErrback(got_error)
 
456
        return result
 
457
 
 
458
    def stopService(self):
 
459
        info("Stopping client...")
 
460
        Service.stopService(self)
 
461
 
 
462
        # If CTRL-C is pressed twice in a row, the second SIGINT actually
 
463
        # kills us before subprocesses die, and that makes them hang around.
 
464
        signal.signal(signal.SIGINT, signal.SIG_IGN)
 
465
 
 
466
        return self.watchdog.request_exit()
 
467
 
 
468
 
 
469
bootstrap_list = BootstrapList([
 
470
    BootstrapDirectory("$data_path", "landscape", "root", 0755),
 
471
    BootstrapDirectory("$data_path/package", "landscape", "root", 0755),
 
472
    BootstrapDirectory("$data_path/messages", "landscape", "root", 0755),
 
473
    BootstrapDirectory("$log_dir", "landscape", "root", 0755),
 
474
    BootstrapFile("$data_path/package/database", "landscape", "root", 0644),
 
475
    ])
 
476
 
 
477
 
 
478
def run(args=sys.argv):
 
479
    config = WatchDogConfiguration()
 
480
    config.load(args)
 
481
 
 
482
    init_logging(config, "watchdog")
 
483
 
 
484
    if os.getuid() != 0:
 
485
        warning("Daemons will be run as %s" % pwd.getpwuid(os.getuid()).pw_name)
 
486
 
 
487
    if config.daemon:
 
488
        daemonize()
 
489
        if config.pid_file:
 
490
            stream = open(config.pid_file, "w")
 
491
            stream.write(str(os.getpid()))
 
492
            stream.close()
 
493
 
 
494
    application = Application("landscape-client")
 
495
    WatchDogService(config).setServiceParent(application)
 
496
    startApplication(application, False)
 
497
    from twisted.internet import reactor
 
498
    reactor.run()