~free.ekanayaka/landscape-client/lucid-1.5.4-0ubuntu0.10.04.0

« back to all changes in this revision

Viewing changes to landscape/monitor/tests/test_deployment.py

  • Committer: Bazaar Package Importer
  • Author(s): Free Ekanayaka
  • Date: 2010-06-28 18:07:18 UTC
  • mfrom: (1.2.5 upstream)
  • Revision ID: james.westby@ubuntu.com-20100628180718-vytyqgbtkiirv5sb
Tags: 1.5.2.1-0ubuntu0.10.04.0
Filter duplicate network interfaces in get_active_interfaces (LP: #597000)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from landscape.lib.persist import Persist
2
 
from landscape.reactor import FakeReactor
3
 
from landscape.monitor.computerinfo import ComputerInfo
4
 
from landscape.monitor.loadaverage import LoadAverage
5
 
from landscape.monitor.deployment import MonitorService, MonitorConfiguration
6
 
from landscape.tests.helpers import (
7
 
    LandscapeTest, LandscapeIsolatedTest, RemoteBrokerHelper)
8
 
from landscape.broker.tests.test_remote import assertTransmitterActive
9
 
from landscape.tests.test_plugin import assertReceivesMessages
10
 
 
11
 
from twisted.internet.defer import Deferred
12
 
 
13
 
 
14
 
class DeploymentTest(LandscapeTest):
    """Tests for the plugin list built by L{MonitorService}."""

    def test_get_plugins(self):
        """
        When C{--monitor-plugins} names specific plugins, the service
        exposes one instance of each of them, in the order given.
        """
        configuration = MonitorConfiguration()
        arguments = ["--monitor-plugins", "ComputerInfo, LoadAverage",
                     "-d", self.makeFile()]
        configuration.load(arguments)
        monitor = MonitorService(configuration)
        plugins = monitor.plugins
        self.assertEquals(len(plugins), 2)
        self.assertTrue(isinstance(plugins[0], ComputerInfo))
        self.assertTrue(isinstance(plugins[1], LoadAverage))

    def test_get_all_plugins(self):
        """
        When C{--monitor-plugins} is C{ALL}, the service exposes twelve
        plugins.
        """
        configuration = MonitorConfiguration()
        arguments = ["--monitor-plugins", "ALL",
                     "-d", self.makeFile()]
        configuration.load(arguments)
        monitor = MonitorService(configuration)
        plugins = monitor.plugins
        self.assertEquals(len(plugins), 12)
33
 
 
34
 
 
35
 
class MonitorServiceTest(LandscapeIsolatedTest):
    """
    Base test case that starts a L{MonitorService} driven by a
    L{FakeReactor} and stops it again on teardown.
    """

    helpers = [RemoteBrokerHelper]

    def setUp(self):
        """
        Create and start a monitor service that shares the data path of
        C{self.broker_service} and uses the session bus.
        """
        super(MonitorServiceTest, self).setUp()
        configuration = MonitorConfiguration()
        # C{self.broker_service} is presumably set up by the helpers listed
        # above -- it is not assigned anywhere in this class.
        arguments = ["-d", self.broker_service.config.data_path,
                     "--bus", "session"]
        configuration.load(arguments)
        self.monitor = MonitorService(configuration)
        # Replace the reactor before starting, so tests can advance time.
        self.monitor.reactor = FakeReactor()
        self.monitor.startService()

    def tearDown(self):
        """Run the base-class teardown, then stop the monitor service."""
        # NOTE(review): the service is stopped only after the base-class
        # teardown has run -- confirm this ordering is intentional.
        super(MonitorServiceTest, self).tearDown()
        self.monitor.stopService()
51
 
 
52
 
 
53
 
class DeploymentBusTest(MonitorServiceTest):
    """Tests for the interaction between the monitor and the broker."""

    def test_dbus_reactor_transmitter_installed(self):
        """
        Run L{assertTransmitterActive} against the broker service and the
        monitor's reactor.
        """
        # Set the config to monitor only, because assertTransmitterActive
        # fires a resynchronize event and the monitor's callback registered
        # for it would try to get a DBus object published by the manager.
        self.monitor.registry.config.monitor_only = True
        return assertTransmitterActive(self, self.broker_service,
                                       self.monitor.reactor)

    def test_receives_messages(self):
        """
        Run L{assertReceivesMessages} against the monitor's DBus service,
        the broker service and the remote broker.
        """
        return assertReceivesMessages(self, self.monitor.dbus_service,
                                      self.broker_service, self.remote)

    def test_register_plugin_on_broker_started(self):
        """
        When the broker is restarted, it fires a "broker-started" signal which
        makes the Monitor plugin register itself again.
        """
        d = Deferred()

        def register_plugin(bus_name, object_path):
            # Hand the arguments of the registration call to the test.
            d.callback((bus_name, object_path))

        def patch(ignore):
            # Intercept the registration call, then emit the signal.
            self.monitor.remote_broker.register_plugin = register_plugin
            self.broker_service.dbus_object.broker_started()
            return d

        expected = ("com.canonical.landscape.Monitor",
                    "/com/canonical/landscape/Monitor")
        result = self.remote.get_registered_plugins()
        return result.addCallback(patch).addCallback(
            self.assertEquals, expected)

    def test_register_message_on_broker_started(self):
        """
        When the broker is restarted, it fires a "broker-started" signal which
        makes the Monitor plugin register all registered messages again.
        """
        self.monitor.registry.register_message("foo", lambda x: None)
        d = Deferred()

        def register_client_accepted_message_type(type):
            # Only the "foo" registration fires the deferred; calls for any
            # other message type are ignored.
            if type == "foo":
                d.callback(type)

        def patch(ignore):
            # Intercept the registration call, then emit the signal.
            self.monitor.remote_broker.register_client_accepted_message_type = \
                register_client_accepted_message_type
            self.broker_service.dbus_object.broker_started()
            return d

        result = self.remote.get_registered_plugins()
        return result.addCallback(patch).addCallback(
            self.assertEquals, "foo")
109
 
 
110
 
 
111
 
class MonitorTest(MonitorServiceTest):
    """Tests for when the monitor's persisted data is written to disk."""

    def _load_persist(self):
        """
        Return a fresh L{Persist} loaded from the monitor registry's
        persist file, i.e. the data as currently saved on disk.
        """
        persist = Persist()
        persist.load(self.monitor.registry.persist_filename)
        return persist

    def test_monitor_flushes_on_flush_event(self):
        """Calling C{flush} on the monitor's registry saves the persist."""
        self.monitor.registry.persist.set("a", 1)
        self.monitor.registry.flush()

        persist = self._load_persist()
        self.assertEquals(persist.get("a"), 1)

    def test_monitor_flushes_on_flush_interval(self):
        """
        The monitor is flushed every C{flush_interval} seconds, after
        the monitor service is started.
        """
        self.monitor.registry.persist.set("a", 1)
        self.monitor.reactor.advance(self.monitor.config.flush_interval)

        persist = self._load_persist()
        self.assertEquals(persist.get("a"), 1)

    def test_monitor_flushes_on_service_stop(self):
        """The monitor is flushed when the service stops."""
        self.monitor.registry.persist.set("a", 1)
        self.monitor.stopService()

        persist = self._load_persist()
        self.assertEquals(persist.get("a"), 1)

    def test_monitor_stops_flushing_after_service_stops(self):
        """The recurring flush event is cancelled when the service stops."""
        self.monitor.stopService()
        # Set the value only after stopping, so a later flush would be the
        # only way for it to reach the persist file.
        self.monitor.registry.persist.set("a", 1)
        self.monitor.reactor.advance(self.monitor.config.flush_interval)

        persist = self._load_persist()
        self.assertEquals(persist.get("a"), None)