34
32
The L{Monitor.exchange} method flushes the monitor after
35
33
C{exchange} on all plugins has been called.
37
class SamplePlugin(ExchangePlugin):
39
self.monitor.persist.set("a", 1)
40
super(SamplePlugin, myself).exchange()
42
self.monitor.add(SamplePlugin())
35
plugin = BrokerClientPlugin()
36
plugin.exchange = lambda: self.monitor.persist.set("a", 1)
37
self.monitor.add(plugin)
43
38
self.monitor.exchange()
45
40
persist = Persist()
46
41
persist.load(self.monitor.persist_filename)
47
42
self.assertEquals(persist.get("a"), 1)
44
def test_flush_every_flush_interval(self):
46
The L{Monitor.flush} method gets called every C{flush_interval}
47
seconds, and persists data to the disk.
49
self.monitor.persist.save = self.mocker.mock()
50
self.monitor.persist.save(self.monitor.persist_filename)
53
self.reactor.advance(self.config.flush_interval * 3)
49
55
def test_creating_loads_persist(self):
57
If C{persist_filename} exists, it is loaded by the constructor.
50
59
filename = self.makeFile()
52
61
persist = Persist()
53
62
persist.set("a", "Hi there!")
54
63
persist.save(filename)
56
manager = MonitorPluginRegistry(self.remote, self.reactor,
57
self.broker_service.config,
60
persist_filename=filename)
61
self.assertEquals(manager.persist.get("a"), "Hi there!")
64
class MonitorDBusObjectTest(LandscapeIsolatedTest):
65
"""Tests that use a monitor with a real DBUS service."""
67
helpers = [RemoteBrokerHelper]
70
super(MonitorDBusObjectTest, self).setUp()
72
self.monitor = MonitorPluginRegistry(self.remote,
73
self.broker_service.reactor,
74
self.broker_service.config,
75
self.broker_service.bus,
77
self.dbus_service = MonitorDBusObject(self.broker_service.bus,
79
self.service = get_object(self.broker_service.bus,
80
MonitorDBusObject.bus_name,
81
MonitorDBusObject.object_path)
84
result = self.service.ping()
85
def got_result(result):
86
self.assertEquals(result, True)
87
return result.addCallback(got_result)
92
reactor = self.mocker.replace("twisted.internet.reactor")
94
self.expect(reactor.stop()).call(lambda: result.callback(None))
102
class StubPluginUsingPlugin(MonitorPlugin):
111
class StubPluginRunIntervalNone(StubPluginUsingPlugin):
115
def register(self, manager):
    """
    Perform the inherited registration with C{manager}, then ask the
    manager's reactor to associate C{self.callee} with C{"foo"}.

    @param manager: The object the plugin is registered with; it must
        expose a C{reactor} providing C{call_on}.
    """
    super(StubPluginRunIntervalNone, self).register(manager)
    manager.reactor.call_on("foo", self.callee)
123
class StubPluginRespondingToChangedAcceptedTypes(StubPluginUsingPlugin):
128
def register(self, manager):
    """
    Perform the inherited registration with C{manager}, then hand
    C{self.exchange} to C{call_on_accepted} for C{"some-type"}, with
    C{True} and C{param=10} as the extra arguments.

    @param manager: The object the plugin is registered with.
    """
    super(StubPluginRespondingToChangedAcceptedTypes,
          self).register(manager)
    self.call_on_accepted("some-type", self.exchange, True, param=10)
133
def exchange(self, *args, **kwargs):
    """
    Record the invocation instead of exchanging anything.

    Each call appends an C{(args, kwargs)} pair to C{self.called} so
    that tests can inspect how the method was invoked.
    """
    self.called.append((args, kwargs))
137
class PluginTest(LandscapeTest):
139
helpers = [MonitorHelper]
141
def test_without_persist_name(self):
142
plugin = StubPluginUsingPlugin()
143
patched_reactor = self.mocker.patch(self.reactor)
144
patched_reactor.call_every(5, plugin.run)
146
plugin.register(self.monitor)
147
self.assertFalse(hasattr(plugin, "_persist"))
149
def test_with_persist_name(self):
    """
    When plugins provide a C{persist_name} attribute, they get a persist
    object set at C{_persist} which is rooted at the name specified.
    """
    plugin = StubPluginUsingPlugin()
    plugin.persist_name = "wubble"
    plugin.register(self.monitor)
    self.assertTrue(hasattr(plugin, "_persist"))
    # Data written through the plugin's persist must show up under the
    # "wubble" key of the monitor's own persist.
    plugin._persist.set("hi", "there")
    self.assertEqual(self.monitor.persist.get("wubble"), {"hi": "there"})
161
def test_with_no_run_interval(self):
162
plugin = StubPluginRunIntervalNone()
163
patched_reactor = self.mocker.patch(self.reactor)
165
# It *shouldn't* schedule run.
166
patched_reactor.call_every(5, plugin.run)
169
patched_reactor.call_on("foo", plugin.callee)
171
plugin.register(self.monitor)
173
def test_call_on_accepted(self):
175
L{MonitorPlugin}-based plugins can provide a callable to call
176
when a message type becomes accepted.
178
plugin = StubPluginRespondingToChangedAcceptedTypes()
179
plugin.register(self.monitor)
180
self.broker_service.reactor.fire(("message-type-acceptance-changed",
182
self.assertEquals(plugin.called, [((True,), {"param": 10})])
184
def test_call_on_accepted_when_unaccepted(self):
186
Notifications are only dispatched to plugins when types become
187
accepted, not when they become unaccepted.
189
plugin = StubPluginRespondingToChangedAcceptedTypes()
190
plugin.register(self.monitor)
191
self.broker_service.reactor.fire(("message-type-acceptance-changed",
193
self.assertEquals(plugin.called, [])
196
class StubDataWatchingPlugin(DataWatcher):
198
persist_name = "ooga"
199
message_type = "wubble"
200
message_key = "wubblestuff"
202
def __init__(self, data=None):
209
class DataWatcherTest(LandscapeTest):
211
helpers = [MonitorHelper, LogKeeperHelper]
214
LandscapeTest.setUp(self)
215
self.plugin = StubDataWatchingPlugin(1)
216
self.plugin.register(self.monitor)
217
self.mstore.add_schema(Message("wubble", {"wubblestuff": Int()}))
219
def test_get_message(self):
    """
    C{get_message} returns a message carrying the plugin's message type
    and its current data under the message key.
    """
    self.assertEqual(self.plugin.get_message(),
                     {"type": "wubble", "wubblestuff": 1})
223
def test_get_message_unchanging(self):
    """
    A second C{get_message} call returns C{None} once the first call
    has already produced the message.
    """
    self.assertEqual(self.plugin.get_message(),
                     {"type": "wubble", "wubblestuff": 1})
    self.assertEqual(self.plugin.get_message(), None)
228
def test_basic_exchange(self):
    """
    With the "wubble" type accepted, C{exchange} queues a message with
    the watched data and logs that it did so.
    """
    # Is this really what we want to do?
    self.mstore.set_accepted_types(["wubble"])
    self.plugin.exchange()
    messages = self.mstore.get_pending_messages()
    self.assertEqual(messages[0]["type"], "wubble")
    self.assertEqual(messages[0]["wubblestuff"], 1)
    self.assertIn("Queueing a message with updated data watcher info for "
                  "landscape.monitor.tests.test_monitor.StubDataWatching"
                  "Plugin.", self.logfile.getvalue())
239
def test_unchanging_value(self):
    """
    Calling C{exchange} twice leaves exactly one pending message in the
    message store.
    """
    # Is this really what we want to do?
    self.mstore.set_accepted_types(["wubble"])
    self.plugin.exchange()
    self.plugin.exchange()
    messages = self.mstore.get_pending_messages()
    self.assertEqual(len(messages), 1)
247
def test_urgent_exchange(self):
249
When exchange is called with an urgent argument set to True
250
make sure it sends the message urgently.
252
remote_broker_mock = self.mocker.replace(self.remote)
253
remote_broker_mock.send_message(ANY, urgent=True)
254
self.mocker.result(succeed(None))
257
self.mstore.set_accepted_types(["wubble"])
258
self.plugin.exchange(True)
260
def test_no_message_if_not_accepted(self):
    """
    Don't add any messages at all if the broker isn't currently
    accepting their type.
    """
    self.mstore.set_accepted_types([])
    self.reactor.advance(self.monitor.step_size * 2)
    self.monitor.exchange()

    # Accepting the type afterwards must not reveal any message queued
    # while it was still unaccepted.
    self.mstore.set_accepted_types(["wubble"])
    self.assertMessages(list(self.mstore.get_pending_messages()), [])
65
monitor = Monitor(self.reactor, self.config, persist=Persist(),
66
persist_filename=filename)
67
self.assertEquals(monitor.persist.get("a"), "Hi there!")