1
from twisted.internet import reactor
2
from twisted.internet.defer import Deferred
4
from landscape.lib.twisted_util import gather_results
5
from landscape.tests.helpers import LandscapeTest, DEFAULT_ACCEPTED_TYPES
6
from landscape.broker.tests.helpers import BrokerClientHelper
7
from landscape.broker.client import BrokerClientPlugin, HandlerNotFoundError
10
class BrokerClientTest(LandscapeTest):
12
helpers = [BrokerClientHelper]
16
The L{BrokerClient.ping} method always returns C{True}.
18
self.assertTrue(self.client.ping())
22
The L{BrokerClient.add} method registers a new plugin
23
plugin, and calls the plugin's C{register} method.
25
plugin = BrokerClientPlugin()
26
self.client.add(plugin)
27
self.assertIs(plugin.client, self.client)
29
def test_get_plugins(self):
31
The L{BrokerClient.get_plugins} method returns a list
32
of registered plugins.
34
plugins = [BrokerClientPlugin(), BrokerClientPlugin()]
35
self.client.add(plugins[0])
36
self.client.add(plugins[1])
37
self.assertEquals(self.client.get_plugins(), plugins)
39
def test_get_plugins_returns_a_copy(self):
41
The L{BrokerClient.get_plugins} method returns a copy of the list
42
of registered plugins, so user can't can't modify our internals.
44
plugins = self.client.get_plugins()
45
plugins.append(BrokerClientPlugin())
46
self.assertEquals(self.client.get_plugins(), [])
48
def test_get_named_plugin(self):
50
If a plugin has a C{plugin_name} attribute, it is possible to look it
51
up by name after adding it to the L{BrokerClient}.
53
plugin = BrokerClientPlugin()
54
plugin.plugin_name = "foo"
55
self.client.add(plugin)
56
self.assertEquals(self.client.get_plugin("foo"), plugin)
58
def test_run_interval(self):
60
If a plugin has a C{run} method, the reactor will call it every
61
C{run_interval} seconds.
63
plugin = BrokerClientPlugin()
64
plugin.run = self.mocker.mock()
65
self.expect(plugin.run()).count(2)
67
self.client.add(plugin)
68
self.client_reactor.advance(plugin.run_interval)
69
self.client_reactor.advance(plugin.run_interval)
71
def test_run_immediately(self):
73
If a plugin has a C{run} method and C{run_immediately} is C{True},
74
the plugin will be run immediately at registration.
76
plugin = BrokerClientPlugin()
77
plugin.run = self.mocker.mock()
78
plugin.run_immediately = True
79
self.expect(plugin.run()).count(1)
81
self.client.add(plugin)
83
def test_register_message(self):
85
When L{BrokerClient.register_message} is called, the broker is notified
86
that the message type is now accepted.
88
result1 = self.client.register_message("foo", lambda m: None)
89
result2 = self.client.register_message("bar", lambda m: None)
91
def got_result(result):
93
self.exchanger.get_client_accepted_message_types(),
94
sorted(["bar", "foo"] + DEFAULT_ACCEPTED_TYPES))
96
return gather_results([result1, result2]).addCallback(got_result)
98
def test_dispatch_message(self):
100
L{BrokerClient.dispatch_message} calls a previously-registered message
101
handler and return its value.
103
message = {"type": "foo"}
104
handle_message = self.mocker.mock()
105
self.expect(handle_message(message)).result(123)
108
def dispatch_message(result):
109
self.assertEquals(self.client.dispatch_message(message), 123)
111
result = self.client.register_message("foo", handle_message)
112
return result.addCallback(dispatch_message)
114
def test_dispatch_message_with_exception(self):
116
L{BrokerClient.dispatch_message} gracefully logs exceptions raised
119
message = {"type": "foo"}
120
handle_message = self.mocker.mock()
121
self.expect(handle_message(message)).throw(ZeroDivisionError)
124
self.log_helper.ignore_errors("Error running message handler.*")
126
def dispatch_message(result):
127
self.assertIs(self.client.dispatch_message(message), None)
128
self.assertTrue("Error running message handler for type 'foo'" in
129
self.logfile.getvalue())
131
result = self.client.register_message("foo", handle_message)
132
return result.addCallback(dispatch_message)
134
def test_dispatch_message_with_no_handler(self):
136
L{BrokerClient.dispatch_message} raises an error if no handler was
137
found for the given message.
139
error = self.assertRaises(HandlerNotFoundError,
140
self.client.dispatch_message, {"type": "x"})
141
self.assertEquals(str(error), "x")
143
def test_message(self):
145
The L{BrokerClient.message} method dispatches a message and
146
returns C{True} if an handler for it was found.
148
message = {"type": "foo"}
150
handle_message = self.mocker.mock()
151
handle_message(message)
154
def dispatch_message(result):
155
self.assertEquals(self.client.message(message), True)
157
result = self.client.register_message("foo", handle_message)
158
return result.addCallback(dispatch_message)
160
def test_message_with_no_handler(self):
162
The L{BrokerClient.message} method returns C{False} if no
165
message = {"type": "foo"}
166
self.assertEquals(self.client.message(message), False)
168
def test_exchange(self):
170
The L{BrokerClient.exchange} method calls C{exchange} on all
171
plugins, if available.
173
plugin = BrokerClientPlugin()
174
plugin.exchange = self.mocker.mock()
177
self.client.add(plugin)
178
self.client.exchange()
180
def test_exchange_on_plugin_without_exchange_method(self):
182
The L{BrokerClient.exchange} method ignores plugins without
183
an C{exchange} method.
185
plugin = BrokerClientPlugin()
186
self.assertFalse(hasattr(plugin, "exchange"))
187
self.client.exchange()
189
def test_exchange_logs_errors_and_continues(self):
191
If the L{exchange} method of a registered plugin fails, the error is
192
logged and other plugins are processed.
194
self.log_helper.ignore_errors(ZeroDivisionError)
195
plugin1 = BrokerClientPlugin()
196
plugin2 = BrokerClientPlugin()
197
plugin1.exchange = self.mocker.mock()
198
plugin2.exchange = self.mocker.mock()
199
self.expect(plugin1.exchange()).throw(ZeroDivisionError)
202
self.client.add(plugin1)
203
self.client.add(plugin2)
204
self.client.exchange()
205
self.assertTrue("Error during plugin exchange" in
206
self.logfile.getvalue())
207
self.assertTrue("ZeroDivisionError" in self.logfile.getvalue())
209
def test_notify_exchange(self):
211
The L{BrokerClient.notify_exchange} method is triggered by an
212
impending-exchange event and calls C{exchange} on all plugins,
215
plugin = BrokerClientPlugin()
216
plugin.exchange = self.mocker.mock()
219
self.client.add(plugin)
220
self.client_reactor.fire("impending-exchange")
221
self.assertTrue("Got notification of impending exchange. "
222
"Notifying all plugins." in self.logfile.getvalue())
224
def test_fire_event(self):
226
The L{BrokerClient.fire_event} method makes the reactor fire the
229
callback = self.mocker.mock()
232
self.client_reactor.call_on("event", callback)
233
self.client.fire_event("event")
235
def test_fire_event_with_arguments(self):
237
The L{BrokerClient.fire_event} accepts optional arguments and keyword
238
arguments to pass to the registered callback.
240
callback = self.mocker.mock()
241
callback(True, kwarg=2)
243
self.client_reactor.call_on("event", callback)
244
self.client.fire_event("event", True, kwarg=2)
246
def test_fire_event_with_mixed_results(self):
248
The return values of the fired handlers can be part L{Deferred}s
251
deferred = Deferred()
252
callback1 = self.mocker.mock()
253
callback2 = self.mocker.mock()
254
self.expect(callback1()).result(123)
255
self.expect(callback2()).result(deferred)
257
self.client_reactor.call_on("event", callback1)
258
self.client_reactor.call_on("event", callback2)
259
result = self.client.fire_event("event")
260
reactor.callLater(0, lambda: deferred.callback("abc"))
261
return self.assertSuccess(result, [123, "abc"])
263
def test_fire_event_with_acceptance_changed(self):
265
When the given event type is C{message-type-acceptance-changed}, the
266
fired event will be a 2-tuple of the eventy type and the message type.
268
event_type = "message-type-acceptance-changed"
269
callback = self.mocker.mock()
272
self.client_reactor.call_on((event_type, "test"), callback)
273
self.client.fire_event(event_type, "test", False)
275
def test_handle_reconnect(self):
277
The L{BrokerClient.handle_reconnect} method is triggered by a
278
broker-reconnect event, and it causes any message types previously
279
registered with the broker to be registered again.
281
result1 = self.client.register_message("foo", lambda m: None)
282
result2 = self.client.register_message("bar", lambda m: None)
284
def got_result(result):
285
self.client.broker = self.mocker.mock()
286
self.client.broker.register_client_accepted_message_type("foo")
287
self.client.broker.register_client_accepted_message_type("bar")
288
self.client.broker.register_client("client")
290
self.client_reactor.fire("broker-reconnect")
292
return gather_results([result1, result2]).addCallback(got_result)
296
The L{BrokerClient.exit} method causes the reactor to be stopped.
298
self.client.reactor.stop = self.mocker.mock()
299
self.client.reactor.stop()
302
self.client.reactor.advance(0.1)