~free.ekanayaka/landscape-client/karmic-updates-1.4.4-0ubuntu0.9.10

« back to all changes in this revision

Viewing changes to landscape/tests/test_watchdog.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import stat
 
2
import time
 
3
import sys
 
4
import os
 
5
import signal
 
6
import logging
 
7
 
 
8
import dbus
 
9
 
 
10
from twisted.internet.utils import getProcessOutput
 
11
from twisted.internet.threads import deferToThread
 
12
from twisted.internet.defer import Deferred, succeed, fail
 
13
from twisted.internet import reactor
 
14
 
 
15
from landscape.broker.broker import BUS_NAME, OBJECT_PATH, IFACE_NAME
 
16
from landscape.lib.dbus_util import method
 
17
from landscape.tests.mocker import ARGS, KWARGS, ANY
 
18
from landscape.tests.clock import Clock
 
19
from landscape.tests.helpers import (
 
20
    LandscapeIsolatedTest, LandscapeTest, DBusHelper, RemoteBrokerHelper)
 
21
from landscape.watchdog import (
 
22
    Daemon, WatchDog, WatchDogService, ExecutableNotFoundError, TimeoutError,
 
23
    AlreadyRunningError, run, WatchDogConfiguration, bootstrap_list,
 
24
    MAXIMUM_CONSECUTIVE_RESTARTS, RESTART_BURST_DELAY)
 
25
import landscape.watchdog
 
26
from landscape.log import rotate_logs
 
27
 
 
28
class WatchDogTest(LandscapeTest):
    """
    Tests for L{landscape.watchdog.WatchDog}.

    The C{Broker}, C{Monitor} and C{Manager} names in C{landscape.watchdog}
    are replaced with mocker objects, so every daemon interaction checked
    here is a recorded expectation.
    """

    def setUp(self):
        """Install the mocked daemon factories and the daemons they build."""
        super(WatchDogTest, self).setUp()
        self.bus = object()

        replace = self.mocker.replace
        self.broker_factory = replace("landscape.watchdog.Broker",
                                      passthrough=False)
        self.monitor_factory = replace("landscape.watchdog.Monitor",
                                       passthrough=False)
        self.manager_factory = replace("landscape.watchdog.Manager",
                                       passthrough=False)

        daemon_options = {"verbose": False, "config": None}
        self.broker = self.broker_factory(self.bus, **daemon_options)
        self.monitor = self.monitor_factory(self.bus, **daemon_options)
        self.manager = self.manager_factory(self.bus, **daemon_options)

        # The program attribute may be read zero or more times on each
        # daemon; count(0, None) applies to the expectation just recorded.
        programs = ((self.broker, "landscape-broker"),
                    (self.manager, "landscape-manager"),
                    (self.monitor, "landscape-monitor"))
        for daemon, program in programs:
            self.expect(daemon.program).result(program)
            self.mocker.count(0, None)

    def _expect_running(self, broker, monitor, manager):
        """Record one is_running() answer for each daemon, broker first."""
        self.expect(self.broker.is_running()).result(succeed(broker))
        self.expect(self.monitor.is_running()).result(succeed(monitor))
        self.expect(self.manager.is_running()).result(succeed(manager))

    def _expect_all_started(self):
        """Record a start() call on the broker, the monitor and the manager."""
        self.broker.start()
        self.monitor.start()
        self.manager.start()

    def _assert_start_refused(self):
        """Replay, start a WatchDog and expect L{AlreadyRunningError}."""
        self.mocker.replay()
        outcome = WatchDog(self.bus).start()
        self.assertFailure(outcome, AlreadyRunningError)
        return outcome

    def test_daemon_construction(self):
        """The WatchDog sets up some daemons when constructed."""
        self.mocker.replay()
        WatchDog(self.bus)

    def test_start_checks_for_existing_broker(self):
        """start() fails when the broker reports that it is running."""
        self._expect_running(broker=True, monitor=False, manager=False)
        return self._assert_start_refused()

    def test_start_checks_for_existing_monitor(self):
        """start() fails when the monitor reports that it is running."""
        self._expect_running(broker=False, monitor=True, manager=False)
        return self._assert_start_refused()

    def test_start_checks_for_existing_manager(self):
        """start() fails when the manager reports that it is running."""
        self._expect_running(broker=False, monitor=False, manager=True)
        return self._assert_start_refused()

    def none_are_running(self):
        """Expect every daemon to answer is_running() with False."""
        self._expect_running(broker=False, monitor=False, manager=False)

    def expect_request_exit(self):
        """
        Expect the broker to be asked to exit, followed by a wait_or_die()
        call on each daemon.
        """
        self.expect(self.broker.request_exit()).result(succeed(False))
        for daemon in (self.broker, self.monitor, self.manager):
            self.expect(daemon.wait_or_die()).result(succeed(None))

    def test_start_and_stop_daemons(self):
        """The WatchDog will start all daemons, starting with the broker."""
        self.mocker.order()

        self.none_are_running()
        self._expect_all_started()
        self.expect_request_exit()

        self.mocker.replay()

        watchdog = WatchDog(self.bus)
        watchdog.start()
        return watchdog.request_exit()

    def test_request_exit(self):
        """request_exit() asks the broker to exit.

        The broker itself is responsible for notifying other plugins to exit.

        When the deferred returned from request_exit fires, the process should
        definitely be gone.
        """
        self.expect_request_exit()
        self.mocker.replay()
        watchdog = WatchDog(self.bus)
        return watchdog.request_exit()

    def test_ping_reply_after_request_exit_should_not_restart_processes(self):
        """
        When request_exit occurs between a ping request and response, a failing
        ping response should not cause the process to be restarted.
        """
        self.mocker.order()
        self.none_are_running()
        self._expect_all_started()

        # The monitor's ping stays unanswered until after request_exit().
        pending_monitor_ping = Deferred()
        self.expect(self.broker.is_running()).result(succeed(True))
        self.expect(self.monitor.is_running()).result(pending_monitor_ping)
        self.expect(self.manager.is_running()).result(succeed(True))

        self.expect_request_exit()

        # And the monitor should never be explicitly stopped / restarted.
        self.expect(self.monitor.stop()).count(0)
        self.expect(self.monitor.start()).count(0)

        self.mocker.replay()

        clock = Clock()
        watchdog = WatchDog(self.bus, clock)
        watchdog.start()
        clock.advance(5)
        exiting = watchdog.request_exit()
        pending_monitor_ping.callback(False)
        return exiting
 
164
 
 
165
 
 
166
# Markers appended to a fake daemon's ``boots`` list by start() and stop().
START, STOP = "start", "stop"
 
168
 
 
169
class BoringDaemon(object):
    """
    A fake daemon that records start/stop calls instead of running anything.

    Each C{start()} and C{stop()} appends a marker to C{boots};
    C{is_running()} always fires with C{True}.
    """

    def __init__(self, program):
        # boots collects START / STOP markers in call order.
        self.boots = []
        self.program = program

    def start(self):
        """Record that the daemon was started."""
        self.boots.append(START)

    def stop(self):
        """Record that the daemon was stopped; returns a fired Deferred."""
        self.boots.append(STOP)
        return succeed(None)

    def is_running(self):
        """Return a Deferred already fired with C{True}."""
        return succeed(True)

    def request_exit(self):
        """Return C{True} synchronously."""
        return True

    def wait(self):
        """Return a Deferred already fired with C{None}."""
        return succeed(None)

    def wait_or_die(self):
        """Delegate to C{wait()}, looked up on the instance at call time."""
        return self.wait()
 
192
 
 
193
 
 
194
class AsynchronousPingDaemon(BoringDaemon):
    """
    A L{BoringDaemon} whose C{is_running()} answer is supplied later.

    C{is_running()} hands out an unfired Deferred and counts the call in
    C{pings}; the test fires the answer with C{fire_running()}.
    """

    # Number of is_running() calls seen so far.
    pings = 0
    # The outstanding ping Deferred, or None when no ping is pending.
    deferred = None

    def is_running(self):
        """
        Count the ping and return a fresh, unfired Deferred.

        Raises C{AssertionError} if the previous ping is still outstanding.
        """
        self.pings += 1
        if self.deferred is None:
            self.deferred = Deferred()
            return self.deferred
        raise AssertionError(
            "is_running called while it's already running!")

    def fire_running(self, value):
        """Fire the outstanding ping with C{value}, then forget it."""
        # The Deferred is fired before the attribute is cleared.
        self.deferred.callback(value)
        self.deferred = None
 
209
 
 
210
 
 
211
class NonMockerWatchDogTests(LandscapeTest):
    """
    L{WatchDog} tests that use hand-written fake daemons instead of mocker.
    """
    # mocker is hard

    def _make_watchdog(self, broker, monitor, manager):
        """
        Build a WatchDog driven by a fresh L{Clock}.

        Each argument is a fake daemon class; it is instantiated with the
        matching "test-..." program name.  Returns a C{(clock, dog)} pair.
        """
        clock = Clock()
        dog = WatchDog(object(), clock,
                       broker=broker("test-broker"),
                       monitor=monitor("test-monitor"),
                       manager=manager("test-manager"))
        return clock, dog

    def _ping_round(self, clock, dog, broker_alive):
        """
        Advance the clock by 5 and answer every outstanding ping: the broker
        with C{broker_alive}, the monitor and the manager with C{True}.
        """
        clock.advance(5)
        dog.broker.fire_running(broker_alive)
        dog.monitor.fire_running(True)
        dog.manager.fire_running(True)

    def _fail_broker_pings(self, clock, dog, count):
        """Run C{count} rounds in which only the broker ping is failed."""
        for _ in range(count):
            clock.advance(5)
            dog.broker.fire_running(False)

    def test_ping_is_not_rescheduled_until_pings_complete(self):
        """A second ping is only sent once the first has been answered."""
        clock, dog = self._make_watchdog(AsynchronousPingDaemon,
                                         AsynchronousPingDaemon,
                                         AsynchronousPingDaemon)

        dog.start_monitoring()

        clock.advance(5)
        for daemon in dog.daemons:
            self.assertEquals(daemon.pings, 1)
        clock.advance(5)
        for daemon in dog.daemons:
            self.assertEquals(daemon.pings, 1)
            daemon.fire_running(True)
        clock.advance(5)
        for daemon in dog.daemons:
            self.assertEquals(daemon.pings, 2)

    def test_check_daemons(self):
        """
        The daemons are checked to be running every so often. When N=5 of these
        checks fail, the daemon will be restarted.
        """
        clock, dog = self._make_watchdog(AsynchronousPingDaemon,
                                         AsynchronousPingDaemon,
                                         AsynchronousPingDaemon)
        dog.start_monitoring()

        for _ in range(4):
            self._ping_round(clock, dog, broker_alive=False)
            self.assertEquals(dog.broker.boots, [])

        self._ping_round(clock, dog, broker_alive=False)
        self.assertEquals(dog.broker.boots, [STOP, START])

    def test_counted_ping_failures_reset_on_success(self):
        """
        When a failing ping is followed by a successful ping, it will then
        require 5 more ping failures to restart the daemon.
        """
        clock, dog = self._make_watchdog(AsynchronousPingDaemon,
                                         AsynchronousPingDaemon,
                                         AsynchronousPingDaemon)
        dog.start_monitoring()

        self._ping_round(clock, dog, broker_alive=False)
        self._ping_round(clock, dog, broker_alive=True)

        for _ in range(4):
            self._ping_round(clock, dog, broker_alive=False)
            self.assertEquals(dog.broker.boots, [])

        self._ping_round(clock, dog, broker_alive=False)
        self.assertEquals(dog.broker.boots, [STOP, START])

    def test_exiting_during_outstanding_ping_works(self):
        """
        This is a regression test. Some code called .cancel() on a timed call
        without checking if it was active first. Asynchronous is_running will
        cause the scheduled call to exist but already fired.
        """
        clock, dog = self._make_watchdog(BoringDaemon,
                                         BoringDaemon,
                                         AsynchronousPingDaemon)
        dog.start_monitoring()
        clock.advance(5)
        return dog.request_exit()

    def test_wait_for_stop_before_start(self):
        """
        When a daemon times out and the watchdog attempts to kill it, it should
        not be restarted until the process has fully died.
        """
        clock, dog = self._make_watchdog(AsynchronousPingDaemon,
                                         BoringDaemon,
                                         BoringDaemon)
        # stop() now hands back a Deferred that the test fires by hand.
        stop_result = Deferred()
        dog.broker.stop = lambda: stop_result
        dog.start_monitoring()

        self._fail_broker_pings(clock, dog, 5)

        self.assertEquals(dog.broker.boots, [])
        stop_result.callback(None)
        self.assertEquals(dog.broker.boots, ["start"])

    def test_wait_for_stop_before_ping(self):
        """
        When a daemon times out and the watchdog restarts it, it should not be
        pinged until after the restart completes.
        """
        clock, dog = self._make_watchdog(AsynchronousPingDaemon,
                                         BoringDaemon,
                                         BoringDaemon)
        stop_result = Deferred()
        dog.broker.stop = lambda: stop_result
        dog.start_monitoring()

        self._fail_broker_pings(clock, dog, 5)

        self.assertEquals(dog.broker.boots, [])
        self.assertEquals(dog.broker.pings, 5)
        clock.advance(5)  # wait some more to see if a ping happens
        self.assertEquals(dog.broker.pings, 5)
        stop_result.callback(None)
        self.assertEquals(dog.broker.boots, ["start"])
        clock.advance(5)
        self.assertEquals(dog.broker.pings, 6)

    def test_ping_failure_counter_reset_after_restart(self):
        """
        When a daemon stops responding and gets restarted after 5 failed pings,
        it will wait for another 5 failed pings before it will be restarted
        again.
        """
        clock, dog = self._make_watchdog(AsynchronousPingDaemon,
                                         BoringDaemon,
                                         BoringDaemon)
        dog.start_monitoring()

        self._fail_broker_pings(clock, dog, 5)
        self.assertEquals(dog.broker.boots, ["stop", "start"])

        for _ in range(4):
            clock.advance(5)
            dog.broker.fire_running(False)
            self.assertEquals(dog.broker.boots, ["stop", "start"])

        clock.advance(5)
        dog.broker.fire_running(False)
        self.assertEquals(dog.broker.boots, ["stop", "start", "stop", "start"])

    def test_die_when_broker_unavailable(self):
        """
        If the broker is not running, the client should still be able to shut
        down.
        """
        self.log_helper.ignore_errors(
            "Couldn't request that broker gracefully shut down; "
            "killing forcefully.")
        clock, dog = self._make_watchdog(BoringDaemon,
                                         BoringDaemon,
                                         BoringDaemon)

        # request_exit returns False when there's no broker, as tested by
        # DaemonTest.test_request_exit_without_broker
        dog.broker.request_exit = lambda: False

        # The manager's wait method never fires its deferred because nothing
        # told it to die because the broker is dead!  Only an explicit stop()
        # fires it.
        manager_gone = Deferred()
        dog.manager.wait = lambda: manager_gone

        def stop():
            manager_gone.callback(True)
            return succeed(True)
        dog.manager.stop = stop

        return dog.request_exit()
 
414
 
 
415
 
 
416
class DaemonTestBase(LandscapeIsolatedTest):
    """
    Shared fixture for L{Daemon} tests.

    setUp creates a temporary directory, builds a daemon whose program name
    is "landscape-broker" and points C{sys.argv[0]} into that directory;
    tearDown restores C{sys.argv}.
    """

    # Bus handed to every Daemon built by get_daemon().
    bus = None

    def setUp(self):
        """Create the directory, the default daemon and the fake argv."""
        super(DaemonTestBase, self).setUp()

        self.exec_dir = self.makeDir()
        self.exec_name = os.path.join(self.exec_dir, "landscape-broker")

        self.daemon = self.get_daemon()

        fake_argv0 = os.path.join(self.exec_dir, "arv0_execname")
        self.saved_argv, sys.argv = sys.argv, [fake_argv0]

    def tearDown(self):
        """Put the original C{sys.argv} back."""
        sys.argv = self.saved_argv
        super(DaemonTestBase, self).tearDown()

    def get_daemon(self, **kwargs):
        """
        Return a L{Daemon} on C{self.bus} configured to look like the broker.

        Keyword arguments are passed straight through to the L{Daemon}
        constructor.
        """
        daemon = Daemon(self.bus, **kwargs)
        overrides = (("program", os.path.basename(self.exec_name)),
                     ("bus_name", BUS_NAME),
                     ("object_path", OBJECT_PATH))
        for attribute, value in overrides:
            setattr(daemon, attribute, value)
        return daemon
 
440
 
 
441
 
 
442
class FileChangeWaiter(object):
    """
    Block until a file's modification time changes.

    The constructor resets the file's access and modification times to 0 and
    remembers the resulting mtime; C{wait()} then polls until the mtime
    differs from that remembered value.
    """
    # XXX This should be reimplemented using a named pipe.

    def __init__(self, filename):
        os.utime(filename, (0, 0))
        self._mtime = os.path.getmtime(filename)
        self._filename = filename

    def wait(self, timeout=None):
        """
        Poll every 0.1 seconds until the file's mtime changes.

        @param timeout: Optional maximum number of seconds to wait.  The
            default, C{None}, waits indefinitely (the historical behaviour).
        @raise RuntimeError: if C{timeout} elapses before the file changes,
            so that a program which never writes its output fails the test
            instead of hanging it.
        """
        if timeout is None:
            deadline = None
        else:
            deadline = time.time() + timeout
        while self._mtime == os.path.getmtime(self._filename):
            # time.time() is only consulted when a timeout was requested.
            if deadline is not None and time.time() >= deadline:
                raise RuntimeError("%s did not change within %s seconds"
                                   % (self._filename, timeout))
            time.sleep(0.1)
 
453
 
 
454
 
 
455
class DaemonTest(DaemonTestBase):
 
456
 
 
457
    helpers = [DBusHelper]
 
458
 
 
459
    def test_find_executable_works(self):
 
460
        self.makeFile("I'm the broker.", path=self.exec_name)
 
461
        self.assertEquals(self.daemon.find_executable(), self.exec_name)
 
462
 
 
463
    def test_find_executable_cant_find_file(self):
 
464
        self.assertRaises(ExecutableNotFoundError, self.daemon.find_executable)
 
465
 
 
466
    def test_start_process(self):
 
467
        output_filename = self.makeFile("NOT RUN")
 
468
        self.makeFile('#!/bin/sh\necho "RUN $@" > %s' % output_filename,
 
469
                      path=self.exec_name)
 
470
        os.chmod(self.exec_name, 0755)
 
471
 
 
472
        waiter = FileChangeWaiter(output_filename)
 
473
 
 
474
        self.daemon.start()
 
475
 
 
476
        waiter.wait()
 
477
 
 
478
        self.assertEquals(open(output_filename).read(),
 
479
                          "RUN --ignore-sigint --quiet\n")
 
480
 
 
481
        return self.daemon.stop()
 
482
 
 
483
    def test_start_process_with_verbose(self):
 
484
        output_filename = self.makeFile("NOT RUN")
 
485
        self.makeFile('#!/bin/sh\necho "RUN $@" > %s' % output_filename,
 
486
                      path=self.exec_name)
 
487
        os.chmod(self.exec_name, 0755)
 
488
 
 
489
        waiter = FileChangeWaiter(output_filename)
 
490
 
 
491
        daemon = self.get_daemon(verbose=True)
 
492
        daemon.start()
 
493
 
 
494
        waiter.wait()
 
495
 
 
496
        self.assertEquals(open(output_filename).read(), "RUN --ignore-sigint\n")
 
497
 
 
498
        return daemon.stop()
 
499
 
 
500
    def test_kill_process_with_sigterm(self):
 
501
        """The stop() method sends SIGTERM to the subprocess."""
 
502
        output_filename = self.makeFile("NOT RUN")
 
503
        self.makeFile("#!%s\n"
 
504
                      "import time\n"
 
505
                      "file = open(%r, 'w')\n"
 
506
                      "file.write('RUN')\n"
 
507
                      "file.close()\n"
 
508
                      "time.sleep(1000)\n"
 
509
                      % (sys.executable, output_filename),
 
510
                      path=self.exec_name)
 
511
        os.chmod(self.exec_name, 0755)
 
512
 
 
513
        waiter = FileChangeWaiter(output_filename)
 
514
        self.daemon.start()
 
515
        waiter.wait()
 
516
        self.assertEquals(open(output_filename).read(), "RUN")
 
517
        return self.daemon.stop()
 
518
 
 
519
    def test_kill_process_with_sigkill(self):
 
520
        """
 
521
        Verify that killing process really works, even if something is
 
522
        holding the process badly.  In these cases, a SIGKILL is performed
 
523
        some time after the SIGTERM was issued and didn't work.
 
524
        """
 
525
        output_filename = self.makeFile("NOT RUN")
 
526
        self.makeFile("#!%s\n"
 
527
                      "import signal, os\n"
 
528
                      "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
 
529
                      "file = open(%r, 'w')\n"
 
530
                      "file.write('RUN')\n"
 
531
                      "file.close()\n"
 
532
                      "os.kill(os.getpid(), signal.SIGSTOP)\n"
 
533
                      % (sys.executable, output_filename),
 
534
                      path=self.exec_name)
 
535
        os.chmod(self.exec_name, 0755)
 
536
 
 
537
        self.addCleanup(setattr, landscape.watchdog, "SIGKILL_DELAY",
 
538
                        landscape.watchdog.SIGKILL_DELAY)
 
539
        landscape.watchdog.SIGKILL_DELAY = 1
 
540
 
 
541
        waiter = FileChangeWaiter(output_filename)
 
542
        self.daemon.start()
 
543
        waiter.wait()
 
544
        self.assertEquals(open(output_filename).read(), "RUN")
 
545
        return self.daemon.stop()
 
546
 
 
547
    def test_wait_for_process(self):
 
548
        """
 
549
        The C{wait} method returns a Deferred that fires when the process has
 
550
        died.
 
551
        """
 
552
        output_filename = self.makeFile("NOT RUN")
 
553
        self.makeFile('#!/bin/sh\necho "RUN" > %s' % output_filename,
 
554
                      path=self.exec_name)
 
555
        os.chmod(self.exec_name, 0755)
 
556
 
 
557
        self.daemon.start()
 
558
        def got_result(result):
 
559
            self.assertEquals(open(output_filename).read(), "RUN\n")
 
560
        return self.daemon.wait().addCallback(got_result)
 
561
 
 
562
    def test_wait_or_die_dies_happily(self):
 
563
        """
 
564
        The C{wait_or_die} method will wait for the process to die for a
 
565
        certain amount of time, just like C{wait}.
 
566
        """
 
567
        output_filename = self.makeFile("NOT RUN")
 
568
        self.makeFile('#!/bin/sh\necho "RUN" > %s' % output_filename,
 
569
                      path=self.exec_name)
 
570
        os.chmod(self.exec_name, 0755)
 
571
 
 
572
        self.daemon.start()
 
573
        def got_result(result):
 
574
            self.assertEquals(open(output_filename).read(), "RUN\n")
 
575
        return self.daemon.wait_or_die().addCallback(got_result)
 
576
 
 
577
    def test_wait_or_die_terminates(self):
 
578
        """wait_or_die eventually terminates the process."""
 
579
        output_filename = self.makeFile("NOT RUN")
 
580
        self.makeFile("""\
 
581
#!%(exe)s
 
582
import time
 
583
import signal
 
584
file = open(%(out)r, 'w')
 
585
file.write('unsignalled')
 
586
file.close()
 
587
def term(frame, sig):
 
588
    file = open(%(out)r, 'w')
 
589
    file.write('TERMINATED')
 
590
    file.close()
 
591
signal.signal(signal.SIGTERM, term)
 
592
time.sleep(999)
 
593
        """
 
594
                      % {"exe": sys.executable, "out": output_filename},
 
595
                      path=self.exec_name)
 
596
        os.chmod(self.exec_name, 0755)
 
597
 
 
598
        self.addCleanup(setattr, landscape.watchdog, "GRACEFUL_WAIT_PERIOD",
 
599
                        landscape.watchdog.GRACEFUL_WAIT_PERIOD)
 
600
        landscape.watchdog.GRACEFUL_WAIT_PERIOD = 0.2
 
601
        self.daemon.start()
 
602
        def got_result(result):
 
603
            self.assertEquals(open(output_filename).read(), "TERMINATED")
 
604
        return self.daemon.wait_or_die().addCallback(got_result)
 
605
 
 
606
    def test_wait_or_die_kills(self):
 
607
        """
 
608
        wait_or_die eventually falls back to KILLing a process, after waiting
 
609
        and terminating don't work.
 
610
        """
 
611
        output_filename = self.makeFile("NOT RUN")
 
612
        self.makeFile("#!%s\n"
 
613
                      "import signal, os\n"
 
614
                      "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
 
615
                      "file = open(%r, 'w')\n"
 
616
                      "file.write('RUN')\n"
 
617
                      "file.close()\n"
 
618
                      "os.kill(os.getpid(), signal.SIGSTOP)\n"
 
619
                      % (sys.executable, output_filename),
 
620
                      path=self.exec_name)
 
621
        os.chmod(self.exec_name, 0755)
 
622
 
 
623
        self.addCleanup(setattr, landscape.watchdog, "SIGKILL_DELAY",
 
624
                        landscape.watchdog.SIGKILL_DELAY)
 
625
        self.addCleanup(setattr, landscape.watchdog, "GRACEFUL_WAIT_PERIOD",
 
626
                        landscape.watchdog.GRACEFUL_WAIT_PERIOD)
 
627
        landscape.watchdog.GRACEFUL_WAIT_PERIOD = 1
 
628
        landscape.watchdog.SIGKILL_DELAY = 1
 
629
 
 
630
        waiter = FileChangeWaiter(output_filename)
 
631
        self.daemon.start()
 
632
        waiter.wait()
 
633
        self.assertEquals(open(output_filename).read(), "RUN")
 
634
        return self.daemon.wait_or_die()
 
635
 
 
636
    def test_wait_for_unstarted_process(self):
 
637
        """
 
638
        If a process has never been started, waiting for it is
 
639
        immediately successful.
 
640
        """
 
641
        daemon = self.get_daemon()
 
642
 
 
643
        def assert_wait(is_running):
 
644
            self.assertFalse(is_running)
 
645
            return daemon.wait()
 
646
 
 
647
        result = daemon.is_running()
 
648
        result.addCallback(assert_wait)
 
649
        return result
 
650
 
 
651
    def test_wait_or_die_for_unstarted_process(self):
 
652
        """
 
653
        If a process has never been started, wait_or_die is
 
654
        immediately successful.
 
655
        """
 
656
        daemon = self.get_daemon()
 
657
        l = []
 
658
        daemon.wait_or_die().addCallback(l.append)
 
659
        self.assertEquals(l, [None])
 
660
 
 
661
 
 
662
    def test_simulate_broker_not_starting_up(self):
 
663
        """
 
664
        When a daemon repeatedly dies, the watchdog gives up entirely and shuts
 
665
        down.
 
666
        """
 
667
        self.log_helper.ignore_errors("Can't keep landscape-broker running. "
 
668
                                      "Exiting.")
 
669
 
 
670
        output_filename = self.makeFile("NOT RUN")
 
671
 
 
672
        self.makeFile("#!/bin/sh\necho RUN >> %s" % output_filename,
 
673
                      path=self.exec_name)
 
674
        os.chmod(self.exec_name, 0755)
 
675
 
 
676
        def got_result(result):
 
677
            self.assertEquals(len(list(open(output_filename))),
 
678
                              MAXIMUM_CONSECUTIVE_RESTARTS)
 
679
 
 
680
            self.assertTrue("Can't keep landscape-broker running." in
 
681
                            self.logfile.getvalue())
 
682
 
 
683
        reactor_mock = self.mocker.proxy(reactor, passthrough=True)
 
684
        reactor_mock.stop()
 
685
        self.mocker.replay()
 
686
 
 
687
        result = Deferred()
 
688
        result.addCallback(lambda x: self.daemon.stop())
 
689
        result.addCallback(got_result)
 
690
 
 
691
        reactor.callLater(1, result.callback, None)
 
692
 
 
693
        daemon = self.get_daemon(reactor=reactor_mock)
 
694
        daemon.start()
 
695
 
 
696
        return result
 
697
 
 
698
    def test_simulate_broker_not_starting_up_with_delay(self):
 
699
        """
 
700
        The watchdog won't shutdown entirely when a daemon dies repeatedly as
 
701
        long as it is not dying too quickly.
 
702
        """
 
703
        # This test hacks the first time() call to make it return a timestamp
 
704
        # that happend a while ago, and so give the impression that some time
 
705
        # has passed and it's fine to restart more times again.
 
706
        self.log_helper.ignore_errors("Can't keep landscape-broker running. "
 
707
                                      "Exiting.")
 
708
 
 
709
        output_filename = self.makeFile("NOT RUN")
 
710
 
 
711
        self.makeFile("#!/bin/sh\necho RUN >> %s" % output_filename,
 
712
                      path=self.exec_name)
 
713
        os.chmod(self.exec_name, 0755)
 
714
 
 
715
        def got_result(result):
 
716
            # Pay attention to the +1 bellow. It's the reason for this test.
 
717
            self.assertEquals(len(list(open(output_filename))),
 
718
                              MAXIMUM_CONSECUTIVE_RESTARTS + 1)
 
719
 
 
720
            self.assertTrue("Can't keep landscape-broker running." in
 
721
                            self.logfile.getvalue())
 
722
 
 
723
        result = Deferred()
 
724
        result.addCallback(lambda x: self.daemon.stop())
 
725
        result.addCallback(got_result)
 
726
 
 
727
        reactor_mock = self.mocker.proxy(reactor, passthrough=True)
 
728
        reactor_mock.stop()
 
729
 
 
730
        # Make the *first* call to time return 0, so that it will try one
 
731
        # more time, and exercise the burst protection system.
 
732
        time_mock = self.mocker.replace("time.time")
 
733
        self.expect(time_mock()).result(time.time() - RESTART_BURST_DELAY)
 
734
        self.expect(time_mock()).passthrough().count(0, None)
 
735
 
 
736
        self.mocker.replay()
 
737
 
 
738
        # It's important to call start() shortly after the mocking above,
 
739
        # as we don't want anyone else getting the fake time.
 
740
        daemon = self.get_daemon(reactor=reactor_mock)
 
741
        daemon.start()
 
742
 
 
743
        reactor.callLater(1, result.callback, None)
 
744
 
 
745
        return result
 
746
 
 
747
    def test_is_not_running(self):
 
748
        result = self.daemon.is_running()
 
749
        result.addCallback(self.assertFalse)
 
750
        return result
 
751
 
 
752
    def test_spawn_process_with_uid(self):
 
753
        """
 
754
        When the current UID as reported by os.getuid is not the uid of the
 
755
        username of the daemon, the watchdog explicitly switched to the uid of
 
756
        the username of the daemon. It also specified the gid as the primary
 
757
        group of that user.
 
758
        """
 
759
        self.makeFile("", path=self.exec_name)
 
760
 
 
761
        getuid = self.mocker.replace("os.getuid")
 
762
        getpwnam = self.mocker.replace("pwd.getpwnam")
 
763
        reactor = self.mocker.mock()
 
764
        self.expect(getuid()).result(0)
 
765
        info = getpwnam("landscape")
 
766
        self.expect(info.pw_uid).result(123)
 
767
        self.expect(info.pw_gid).result(456)
 
768
 
 
769
        reactor.spawnProcess(ARGS, KWARGS, uid=123, gid=456)
 
770
 
 
771
        self.mocker.replay()
 
772
 
 
773
        daemon = self.get_daemon(reactor=reactor)
 
774
        daemon.start()
 
775
 
 
776
    def test_spawn_process_without_uid(self):
 
777
        """
 
778
        If the daemon is specified to run as the current user, no uid or gid
 
779
        switching will occur.
 
780
        """
 
781
        self.makeFile("", path=self.exec_name)
 
782
        getuid = self.mocker.replace("os.getuid")
 
783
        reactor = self.mocker.mock()
 
784
        self.expect(getuid()).result(555)
 
785
 
 
786
        reactor.spawnProcess(ARGS, KWARGS, uid=None, gid=None)
 
787
 
 
788
        self.mocker.replay()
 
789
 
 
790
        daemon = self.get_daemon(reactor=reactor)
 
791
        daemon.start()
 
792
 
 
793
    def test_request_exit(self):
 
794
        """The request_exit() method calls exit() on the broker synchronously.
 
795
 
 
796
        The method must be synchronous because we don't want the watchdog to
 
797
        do anything else while we're requesting the broker to exit.  This makes
 
798
        testing it a bit wild unfortunately.  We have to spawn the a stub
 
799
        broker into a different process.
 
800
        """
 
801
 
 
802
        output_filename = self.makeFile("NOT CALLED")
 
803
        broker_filename = self.makeFile(STUB_BROKER %
 
804
                                        {"executable": sys.executable,
 
805
                                         "path": sys.path,
 
806
                                         "output_filename": output_filename,
 
807
                                         "bus_name": BUS_NAME,
 
808
                                         "object_path": OBJECT_PATH,
 
809
                                         "iface_name": IFACE_NAME})
 
810
 
 
811
        os.chmod(broker_filename, 0755)
 
812
 
 
813
        process_result = getProcessOutput(broker_filename, env=os.environ,
 
814
                                          errortoo=True)
 
815
 
 
816
        # Wait until the process starts up, trying the call a few times.
 
817
        for i in range(10):
 
818
            if self.daemon.request_exit():
 
819
                break
 
820
            time.sleep(0.1)
 
821
        else:
 
822
            self.fail("request_exit() never returned True.")
 
823
 
 
824
        def got_result(result):
 
825
            self.assertEquals(result, "")
 
826
            self.assertEquals(open(output_filename).read(), "CALLED")
 
827
 
 
828
        return process_result.addCallback(got_result)
 
829
 
 
830
    def test_request_exit_without_broker(self):
 
831
        """
 
832
        The request_exit method returns False when the broker can't be
 
833
        contacted.
 
834
        """
 
835
        self.assertFalse(self.daemon.request_exit())
 
836
 
 
837
 
 
838
class DaemonBrokerTest(DaemonTestBase):
    """
    Daemon tests run with the L{RemoteBrokerHelper} fixture in place.
    """

    helpers = [RemoteBrokerHelper]

    @property
    def bus(self):
        """The bus of the C{broker_service} attribute of this test."""
        broker_service = self.broker_service
        return broker_service.bus

    def test_is_running(self):
        """
        The value C{is_running()} fires with for this fixture's daemon is
        true.
        """
        running = self.daemon.is_running()
        return running.addCallback(self.assertTrue)
 
850
 
 
851
 
 
852
class WatchDogOptionsTest(LandscapeTest):
    """
    Tests for the options loaded by L{WatchDogConfiguration}.
    """

    def setUp(self):
        super(WatchDogOptionsTest, self).setUp()
        configuration = WatchDogConfiguration()
        # Clear the list of default configuration filenames.
        configuration.default_config_filenames = []
        self.config = configuration

    def test_daemon(self):
        """Passing C{--daemon} makes the C{daemon} option true."""
        options = self.config
        options.load(["--daemon"])
        self.assertTrue(options.daemon)

    def test_daemon_default(self):
        """With no arguments the C{daemon} option is false."""
        options = self.config
        options.load([])
        self.assertFalse(options.daemon)

    def test_pid_file(self):
        """The value given to C{--pid-file} ends up in C{pid_file}."""
        options = self.config
        options.load(["--pid-file", "wubble.txt"])
        self.assertEquals(options.pid_file, "wubble.txt")

    def test_pid_file_default(self):
        """With no arguments C{pid_file} is C{None}."""
        options = self.config
        options.load([])
        self.assertEquals(options.pid_file, None)
 
874
 
 
875
 
 
876
class WatchDogScriptTest(LandscapeTest):
    """
    Tests for L{run}, with daemonization, application startup and the
    reactor replaced by mocks.
    """

    def setUp(self):
        super(WatchDogScriptTest, self).setUp()

        # Each replaced callable may be invoked any number of times,
        # including not at all.
        daemonize_mock = self.mocker.replace("landscape.watchdog.daemonize",
                                             passthrough=False)
        daemonize_mock()
        self.mocker.count(0, None)

        start_application_mock = self.mocker.replace(
            "twisted.application.app.startApplication", passthrough=False)
        start_application_mock(ANY, False)
        self.mocker.count(0, None)

        reactor_mock = self.mocker.replace("twisted.internet.reactor",
                                           passthrough=False)
        reactor_mock.run()
        self.mocker.count(0, None)

    def _run_daemonized(self, *extra_options):
        """
        Replay the mocks and call L{run} with C{--daemon}, the given extra
        options and a fresh C{--log-dir}; verify the mocks afterwards and
        always reset them.
        """
        self.mocker.replay()
        try:
            run(["--daemon"] + list(extra_options) +
                ["--log-dir", self.make_dir()])
            self.mocker.verify()
        finally:
            self.mocker.reset()

    def test_daemonize(self):
        """L{run} can be called with the C{--daemon} option."""
        self._run_daemonized()

    def test_pid_file(self):
        """
        With C{--pid-file}, the pid of the current process is written to the
        given file.
        """
        pid_file = self.make_path()
        self._run_daemonized("--pid-file", pid_file)
        recorded_pid = int(open(pid_file, "r").read())
        self.assertEquals(recorded_pid, os.getpid())
 
913
 
 
914
 
 
915
class WatchDogServiceTest(LandscapeTest):
 
916
 
 
917
    def setUp(self):
 
918
        super(WatchDogServiceTest, self).setUp()
 
919
        self.configuration = WatchDogConfiguration()
 
920
        self.data_path = self.makeDir()
 
921
        self.log_dir = self.makeDir()
 
922
        self.configuration.load(["--bus", "system",
 
923
                                 "--data-path", self.data_path,
 
924
                                 "--log-dir", self.log_dir])
 
925
 
 
926
    def test_start_service_uses_right_bus(self):
 
927
        service = WatchDogService(self.configuration)
 
928
        self.assertEquals(type(service.bus), dbus.SystemBus)
 
929
 
 
930
    def test_start_service_exits_when_already_running(self):
 
931
        self.log_helper.ignore_errors("ERROR: program-name is already running")
 
932
        class StubDaemon(object):
 
933
            program = "program-name"
 
934
 
 
935
        bootstrap_list_mock = self.mocker.patch(bootstrap_list)
 
936
        bootstrap_list_mock.bootstrap(data_path=self.data_path,
 
937
                                      log_dir=self.log_dir)
 
938
 
 
939
        service = WatchDogService(self.configuration)
 
940
 
 
941
        self.mocker.order()
 
942
 
 
943
        watchdog_mock = self.mocker.replace(service.watchdog)
 
944
        watchdog_mock.start()
 
945
        deferred = fail(AlreadyRunningError(StubDaemon()))
 
946
        self.mocker.result(deferred)
 
947
 
 
948
        os_mock = self.mocker.replace("os")
 
949
        os_mock._exit(1)
 
950
 
 
951
        self.mocker.replay()
 
952
 
 
953
        return service.startService()
 
954
 
 
955
    def test_start_service_exits_when_unknown_errors_occur(self):
 
956
        self.log_helper.ignore_errors("UNKNOWN ERROR: I'm an unknown error!")
 
957
        service = WatchDogService(self.configuration)
 
958
 
 
959
        bootstrap_list_mock = self.mocker.patch(bootstrap_list)
 
960
        bootstrap_list_mock.bootstrap(data_path=self.data_path,
 
961
                                      log_dir=self.log_dir)
 
962
 
 
963
        self.mocker.order()
 
964
 
 
965
        watchdog_mock = self.mocker.replace(service.watchdog)
 
966
        watchdog_mock.start()
 
967
        deferred = fail(AttributeError("I'm an unknown error!"))
 
968
        self.mocker.result(deferred)
 
969
 
 
970
        os_mock = self.mocker.replace("os")
 
971
        os_mock._exit(1)
 
972
 
 
973
        self.mocker.replay()
 
974
 
 
975
        return service.startService()
 
976
 
 
977
    def test_bootstrap(self):
 
978
 
 
979
        data_path = self.makeDir()
 
980
        log_dir = self.makeDir()
 
981
 
 
982
        def path(*suffix):
 
983
            return os.path.join(data_path, *suffix)
 
984
 
 
985
        getuid = self.mocker.replace("os.getuid")
 
986
        getuid()
 
987
        self.mocker.result(0)
 
988
        self.mocker.count(1, None)
 
989
 
 
990
        getpwnam = self.mocker.replace("pwd.getpwnam")
 
991
        value = getpwnam("landscape")
 
992
        self.mocker.count(1, None)
 
993
        value.pw_uid
 
994
        self.mocker.result(1234)
 
995
        self.mocker.count(1, None)
 
996
 
 
997
        getgrnam = self.mocker.replace("grp.getgrnam")
 
998
        value = getgrnam("root")
 
999
        self.mocker.count(1, None)
 
1000
        value.gr_gid
 
1001
        self.mocker.result(5678)
 
1002
        self.mocker.count(1, None)
 
1003
 
 
1004
        chown = self.mocker.replace("os.chown")
 
1005
        chown(path(), 1234, 5678)
 
1006
        chown(path("messages"), 1234, 5678)
 
1007
        chown(path("package"), 1234, 5678)
 
1008
        chown(path("package/database"), 1234, 5678)
 
1009
        chown(log_dir, 1234, 5678)
 
1010
 
 
1011
        self.mocker.replay()
 
1012
 
 
1013
        bootstrap_list.bootstrap(data_path=data_path,
 
1014
                                 log_dir=log_dir)
 
1015
 
 
1016
        self.assertTrue(os.path.isdir(path()))
 
1017
        self.assertTrue(os.path.isdir(path("package")))
 
1018
        self.assertTrue(os.path.isdir(log_dir))
 
1019
        self.assertTrue(os.path.isfile(path("package/database")))
 
1020
 
 
1021
        def mode(*suffix):
 
1022
            return stat.S_IMODE(os.stat(path(*suffix)).st_mode)
 
1023
 
 
1024
        self.assertEquals(mode(), 0755)
 
1025
        self.assertEquals(mode("messages"), 0755)
 
1026
        self.assertEquals(mode("package"), 0755)
 
1027
        self.assertEquals(mode("package/database"), 0644)
 
1028
 
 
1029
    def test_log_notification(self):
 
1030
        """
 
1031
        SIGUSR1 should cause logs to be reopened.
 
1032
        """
 
1033
        logging.getLogger().addHandler(logging.FileHandler(self.make_path()))
 
1034
        service = WatchDogService(self.configuration)
 
1035
        # We expect the Watchdog to delegate to each of the sub-processes
 
1036
        daemon_mock = self.mocker.patch(Daemon)
 
1037
        daemon_mock.rotate_logs()
 
1038
        self.mocker.count(3)
 
1039
        self.mocker.replay()
 
1040
 
 
1041
        # Store the initial set of handlers
 
1042
        original_streams = [handler.stream for handler in
 
1043
                            logging.getLogger().handlers if
 
1044
                            isinstance(handler, logging.FileHandler)]
 
1045
 
 
1046
        # We fire the signal
 
1047
        os.kill(os.getpid(), signal.SIGUSR1)
 
1048
        new_streams = [handler.stream for handler in
 
1049
                       logging.getLogger().handlers if
 
1050
                       isinstance(handler, logging.FileHandler)]
 
1051
 
 
1052
        for stream in new_streams:
 
1053
            self.assertTrue(stream not in original_streams)
 
1054
 
 
1055
 
 
1056
# Source template for a stand-alone stub broker script.  It is filled in
# with the "%" operator, taking the keys "executable", "path",
# "output_filename", "bus_name", "object_path" and "iface_name", then
# written to a file, made executable and run as a separate process.  The
# generated script exposes a StubBroker object on the session bus; its
# exit() method writes "CALLED" to the output file and schedules
# reactor.stop() one second later.
STUB_BROKER = """\
#!%(executable)s
import sys

import warnings
warnings.filterwarnings("ignore", "Python C API version mismatch",
                        RuntimeWarning)

from dbus import SessionBus
import dbus.glib

from twisted.internet.glib2reactor import install
install()
from twisted.internet import reactor

sys.path = %(path)r

from landscape.lib.dbus_util import Object, method


class StubBroker(Object):
    bus_name = %(bus_name)r
    object_path = %(object_path)r

    @method(%(iface_name)r)
    def exit(self):
        file = open(%(output_filename)r, "w")
        file.write("CALLED")
        file.close()
        reactor.callLater(1, reactor.stop)

stub_broker = StubBroker(SessionBus())

reactor.run()
"""