~ack/landscape-client/sources.list-preserve-old-permissions

« back to all changes in this revision

Viewing changes to landscape/monitor/tests/test_deployment.py

  • Committer: Christopher Armstrong
  • Date: 2008-06-10 10:56:01 UTC
  • Revision ID: radix@twistedmatrix.com-20080610105601-l9qfvqjf88e7j8b6
Import landscape-client into public branch

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from landscape.lib.persist import Persist
 
2
from landscape.reactor import FakeReactor
 
3
from landscape.monitor.computerinfo import ComputerInfo
 
4
from landscape.monitor.loadaverage import LoadAverage
 
5
from landscape.monitor.deployment import MonitorService, MonitorConfiguration
 
6
from landscape.tests.helpers import (
 
7
    LandscapeTest, LandscapeIsolatedTest, RemoteBrokerHelper)
 
8
from landscape.broker.tests.test_remote import assertTransmitterActive
 
9
from landscape.tests.test_plugin import assertReceivesMessages
 
10
 
 
11
 
 
12
class DeploymentTest(LandscapeTest):
    """Tests for the plugin list built by L{MonitorService}."""

    def test_get_plugins(self):
        """
        Naming C{ComputerInfo} and C{LoadAverage} in C{--monitor-plugins}
        gives a service with exactly those two plugins, in that order.
        """
        config = MonitorConfiguration()
        data_path = self.make_path()
        config.load(["--monitor-plugins", "ComputerInfo, LoadAverage",
                     "-d", data_path])
        plugins = MonitorService(config).plugins
        self.assertEquals(len(plugins), 2)
        # Check each position against the type requested at that position.
        for index, plugin_type in enumerate((ComputerInfo, LoadAverage)):
            self.assertTrue(isinstance(plugins[index], plugin_type))

    def test_get_all_plugins(self):
        """
        Passing C{ALL} to C{--monitor-plugins} gives a service with ten
        plugins.
        """
        config = MonitorConfiguration()
        data_path = self.make_path()
        config.load(["--monitor-plugins", "ALL", "-d", data_path])
        plugins = MonitorService(config).plugins
        self.assertEquals(len(plugins), 10)
 
31
 
 
32
 
 
33
class MonitorServiceTest(LandscapeIsolatedTest):
    """
    Base class for tests that need a started L{MonitorService}.

    The service is available as C{self.monitor}; its reactor is replaced
    with a L{FakeReactor} before the service is started.
    """

    helpers = [RemoteBrokerHelper]

    def setUp(self):
        """Create, configure and start the monitor service under test."""
        super(MonitorServiceTest, self).setUp()
        config = MonitorConfiguration()
        # NOTE(review): broker_service is presumably set up by
        # RemoteBrokerHelper -- confirm against the helper.
        data_path = self.broker_service.config.data_path
        config.load(["-d", data_path, "--bus", "session"])
        service = MonitorService(config)
        self.monitor = service
        service.reactor = FakeReactor()
        service.startService()

    def tearDown(self):
        """Run the parent teardown, then stop the monitor service."""
        super(MonitorServiceTest, self).tearDown()
        self.monitor.stopService()
 
49
 
 
50
 
 
51
class DeploymentBusTest(MonitorServiceTest):
    """Bus-related checks run against the started monitor service."""

    def test_dbus_reactor_transmitter_installed(self):
        """
        Delegate to L{assertTransmitterActive} with the broker service and
        the monitor's reactor, returning whatever it returns.
        """
        broker = self.broker_service
        reactor = self.monitor.reactor
        return assertTransmitterActive(self, broker, reactor)

    def test_receives_messages(self):
        """
        Delegate to L{assertReceivesMessages} with the monitor's DBus
        service, the broker service and the remote, returning its result.
        """
        dbus_service = self.monitor.dbus_service
        broker = self.broker_service
        remote = self.remote
        return assertReceivesMessages(self, dbus_service, broker, remote)
 
60
 
 
61
 
 
62
class MonitorTest(MonitorServiceTest):
    """Tests for when the monitor's persist data is written to disk."""

    def _read_saved_value(self, key):
        """
        Load a fresh L{Persist} from the registry's persist file and
        return the value stored under C{key}.
        """
        saved = Persist()
        saved.load(self.monitor.registry.persist_filename)
        return saved.get(key)

    def test_monitor_flushes_on_flush_event(self):
        """Flushing the monitor's registry saves the persist."""
        registry = self.monitor.registry
        registry.persist.set("a", 1)
        registry.flush()

        self.assertEquals(self._read_saved_value("a"), 1)

    def test_monitor_flushes_on_flush_interval(self):
        """
        Once the service has been started, advancing the reactor by
        C{flush_interval} seconds saves the persist.
        """
        self.monitor.registry.persist.set("a", 1)
        interval = self.monitor.config.flush_interval
        self.monitor.reactor.advance(interval)

        self.assertEquals(self._read_saved_value("a"), 1)

    def test_monitor_flushes_on_service_stop(self):
        """Stopping the service saves the persist."""
        self.monitor.registry.persist.set("a", 1)
        self.monitor.stopService()

        self.assertEquals(self._read_saved_value("a"), 1)

    def test_monitor_stops_flushing_after_service_stops(self):
        """
        A value set after the service has stopped is not saved when the
        reactor later advances by C{flush_interval} seconds.
        """
        self.monitor.stopService()
        self.monitor.registry.persist.set("a", 1)
        interval = self.monitor.config.flush_interval
        self.monitor.reactor.advance(interval)

        self.assertEquals(self._read_saved_value("a"), None)