3
The WatchDog must run as root, because it spawns the Landscape Manager.
5
The main C{landscape-client} program uses this watchdog.
15
from logging import warning, info, error
17
from dbus import DBusException
18
import dbus.glib # Side-effects rule!
20
from twisted.internet import reactor
21
from twisted.internet.defer import Deferred, succeed
22
from twisted.internet.protocol import ProcessProtocol
23
from twisted.internet.error import ProcessExitedAlready
24
from twisted.application.service import Service, Application
25
from twisted.application.app import startApplication
27
from landscape.deployment import Configuration, init_logging
28
from landscape.lib.dbus_util import get_bus
29
from landscape.lib.twisted_util import gather_results
30
from landscape.lib.bootstrap import (BootstrapList, BootstrapFile,
32
from landscape.log import rotate_logs
34
GRACEFUL_WAIT_PERIOD = 10
35
MAXIMUM_CONSECUTIVE_RESTARTS = 5
36
RESTART_BURST_DELAY = 30 # seconds
40
class DaemonError(Exception):
    """Error indicating that one of the daemons could not be started."""
44
class TimeoutError(Exception):
    """Error indicating that something took too long to complete."""
48
class ExecutableNotFoundError(Exception):
    """Error indicating that an executable could not be found."""
52
class AlreadyRunningError(Exception):
54
A daemon was already running.
59
"""A Landscape daemon which can be started and tracked.
61
This class should be subclassed to specify an individual daemon.
63
@cvar program: The name of the executable program that will start this
65
@cvar username: The name of the user to switch to, by default.
66
@cvar service: The DBus service name that the program will be expected to
68
@cvar path: The DBus path that the program will be expected to listen on.
71
username = "landscape"
73
def __init__(self, bus, reactor=reactor, verbose=False, config=None):
75
@param bus: The bus which this program will listen and respond to pings
77
@param reactor: The reactor with which to spawn the process and
79
@param verbose: Optionally, report more information when
80
running this program. Defaults to False.
83
self._reactor = reactor
85
info = pwd.getpwnam(self.username)
86
self._uid = info.pw_uid
87
self._gid = info.pw_gid
89
# We can only switch UIDs if we're root, so simply don't switch
93
self._verbose = verbose
96
self._last_started = 0
97
self._quick_starts = 0
99
def find_executable(self):
100
"""Find the fully-qualified path to the executable.
102
If the executable can't be found, L{ExecutableNotFoundError} will be
105
dirname = os.path.dirname(os.path.abspath(sys.argv[0]))
106
executable = os.path.join(dirname, self.program)
107
if not os.path.exists(executable):
108
raise ExecutableNotFoundError("%s doesn't exist" % (executable,))
112
"""Start this daemon."""
116
if self._last_started + RESTART_BURST_DELAY > now:
117
self._quick_starts += 1
118
if self._quick_starts == MAXIMUM_CONSECUTIVE_RESTARTS:
119
error("Can't keep %s running. Exiting." % self.program)
123
self._quick_starts = 0
125
self._last_started = now
127
self._process = WatchedProcessProtocol(self)
128
exe = self.find_executable()
129
args = [exe, "--ignore-sigint"]
130
if not self._verbose:
131
args.append("--quiet")
133
args.extend(["-c", self._config])
134
self._reactor.spawnProcess(self._process, exe, args=args,
135
env=os.environ,uid=self._uid, gid=self._gid)
138
"""Stop this daemon."""
139
if not self._process:
141
return self._process.kill()
143
def request_exit(self):
145
object = self._bus.get_object(self.bus_name, self.object_path,
147
object.exit(dbus_interface=self.bus_name)
148
except DBusException, e:
152
def is_running(self):
153
# FIXME Error cases may not be handled in the best possible way
154
# here. We basically return False if any error happens from the
158
object = self._bus.get_object(self.bus_name, self.object_path,
160
object.ping(reply_handler=result.callback,
161
error_handler=lambda f: result.callback(False),
162
dbus_interface=self.bus_name)
163
except DBusException, e:
164
result.callback(False)
169
Return a Deferred which will fire when the process has died.
171
if not self._process:
173
return self._process.wait()
175
def wait_or_die(self):
177
Wait for the process to die for C{GRACEFUL_WAIT_PERIOD}. If it hasn't
178
died by that point, send it a SIGTERM. If it doesn't die for
181
if not self._process:
183
return self._process.wait_or_die()
185
def rotate_logs(self):
    """Ask the tracked process to rotate its logs.

    The request is delegated to the C{rotate_logs} method of the process
    protocol stored in C{self._process}.  When no process is being tracked
    the call is a no-op, using the same C{if not self._process} guard that
    L{wait_or_die} relies on, so that a log rotation request arriving
    before the daemon was spawned does not raise C{AttributeError}.
    """
    if not self._process:
        return
    self._process.rotate_logs()
189
class Broker(Daemon):
190
program = "landscape-broker"
192
from landscape.broker.broker import BUS_NAME as bus_name
193
from landscape.broker.broker import OBJECT_PATH as object_path
196
class Monitor(Daemon):
197
program = "landscape-monitor"
199
from landscape.monitor.monitor import BUS_NAME as bus_name
200
from landscape.monitor.monitor import OBJECT_PATH as object_path
203
class Manager(Daemon):
204
program = "landscape-manager"
207
from landscape.manager.manager import BUS_NAME as bus_name
208
from landscape.manager.manager import OBJECT_PATH as object_path
211
class WatchedProcessProtocol(ProcessProtocol):
213
A process-watching protocol which sends any of its output to the log file
214
and restarts it when it dies.
219
def __init__(self, daemon):
221
self._wait_result = None
222
self._delayed_really_kill = None
223
self._delayed_terminate = None
229
def _terminate(self, warn=False):
230
if self.transport is not None:
232
warning("%s didn't exit. Sending SIGTERM"
233
% (self.daemon.program,))
235
self.transport.signalProcess(signal.SIGTERM)
236
except ProcessExitedAlready:
239
# Give some time for the process, and then show who's the boss.
240
delayed = reactor.callLater(SIGKILL_DELAY, self._really_kill)
241
self._delayed_really_kill = delayed
243
def _really_kill(self):
245
self.transport.signalProcess(signal.SIGKILL)
246
except ProcessExitedAlready:
249
warning("%s didn't die. Sending SIGKILL." % self.daemon.program)
250
self._delayed_really_kill = None
252
def rotate_logs(self):
253
if self.transport is not None:
255
self.transport.signalProcess(signal.SIGUSR1)
256
except ProcessExitedAlready:
260
self._wait_result = Deferred()
261
return self._wait_result
263
def wait_or_die(self):
264
self._delayed_terminate = reactor.callLater(GRACEFUL_WAIT_PERIOD,
265
self._terminate, warn=True)
268
def outReceived(self, data):
    """Pass C{data} received on the process' standard output to our own.

    @param data: The chunk of output to forward, written out unchanged.
    """
    # The chunks are *probably* always line buffered, but only by
    # accident, so they are written through exactly as received.
    stream = sys.stdout
    stream.write(data)
272
def errReceived(self, data):
    """Pass C{data} received on the process' standard error to our own.

    @param data: The chunk of error output to forward, written out
        unchanged.
    """
    stream = sys.stderr
    stream.write(data)
275
def processEnded(self, reason):
276
"""The process has ended; restart it."""
277
if self._delayed_really_kill is not None:
278
self._delayed_really_kill.cancel()
279
if (self._delayed_terminate is not None
280
and self._delayed_terminate.active()):
281
self._delayed_terminate.cancel()
282
if self._wait_result is not None:
283
self._wait_result.callback(None)
288
class WatchDog(object):
290
The Landscape WatchDog starts all other landscape daemons and ensures that
294
def __init__(self, bus, reactor=reactor, verbose=False, config=None,
295
broker=None, monitor=None, manager=None):
298
broker = Broker(self.bus, verbose=verbose, config=config)
300
monitor = Monitor(self.bus, verbose=verbose, config=config)
302
manager = Manager(self.bus, verbose=verbose, config=config)
305
self.monitor = monitor
306
self.manager = manager
307
self.daemons = [self.broker, self.monitor, self.manager]
308
self.reactor = reactor
309
self._checking = None
310
self._stopping = False
311
signal.signal(signal.SIGUSR1, self._notify_rotate_logs)
313
self._ping_failures = {}
317
Start all daemons. The broker will be started first, and no other
318
daemons will be started before it is running and responding to DBUS
321
@return: A deferred which fires when all services have successfully
322
started. If a daemon could not be started, the deferred will fail
326
for daemon in self.daemons:
327
result = daemon.is_running()
328
result.addCallback(lambda is_running, d=daemon: (is_running, d))
329
results.append(result)
330
return gather_results(results).addCallback(self._start_if_not_running)
332
def _start_if_not_running(self, results):
333
for is_running, daemon in results:
335
raise AlreadyRunningError(daemon)
341
self.start_monitoring()
343
def start_monitoring(self):
    """Schedule the first check of processes which were already started.

    The pending call is remembered in C{self._checking}.
    """
    # Checking right away would find daemons that have not finished
    # starting yet and restart them *again*, hence the initial delay.
    initial_delay = 5
    pending_check = self.reactor.callLater(initial_delay, self._check)
    self._checking = pending_check
349
def _restart_if_not_running(self, is_running, daemon):
350
if (not is_running) and (not self._stopping):
351
warning("%s failed to respond to a ping."
353
if daemon not in self._ping_failures:
354
self._ping_failures[daemon] = 0
355
self._ping_failures[daemon] += 1
356
if self._ping_failures[daemon] == 5:
357
warning("%s died! Restarting." % (daemon.program,))
358
stopping = daemon.stop()
359
def stopped(ignored):
361
self._ping_failures[daemon] = 0
362
stopping.addBoth(stopped)
365
self._ping_failures[daemon] = 0
369
for daemon in self.daemons:
370
is_running = daemon.is_running()
371
is_running.addCallback(self._restart_if_not_running, daemon)
372
all_running.append(is_running)
373
def reschedule(ignored):
374
self._checking = self.reactor.callLater(5, self._check)
375
gather_results(all_running).addBoth(reschedule)
377
def request_exit(self):
378
if self._checking is not None and self._checking.active():
379
self._checking.cancel()
380
# Set a flag so that the pinger will avoid restarting the daemons if a
381
# ping has already been sent but not yet responded to.
382
self._stopping = True
384
# If request_exit fails, we should just kill the daemons immediately.
385
if self.broker.request_exit():
386
results = [x.wait_or_die() for x in self.daemons]
388
error("Couldn't request that broker gracefully shut down; "
389
"killing forcefully.")
390
results = [x.stop() for x in self.daemons]
391
return gather_results(results)
393
def _notify_rotate_logs(self, signal, frame):
394
for daemon in self.daemons:
399
class WatchDogConfiguration(Configuration):
401
def make_parser(self):
402
parser = super(WatchDogConfiguration, self).make_parser()
403
parser.add_option("--daemon", action="store_true",
404
help="Fork and run in the background.")
405
parser.add_option("--pid-file", type="str",
406
help="The file to write the PID to.")
411
# See http://web.archive.org/web/20070410070022/www.erlenstar.demon.co.uk/unix/faq_2.html#SEC13
412
if os.fork(): # launch child and...
413
os._exit(0) # kill off parent
415
if os.fork(): # launch child and...
416
os._exit(0) # kill off parent again.
417
# some argue that this umask should be 0, but that's annoying.
419
null=os.open('/dev/null', os.O_RDWR)
424
if e.errno != errno.EBADF:
429
class WatchDogService(Service):
431
def __init__(self, config):
    """Remember the configuration and build the L{WatchDog} to run.

    @param config: The configuration object.  Its C{bus} attribute selects
        the bus to connect to, a true C{daemon} attribute turns verbose
        mode off, and its C{config} attribute is handed to the
        L{WatchDog} as its own C{config}.
    """
    self._config = config
    bus = get_bus(config.bus)
    self.bus = bus
    # Only be verbose when we were not asked to run as a daemon.
    verbose = not config.daemon
    self.watchdog = WatchDog(bus, verbose=verbose, config=config.config)
438
def startService(self):
439
info("Watchdog watching for daemons on %r bus." % self._config.bus)
440
Service.startService(self)
442
bootstrap_list.bootstrap(data_path=self._config.data_path,
443
log_dir=self._config.log_dir)
445
result = self.watchdog.start()
447
def got_error(failure):
448
if failure.check(AlreadyRunningError):
449
daemon = failure.value.args[0]
450
error("ERROR: %s is already running" % daemon.program)
452
error("UNKNOWN ERROR: " + failure.getErrorMessage())
455
result.addErrback(got_error)
458
def stopService(self):
459
info("Stopping client...")
460
Service.stopService(self)
462
# If CTRL-C is pressed twice in a row, the second SIGINT actually
463
# kills us before subprocesses die, and that makes them hang around.
464
signal.signal(signal.SIGINT, signal.SIG_IGN)
466
return self.watchdog.request_exit()
469
bootstrap_list = BootstrapList([
470
BootstrapDirectory("$data_path", "landscape", "root", 0755),
471
BootstrapDirectory("$data_path/package", "landscape", "root", 0755),
472
BootstrapDirectory("$data_path/messages", "landscape", "root", 0755),
473
BootstrapDirectory("$log_dir", "landscape", "root", 0755),
474
BootstrapFile("$data_path/package/database", "landscape", "root", 0644),
478
def run(args=sys.argv):
479
config = WatchDogConfiguration()
482
init_logging(config, "watchdog")
485
warning("Daemons will be run as %s" % pwd.getpwuid(os.getuid()).pw_name)
490
stream = open(config.pid_file, "w")
491
stream.write(str(os.getpid()))
494
application = Application("landscape-client")
495
WatchDogService(config).setServiceParent(application)
496
startApplication(application, False)
497
from twisted.internet import reactor