~free.ekanayaka/landscape-client/lucid-1.5.4-0ubuntu0.10.04.0

« back to all changes in this revision

Viewing changes to landscape/watchdog.py

  • Committer: Bazaar Package Importer
  • Author(s): Free Ekanayaka
  • Date: 2010-06-28 18:07:18 UTC
  • mfrom: (1.2.5 upstream)
  • Revision ID: james.westby@ubuntu.com-20100628180718-vytyqgbtkiirv5sb
Tags: 1.5.2.1-0ubuntu0.10.04.0
Filter duplicate network interfaces in get_active_interfaces (LP: #597000)

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
 
15
15
from logging import warning, info, error
16
16
 
17
 
from dbus import DBusException
18
 
import dbus.glib # Side-effects rule!
19
 
 
20
17
from twisted.internet import reactor
21
18
from twisted.internet.defer import Deferred, succeed
22
19
from twisted.internet.protocol import ProcessProtocol
25
22
from twisted.application.app import startApplication
26
23
 
27
24
from landscape.deployment import Configuration, init_logging
28
 
from landscape.lib.dbus_util import get_bus
29
25
from landscape.lib.twisted_util import gather_results
30
26
from landscape.lib.log import log_failure
31
27
from landscape.lib.bootstrap import (BootstrapList, BootstrapFile,
32
28
                                     BootstrapDirectory)
33
29
from landscape.log import rotate_logs
 
30
from landscape.broker.amp import (
 
31
    RemoteBrokerConnector, RemoteMonitorConnector, RemoteManagerConnector)
 
32
from landscape.reactor import TwistedReactor
34
33
 
35
34
GRACEFUL_WAIT_PERIOD = 10
36
35
MAXIMUM_CONSECUTIVE_RESTARTS = 5
60
59
    @cvar username: The name of the user to switch to, by default.
61
60
    @cvar service: The DBus service name that the program will be expected to
62
61
        listen on.
63
 
    @cvar path: The DBus path that the program will be expected to listen on.
 
62
    @cvar max_retries: The maximum number of retries before giving up when
 
63
        trying to connect to the watched daemon.
 
64
    @cvar factor: The factor by which the delay between subsequent connection
 
65
        attempts will increase.
64
66
    """
65
67
 
66
68
    username = "landscape"
 
69
    max_retries = 3
 
70
    factor = 1.1
67
71
 
68
 
    def __init__(self, bus, reactor=reactor, verbose=False, config=None):
 
72
    def __init__(self, connector, reactor=reactor, verbose=False,
 
73
                 config=None):
69
74
        """
70
 
        @param bus: The bus which this program will listen and respond to pings
71
 
            on.
72
 
        @param reactor: The reactor with which to spawn the process and
73
 
            schedule timed calls.
 
75
        @param connector: The L{RemoteComponentConnector} of the daemon.
 
76
        @param reactor: The reactor used to spawn the process and schedule
 
77
            timed calls.
74
78
        @param verbose: Optionally, report more information when
75
79
            running this program.  Defaults to False.
76
80
        """
77
 
        self._bus = bus
 
81
        self._connector = connector
78
82
        self._reactor = reactor
79
83
        self._env = os.environ.copy()
80
84
        if os.getuid() == 0:
94
98
        self._process = None
95
99
        self._last_started = 0
96
100
        self._quick_starts = 0
 
101
        self._allow_restart = True
97
102
 
98
103
    def find_executable(self):
99
104
        """Find the fully-qualified path to the executable.
139
144
            return succeed(None)
140
145
        return self._process.kill()
141
146
 
 
147
    def _connect_and_call(self, name, *args, **kwargs):
 
148
        """Connect to the remote daemon over AMP and perform the given command.
 
149
 
 
150
        @param name: The name of the command to perform.
 
151
        @param args: Argument list to be passed to the connect method.
 
152
        @param kwargs: Keyword arguments to pass to the connect method.
 
153
        @return: A L{Deferred} resulting in C{True} if the command was
 
154
            successful or C{False} otherwise.
 
155
        @see: L{RemoteLandscapeComponentCreator.connect}.
 
156
        """
 
157
 
 
158
        def disconnect(ignored):
 
159
            self._connector.disconnect()
 
160
            return True
 
161
 
 
162
        connected = self._connector.connect(self.max_retries, self.factor,
 
163
                                            quiet=True)
 
164
        connected.addCallback(lambda remote: getattr(remote, name)())
 
165
        connected.addCallback(disconnect)
 
166
        connected.addErrback(lambda x: False)
 
167
        return connected
 
168
 
142
169
    def request_exit(self):
143
 
        try:
144
 
            object = self._bus.get_object(self.bus_name, self.object_path,
145
 
                                          introspect=False)
146
 
            object.exit(dbus_interface=self.bus_name)
147
 
        except DBusException, e:
148
 
            return False
149
 
        return True
 
170
        return self._connect_and_call("exit")
150
171
 
151
172
    def is_running(self):
152
173
        # FIXME Error cases may not be handled in the best possible way
153
174
        # here. We basically return False if any error happens from the
154
175
        # dbus ping.
155
 
        result = Deferred()
156
 
        try:
157
 
            object = self._bus.get_object(self.bus_name, self.object_path,
158
 
                                          introspect=False)
159
 
            object.ping(reply_handler=result.callback,
160
 
                        error_handler=lambda f: result.callback(False),
161
 
                        dbus_interface=self.bus_name)
162
 
        except DBusException, e:
163
 
            result.callback(False)
164
 
        return result
 
176
        return self._connect_and_call("ping")
165
177
 
166
178
    def wait(self):
167
179
        """
181
193
            return succeed(None)
182
194
        return self._process.wait_or_die()
183
195
 
 
196
    def prepare_for_shutdown(self):
 
197
        """Called by the watchdog when starting to shut us down.
 
198
 
 
199
        It prevents our L{WatchedProcessProtocol} from restarting the process
 
200
        when it exits.
 
201
        """
 
202
        self._allow_restart = False
 
203
 
 
204
    def allow_restart(self):
 
205
        """Return a boolean indicating if the daemon should be restarted."""
 
206
        return self._allow_restart
 
207
 
184
208
    def rotate_logs(self):
185
209
        self._process.rotate_logs()
186
210
 
188
212
class Broker(Daemon):
189
213
    program = "landscape-broker"
190
214
 
191
 
    from landscape.broker.broker import BUS_NAME as bus_name
192
 
    from landscape.broker.broker import OBJECT_PATH as object_path
193
 
 
194
215
 
195
216
class Monitor(Daemon):
196
217
    program = "landscape-monitor"
197
218
 
198
 
    from landscape.monitor.monitor import BUS_NAME as bus_name
199
 
    from landscape.monitor.monitor import OBJECT_PATH as object_path
200
 
 
201
219
 
202
220
class Manager(Daemon):
203
221
    program = "landscape-manager"
204
222
    username = "root"
205
223
 
206
 
    from landscape.manager.manager import BUS_NAME as bus_name
207
 
    from landscape.manager.manager import OBJECT_PATH as object_path
208
 
 
209
224
 
210
225
class WatchedProcessProtocol(ProcessProtocol):
211
226
    """
256
271
                pass
257
272
 
258
273
    def wait(self):
 
274
        if self.transport.pid is None:
 
275
            return succeed(None)
259
276
        self._wait_result = Deferred()
260
277
        return self._wait_result
261
278
 
280
297
            self._delayed_terminate.cancel()
281
298
        if self._wait_result is not None:
282
299
            self._wait_result.callback(None)
283
 
        else:
 
300
        elif self.daemon.allow_restart():
284
301
            self.daemon.start()
285
302
 
286
303
 
290
307
    they are working.
291
308
    """
292
309
 
293
 
    def __init__(self, bus, reactor=reactor, verbose=False, config=None,
294
 
                 broker=None, monitor=None, manager=None, enabled_daemons=None):
295
 
        self.bus = bus
 
310
    def __init__(self, reactor=reactor, verbose=False, config=None,
 
311
                 broker=None, monitor=None, manager=None,
 
312
                 enabled_daemons=None):
 
313
        twisted_reactor = TwistedReactor()
296
314
        if enabled_daemons is None:
297
315
            enabled_daemons = [Broker, Monitor, Manager]
298
316
        if broker is None and Broker in enabled_daemons:
299
 
            broker = Broker(self.bus, verbose=verbose, config=config)
 
317
            broker = Broker(
 
318
                RemoteBrokerConnector(twisted_reactor, config),
 
319
                verbose=verbose, config=config.config)
300
320
        if monitor is None and Monitor in enabled_daemons:
301
 
            monitor = Monitor(self.bus, verbose=verbose, config=config)
 
321
            monitor = Monitor(
 
322
                RemoteMonitorConnector(twisted_reactor, config),
 
323
                verbose=verbose, config=config.config)
302
324
        if manager is None and Manager in enabled_daemons:
303
 
            manager = Manager(self.bus, verbose=verbose, config=config)
 
325
            manager = Manager(
 
326
                RemoteManagerConnector(twisted_reactor, config),
 
327
                verbose=verbose, config=config.config)
304
328
 
305
329
        self.broker = broker
306
330
        self.monitor = monitor
317
341
        """Return a list of any daemons that are already running."""
318
342
        results = []
319
343
        for daemon in self.daemons:
 
344
            # This method is called on startup, we basically try to connect
 
345
            # a few times in fast sequence (with exponential backoff), if we
 
346
            # don't get a response we assume the daemon is not running.
320
347
            result = daemon.is_running()
321
348
            result.addCallback(lambda is_running, d=daemon: (is_running, d))
322
349
            results.append(result)
 
350
 
323
351
        def got_all_results(r):
324
352
            return [x[1] for x in r if x[0]]
325
353
        return gather_results(results).addCallback(got_all_results)
354
382
            if self._ping_failures[daemon] == 5:
355
383
                warning("%s died! Restarting." % (daemon.program,))
356
384
                stopping = daemon.stop()
 
385
 
357
386
                def stopped(ignored):
358
387
                    daemon.start()
359
388
                    self._ping_failures[daemon] = 0
368
397
            is_running = daemon.is_running()
369
398
            is_running.addCallback(self._restart_if_not_running, daemon)
370
399
            all_running.append(is_running)
 
400
 
371
401
        def reschedule(ignored):
372
402
            self._checking = self.reactor.callLater(5, self._check)
373
403
        gather_results(all_running).addBoth(reschedule)
379
409
        # ping has already been sent but not yet responded to.
380
410
        self._stopping = True
381
411
 
382
 
        # If request_exit fails, we should just kill the daemons immediately.
383
 
        if self.broker.request_exit():
384
 
            results = [x.wait_or_die() for x in self.daemons]
385
 
        else:
386
 
            error("Couldn't request that broker gracefully shut down; "
387
 
                  "killing forcefully.")
388
 
            results = [x.stop() for x in self.daemons]
389
 
        return gather_results(results)
 
412
        # This tells the daemons to not automatically restart when they end
 
413
        for daemon in self.daemons:
 
414
            daemon.prepare_for_shutdown()
 
415
 
 
416
        def terminate_processes(broker_stopped):
 
417
            if broker_stopped:
 
418
                results = [daemon.wait_or_die() for daemon in self.daemons]
 
419
            else:
 
420
                # If request_exit fails, we should just kill the daemons
 
421
                # immediately.
 
422
                error("Couldn't request that broker gracefully shut down; "
 
423
                      "killing forcefully.")
 
424
                results = [x.stop() for x in self.daemons]
 
425
            return gather_results(results)
 
426
 
 
427
        result = self.broker.request_exit()
 
428
        return result.addCallback(terminate_processes)
390
429
 
391
430
    def _notify_rotate_logs(self, signal, frame):
392
431
        for daemon in self.daemons:
424
463
        os._exit(0) # kill off parent again.
425
464
    # some argue that this umask should be 0, but that's annoying.
426
465
    os.umask(077)
427
 
    null=os.open('/dev/null', os.O_RDWR)
 
466
    null = os.open('/dev/null', os.O_RDWR)
428
467
    for i in range(3):
429
468
        try:
430
469
            os.dup2(null, i)
438
477
 
439
478
    def __init__(self, config):
440
479
        self._config = config
441
 
        self.bus = get_bus(config.bus)
442
 
        self.watchdog = WatchDog(self.bus,
443
 
                                 verbose=not config.daemon,
444
 
                                 config=config.config,
 
480
        self.watchdog = WatchDog(verbose=not config.daemon,
 
481
                                 config=config,
445
482
                                 enabled_daemons=config.get_enabled_daemons())
446
483
        self.exit_code = 0
447
484
 
449
486
        Service.startService(self)
450
487
        bootstrap_list.bootstrap(data_path=self._config.data_path,
451
488
                                 log_dir=self._config.log_dir)
452
 
 
453
489
        result = self.watchdog.check_running()
454
490
 
455
491
        def start_if_not_running(running_daemons):
460
496
                reactor.crash() # so stopService isn't called.
461
497
                return
462
498
            self._daemonize()
463
 
            info("Watchdog watching for daemons on %r bus." % self._config.bus)
 
499
            info("Watchdog watching for daemons.")
464
500
            return self.watchdog.start()
 
501
 
465
502
        def die(failure):
466
503
            log_failure(failure, "Unknown error occurred!")
467
504
            self.exit_code = 2
503
540
bootstrap_list = BootstrapList([
504
541
    BootstrapDirectory("$data_path", "landscape", "root", 0755),
505
542
    BootstrapDirectory("$data_path/package", "landscape", "root", 0755),
506
 
    BootstrapDirectory("$data_path/package/hash-id", "landscape", "root", 0755),
507
 
    BootstrapDirectory("$data_path/package/binaries", "landscape", "root", 0755),
 
543
    BootstrapDirectory(
 
544
        "$data_path/package/hash-id", "landscape", "root", 0755),
 
545
    BootstrapDirectory(
 
546
        "$data_path/package/binaries", "landscape", "root", 0755),
508
547
    BootstrapDirectory(
509
548
        "$data_path/package/upgrade-tool", "landscape", "root", 0755),
510
549
    BootstrapDirectory("$data_path/messages", "landscape", "root", 0755),
 
550
    BootstrapDirectory("$data_path/sockets", "landscape", "root", 0750),
511
551
    BootstrapDirectory(
512
552
        "$data_path/custom-graph-scripts", "landscape", "root", 0755),
513
553
    BootstrapDirectory("$log_dir", "landscape", "root", 0755),
533
573
def run(args=sys.argv, reactor=None):
534
574
    """Start the watchdog.
535
575
 
536
 
    This is the topmost function that kicks off the Landscape client.  It cleans
537
 
    up the environment, loads the configuration, and starts the reactor.
 
576
    This is the topmost function that kicks off the Landscape client.  It
 
577
    cleans up the environment, loads the configuration, and starts the
 
578
    reactor.
538
579
 
539
580
    @param args: Command line arguments, including the program name as the
540
581
        first element.
541
582
    @param reactor: The reactor to use.  If none is specified, the global
542
583
        reactor is used.
543
584
    @raise SystemExit: if command line arguments are bad, or when landscape-
544
 
        client is running as non-root and the configuration indicates use of
545
 
        the system bus.
 
585
        client is not running as 'root' or 'landscape'.
546
586
    """
547
587
    clean_environment()
548
588
 
549
589
    config = WatchDogConfiguration()
550
590
    config.load(args)
551
591
 
552
 
    if (config.bus == "system"
553
 
        and not (os.getuid() == 0
554
 
                 or pwd.getpwnam("landscape").pw_uid == os.getuid())):
555
 
        sys.exit("When using the system bus, landscape-client must be run as "
556
 
                 "root.")
 
592
    try:
 
593
        landscape_uid = pwd.getpwnam("landscape").pw_uid
 
594
    except KeyError:
 
595
        sys.exit("The 'landscape' user doesn't exist!")
 
596
 
 
597
    if os.getuid() not in (0, landscape_uid):
 
598
        sys.exit("landscape-client can only be run as 'root' or 'landscape'.")
557
599
 
558
600
    init_logging(config, "watchdog")
559
601