3
The WatchDog must run as root, because it spawns the Landscape Manager.
5
The main C{landscape-client} program uses this watchdog.
15
from logging import warning, info, error
17
from twisted.internet import reactor
18
from twisted.internet.defer import Deferred, succeed
19
from twisted.internet.protocol import ProcessProtocol
20
from twisted.internet.error import ProcessExitedAlready
21
from twisted.application.service import Service, Application
22
from twisted.application.app import startApplication
24
from landscape.deployment import Configuration, init_logging
25
from landscape.lib.twisted_util import gather_results
26
from landscape.lib.log import log_failure
27
from landscape.lib.bootstrap import (BootstrapList, BootstrapFile,
29
from landscape.log import rotate_logs
30
from landscape.broker.amp import (
31
RemoteBrokerConnector, RemoteMonitorConnector, RemoteManagerConnector)
32
from landscape.reactor import TwistedReactor
34
GRACEFUL_WAIT_PERIOD = 10
35
MAXIMUM_CONSECUTIVE_RESTARTS = 5
36
RESTART_BURST_DELAY = 30 # seconds
40
class DaemonError(Exception):
    """Raised when one of the daemons could not be started."""
44
class TimeoutError(Exception):
    """Raised when something took too long."""
48
class ExecutableNotFoundError(Exception):
    """Raised when an executable was not found."""
53
"""A Landscape daemon which can be started and tracked.
55
This class should be subclassed to specify individual daemon.
57
@cvar program: The name of the executable program that will start this
59
@cvar username: The name of the user to switch to, by default.
60
@cvar service: The DBus service name that the program will be expected to
62
@cvar max_retries: The maximum number of retries before giving up when
63
trying to connect to the watched daemon.
64
@cvar factor: The factor by which the delay between subsequent connection
65
attempts will increase.
68
username = "landscape"
72
def __init__(self, connector, reactor=reactor, verbose=False,
75
@param connector: The L{RemoteComponentConnector} of the daemon.
76
@param reactor: The reactor used to spawn the process and schedule
78
@param verbose: Optionally, report more information when
79
running this program. Defaults to False.
81
self._connector = connector
82
self._reactor = reactor
83
self._env = os.environ.copy()
85
pwd_info = pwd.getpwnam(self.username)
86
self._uid = pwd_info.pw_uid
87
self._gid = pwd_info.pw_gid
88
self._env["HOME"] = pwd_info.pw_dir
89
self._env["USER"] = self.username
90
self._env["LOGNAME"] = self.username
92
# We can only switch UIDs if we're root, so simply don't switch
96
self._verbose = verbose
99
self._last_started = 0
100
self._quick_starts = 0
101
self._allow_restart = True
103
def find_executable(self):
    """Find the fully-qualified path to the executable.

    The program is looked up in the directory that holds the script
    named by C{sys.argv[0]}.

    @return: The absolute path of C{self.program}.
    @raise ExecutableNotFoundError: If that path doesn't exist.
    """
    launcher_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
    executable = os.path.join(launcher_dir, self.program)
    if not os.path.exists(executable):
        raise ExecutableNotFoundError("%s doesn't exist" % (executable,))
    # The caller hands this path straight to spawnProcess, so it must be
    # returned rather than merely validated.
    return executable
116
"""Start this daemon."""
120
if self._last_started + RESTART_BURST_DELAY > now:
121
self._quick_starts += 1
122
if self._quick_starts == MAXIMUM_CONSECUTIVE_RESTARTS:
123
error("Can't keep %s running. Exiting." % self.program)
127
self._quick_starts = 0
129
self._last_started = now
131
self._process = WatchedProcessProtocol(self)
132
exe = self.find_executable()
133
args = [exe, "--ignore-sigint"]
134
if not self._verbose:
135
args.append("--quiet")
137
args.extend(["-c", self._config])
138
self._reactor.spawnProcess(self._process, exe, args=args,
139
env=self._env, uid=self._uid, gid=self._gid)
142
"""Stop this daemon."""
143
if not self._process:
145
return self._process.kill()
147
def _connect_and_call(self, name, *args, **kwargs):
148
"""Connect to the remote daemon over AMP and perform the given command.
150
@param name: The name of the command to perform.
151
@param args: Arguments list to be passed to the connect method
152
@param kwargs: Keywords arguments to pass to the connect method.
153
@return: A L{Deferred} resulting in C{True} if the command was
154
successful or C{False} otherwise.
155
@see: L{RemoteLandscapeComponentCreator.connect}.
158
def disconnect(ignored):
159
self._connector.disconnect()
162
connected = self._connector.connect(self.max_retries, self.factor,
164
connected.addCallback(lambda remote: getattr(remote, name)())
165
connected.addCallback(disconnect)
166
connected.addErrback(lambda x: False)
169
def request_exit(self):
    """Connect to the daemon and invoke its C{exit} command.

    @return: Whatever L{_connect_and_call} returns for that command.
    """
    command = "exit"
    return self._connect_and_call(command)
172
def is_running(self):
    """Connect to the daemon and invoke its C{ping} command.

    @return: Whatever L{_connect_and_call} returns for that command.
    """
    # FIXME Error cases may not be handled in the best possible way
    # here: any error raised while connecting or pinging is simply
    # reported as False.
    ping_result = self._connect_and_call("ping")
    return ping_result
180
Return a Deferred which will fire when the process has died.
182
if not self._process:
184
return self._process.wait()
186
def wait_or_die(self):
188
Wait for the process to die for C{GRACEFUL_WAIT_PERIOD}. If it hasn't
189
died by that point, send it a SIGTERM. If it doesn't die for
192
if not self._process:
194
return self._process.wait_or_die()
196
def prepare_for_shutdown(self):
    """Called by the watchdog when starting to shut us down.

    Clears the flag reported by L{allow_restart}, so that our
    L{WatchedProcessProtocol} won't restart the process when it ends.
    """
    self._allow_restart = False
204
def allow_restart(self):
    """Tell whether the daemon should be restarted.

    @return: The current value of the restart flag, a boolean.
    """
    restart_allowed = self._allow_restart
    return restart_allowed
208
def rotate_logs(self):
    """Ask the process of this daemon to rotate its logs.

    Nothing happens if there is no process to forward the request to;
    this is the same guard used by the other process-forwarding methods
    of this class, and keeps a rotation request that arrives before the
    daemon was spawned from raising C{AttributeError}.
    """
    if not self._process:
        return
    self._process.rotate_logs()
212
class Broker(Daemon):
    """The L{Daemon} whose executable is C{landscape-broker}."""

    program = "landscape-broker"
216
class Monitor(Daemon):
    """The L{Daemon} whose executable is C{landscape-monitor}."""

    program = "landscape-monitor"
220
class Manager(Daemon):
    """The L{Daemon} whose executable is C{landscape-manager}."""

    program = "landscape-manager"
225
class WatchedProcessProtocol(ProcessProtocol):
227
A process-watching protocol which sends any of its output to the log file
228
and restarts it when it dies.
233
def __init__(self, daemon):
235
self._wait_result = None
236
self._delayed_really_kill = None
237
self._delayed_terminate = None
243
def _terminate(self, warn=False):
244
if self.transport is not None:
246
warning("%s didn't exit. Sending SIGTERM"
247
% (self.daemon.program,))
249
self.transport.signalProcess(signal.SIGTERM)
250
except ProcessExitedAlready:
253
# Give some time for the process, and then show who's the boss.
254
delayed = reactor.callLater(SIGKILL_DELAY, self._really_kill)
255
self._delayed_really_kill = delayed
257
def _really_kill(self):
259
self.transport.signalProcess(signal.SIGKILL)
260
except ProcessExitedAlready:
263
warning("%s didn't die. Sending SIGKILL." % self.daemon.program)
264
self._delayed_really_kill = None
266
def rotate_logs(self):
267
if self.transport is not None:
269
self.transport.signalProcess(signal.SIGUSR1)
270
except ProcessExitedAlready:
274
if self.transport.pid is None:
276
self._wait_result = Deferred()
277
return self._wait_result
279
def wait_or_die(self):
280
self._delayed_terminate = reactor.callLater(GRACEFUL_WAIT_PERIOD,
281
self._terminate, warn=True)
284
def outReceived(self, data):
    """Copy data received from the process' stdout to our own stdout."""
    # it's *probably* going to always be line buffered, by accident
    stream = sys.stdout
    stream.write(data)
288
def errReceived(self, data):
    """Copy data received from the process' stderr to our own stderr."""
    stream = sys.stderr
    stream.write(data)
291
def processEnded(self, reason):
292
"""The process has ended; restart it."""
293
if self._delayed_really_kill is not None:
294
self._delayed_really_kill.cancel()
295
if (self._delayed_terminate is not None
296
and self._delayed_terminate.active()):
297
self._delayed_terminate.cancel()
298
if self._wait_result is not None:
299
self._wait_result.callback(None)
300
elif self.daemon.allow_restart():
304
class WatchDog(object):
306
The Landscape WatchDog starts all other landscape daemons and ensures that
310
def __init__(self, reactor=reactor, verbose=False, config=None,
311
broker=None, monitor=None, manager=None,
312
enabled_daemons=None):
313
twisted_reactor = TwistedReactor()
314
if enabled_daemons is None:
315
enabled_daemons = [Broker, Monitor, Manager]
316
if broker is None and Broker in enabled_daemons:
318
RemoteBrokerConnector(twisted_reactor, config),
319
verbose=verbose, config=config.config)
320
if monitor is None and Monitor in enabled_daemons:
322
RemoteMonitorConnector(twisted_reactor, config),
323
verbose=verbose, config=config.config)
324
if manager is None and Manager in enabled_daemons:
326
RemoteManagerConnector(twisted_reactor, config),
327
verbose=verbose, config=config.config)
330
self.monitor = monitor
331
self.manager = manager
332
self.daemons = filter(None, [self.broker, self.monitor, self.manager])
333
self.reactor = reactor
334
self._checking = None
335
self._stopping = False
336
signal.signal(signal.SIGUSR1, self._notify_rotate_logs)
338
self._ping_failures = {}
340
def check_running(self):
    """Return a list of any daemons that are already running.

    @return: The result of L{gather_results} over one C{is_running}
        check per daemon, filtered down to the daemons whose check
        came back true.
    """
    # The accumulator has to exist before the loop appends to it.
    results = []
    for daemon in self.daemons:
        # This method is called on startup, we basically try to connect
        # a few times in fast sequence (with exponential backoff), if we
        # don't get a response we assume the daemon is not running.
        result = daemon.is_running()
        # Bind the daemon as a default argument so that each callback
        # keeps its own daemon rather than the last one of the loop.
        result.addCallback(lambda is_running, d=daemon: (is_running, d))
        results.append(result)

    def got_all_results(r):
        # Each item is an (is_running, daemon) pair.
        return [x[1] for x in r if x[0]]

    return gather_results(results).addCallback(got_all_results)
357
Start all daemons. The broker will be started first, and no other
358
daemons will be started before it is running and responding to DBUS
361
@return: A deferred which fires when all services have successfully
362
started. If a daemon could not be started, the deferred will fail
365
for daemon in self.daemons:
367
self.start_monitoring()
369
def start_monitoring(self):
    """Start monitoring processes which have already been started."""
    # The first check is delayed: the daemons need time to actually
    # start, otherwise the check would restart them *again*.
    first_check_delay = 5
    schedule = self.reactor.callLater
    self._checking = schedule(first_check_delay, self._check)
375
def _restart_if_not_running(self, is_running, daemon):
376
if (not is_running) and (not self._stopping):
377
warning("%s failed to respond to a ping."
379
if daemon not in self._ping_failures:
380
self._ping_failures[daemon] = 0
381
self._ping_failures[daemon] += 1
382
if self._ping_failures[daemon] == 5:
383
warning("%s died! Restarting." % (daemon.program,))
384
stopping = daemon.stop()
386
def stopped(ignored):
388
self._ping_failures[daemon] = 0
389
stopping.addBoth(stopped)
392
self._ping_failures[daemon] = 0
396
for daemon in self.daemons:
397
is_running = daemon.is_running()
398
is_running.addCallback(self._restart_if_not_running, daemon)
399
all_running.append(is_running)
401
def reschedule(ignored):
402
self._checking = self.reactor.callLater(5, self._check)
403
gather_results(all_running).addBoth(reschedule)
405
def request_exit(self):
406
if self._checking is not None and self._checking.active():
407
self._checking.cancel()
408
# Set a flag so that the pinger will avoid restarting the daemons if a
409
# ping has already been sent but not yet responded to.
410
self._stopping = True
412
# This tells the daemons to not automatically restart when they end
413
for daemon in self.daemons:
414
daemon.prepare_for_shutdown()
416
def terminate_processes(broker_stopped):
418
results = [daemon.wait_or_die() for daemon in self.daemons]
420
# If request_exit fails, we should just kill the daemons
422
error("Couldn't request that broker gracefully shut down; "
423
"killing forcefully.")
424
results = [x.stop() for x in self.daemons]
425
return gather_results(results)
427
result = self.broker.request_exit()
428
return result.addCallback(terminate_processes)
430
def _notify_rotate_logs(self, signal, frame):
431
for daemon in self.daemons:
436
class WatchDogConfiguration(Configuration):
438
def make_parser(self):
439
parser = super(WatchDogConfiguration, self).make_parser()
440
parser.add_option("--daemon", action="store_true",
441
help="Fork and run in the background.")
442
parser.add_option("--pid-file", type="str",
443
help="The file to write the PID to.")
444
parser.add_option("--monitor-only", action="store_true",
445
help="Don't enable management features. This is "
446
"useful if you want to run the client as a non-root "
450
def get_enabled_daemons(self):
    """Return the list of L{Daemon} classes that should be run.

    L{Broker} and L{Monitor} are always included; L{Manager} is left
    out when the C{monitor_only} option is set.
    """
    daemons = [Broker, Monitor]
    if not self.monitor_only:
        daemons.append(Manager)
    # The result is passed on as the C{enabled_daemons} of the watchdog,
    # so the list has to be handed back to the caller.
    return daemons
458
# See http://www.steve.org.uk/Reference/Unix/faq_2.html#SEC16
459
if os.fork(): # launch child and...
460
os._exit(0) # kill off parent
462
if os.fork(): # launch child and...
463
os._exit(0) # kill off parent again.
464
# some argue that this umask should be 0, but that's annoying.
466
null = os.open('/dev/null', os.O_RDWR)
471
if e.errno != errno.EBADF:
476
class WatchDogService(Service):
478
def __init__(self, config):
479
self._config = config
480
self.watchdog = WatchDog(verbose=not config.daemon,
482
enabled_daemons=config.get_enabled_daemons())
485
def startService(self):
486
Service.startService(self)
487
bootstrap_list.bootstrap(data_path=self._config.data_path,
488
log_dir=self._config.log_dir)
489
result = self.watchdog.check_running()
491
def start_if_not_running(running_daemons):
493
error("ERROR: The following daemons are already running: %s"
494
% (", ".join(x.program for x in running_daemons)))
496
reactor.crash() # so stopService isn't called.
499
info("Watchdog watching for daemons.")
500
return self.watchdog.start()
503
log_failure(failure, "Unknown error occurred!")
506
result.addCallback(start_if_not_running)
507
result.addErrback(die)
510
def _daemonize(self):
511
if self._config.daemon:
513
if self._config.pid_file:
514
stream = open(self._config.pid_file, "w")
515
stream.write(str(os.getpid()))
518
def stopService(self):
519
info("Stopping client...")
520
Service.stopService(self)
522
# If CTRL-C is pressed twice in a row, the second SIGINT actually
523
# kills us before subprocesses die, and that makes them hang around.
524
signal.signal(signal.SIGINT, signal.SIG_IGN)
526
done = self.watchdog.request_exit()
527
done.addBoth(lambda r: self._remove_pid())
530
def _remove_pid(self):
531
pid_file = self._config.pid_file
532
if pid_file is not None and os.access(pid_file, os.W_OK):
533
stream = open(pid_file)
536
if pid == str(os.getpid()):
540
bootstrap_list = BootstrapList([
541
BootstrapDirectory("$data_path", "landscape", "root", 0755),
542
BootstrapDirectory("$data_path/package", "landscape", "root", 0755),
544
"$data_path/package/hash-id", "landscape", "root", 0755),
546
"$data_path/package/binaries", "landscape", "root", 0755),
548
"$data_path/package/upgrade-tool", "landscape", "root", 0755),
549
BootstrapDirectory("$data_path/messages", "landscape", "root", 0755),
550
BootstrapDirectory("$data_path/sockets", "landscape", "root", 0750),
552
"$data_path/custom-graph-scripts", "landscape", "root", 0755),
553
BootstrapDirectory("$log_dir", "landscape", "root", 0755),
554
BootstrapFile("$data_path/package/database", "landscape", "root", 0644),
558
def clean_environment():
    """Unset dangerous environment variables.

    In particular unset all variables beginning with DEBIAN_ or DEBCONF_,
    to avoid any problems when landscape-client is invoked from its
    postinst script. Some environment variables may be set which would affect
    *other* maintainer scripts which landscape-client invokes (via smart).

    LANDSCAPE_ATTACHMENTS and MAIL are unset as well.
    """
    unwanted_prefixes = ("DEBIAN_", "DEBCONF_")
    unwanted_names = ("LANDSCAPE_ATTACHMENTS", "MAIL")
    # Loop over a snapshot of the keys: entries are deleted on the way,
    # which is not allowed while iterating over a live view of the mapping.
    for key in list(os.environ.keys()):
        if key.startswith(unwanted_prefixes) or key in unwanted_names:
            del os.environ[key]
573
def run(args=sys.argv, reactor=None):
574
"""Start the watchdog.
576
This is the topmost function that kicks off the Landscape client. It
577
cleans up the environment, loads the configuration, and starts the
580
@param args: Command line arguments, including the program name as the
582
@param reactor: The reactor to use. If none is specified, the global
584
@raise SystemExit: if command line arguments are bad, or when landscape-
585
client is not running as 'root' or 'landscape'.
589
config = WatchDogConfiguration()
593
landscape_uid = pwd.getpwnam("landscape").pw_uid
595
sys.exit("The 'landscape' user doesn't exist!")
597
if os.getuid() not in (0, landscape_uid):
598
sys.exit("landscape-client can only be run as 'root' or 'landscape'.")
600
init_logging(config, "watchdog")
602
application = Application("landscape-client")
603
watchdog_service = WatchDogService(config)
604
watchdog_service.setServiceParent(application)
607
from twisted.internet import reactor
608
# We add a small delay to work around a Twisted bug: this method should
609
# only be called when the reactor is running, but we still get a
610
# PotentialZombieWarning.
611
reactor.callLater(0, startApplication, application, False)
613
return watchdog_service.exit_code