1
from dbus.service import signal
3
from twisted.internet.defer import Deferred
5
from landscape.schema import Message
6
from landscape.broker.broker import IFACE_NAME
7
from landscape.tests.helpers import (
8
LandscapeIsolatedTest, RemoteBrokerHelper, LandscapeTest)
9
from landscape.lib.bpickle import dumps, loads
10
from landscape.lib.dbus_util import (Object, method,
11
byte_array, array_to_string)
12
from landscape.lib.twisted_util import gather_results
13
from landscape.manager.manager import FAILED
16
class SampleSignalReceiver(object):
    """
    Test helper that turns DBus signals into L{Deferred}s a test can wait on.

    @ivar bus: The bus object on which signal receivers are registered.
    @ivar signal_waiters: Maps a signal name to the L{Deferred} fired when
        that signal is received.
    """

    def __init__(self, remote_exchange, bus):
        """
        @param remote_exchange: Accepted to keep the constructor signature
            stable; this helper does not use it.
        @param bus: Object providing C{add_signal_receiver(handler, name)}.
        """
        # wait_for_signal() registers its handler through self.bus, so the
        # bus must be kept on the instance.
        self.bus = bus
        self.signal_waiters = {}

    def got_signal(self, name, *data):
        """
        Fire the L{Deferred} waiting on C{name}, if there is one, with the
        signal arguments packed in a tuple.
        """
        if name in self.signal_waiters:
            self.signal_waiters[name].callback(data)

    def wait_for_signal(self, name):
        """
        Register a receiver for the signal C{name} on the bus.

        @return: A L{Deferred} fired with the signal arguments once the
            signal is received.
        """
        handler = lambda *args: self.got_signal(name, *args)
        self.bus.add_signal_receiver(handler, name)
        self.signal_waiters[name] = Deferred()
        return self.signal_waiters[name]
34
class BrokerDBusObjectTest(LandscapeIsolatedTest):
36
helpers = [RemoteBrokerHelper]
39
super(BrokerDBusObjectTest, self).setUp()
40
self.broker_service.message_store.set_accepted_types(["test"])
41
self.broker_service.message_store.add_schema(Message("test", {}))
45
The broker can be pinged over DBUS to see if it is alive.
47
return self.remote_service.ping().addCallback(self.assertEquals, True)
49
def test_send_message(self):
    """
    The L{BrokerDBusObject} should expose a remote C{send_message} method
    which adds a given message to the message store.
    """
    result = self.remote_service.send_message(
        byte_array(dumps({"type": "test"})), dbus_interface=IFACE_NAME)

    def got_result(message_id):
        service = self.broker_service
        self.assertTrue(service.message_store.is_pending(message_id))
        messages = service.message_store.get_pending_messages()
        self.assertEquals(len(messages), 1)
        self.assertMessage(messages[0], {"type": "test"})

    result.addCallback(got_result)
    # Hand the Deferred back to the test runner so that it waits for
    # got_result() and reports any assertion failure raised in it.
    return result
66
def test_send_urgent_message(self):
    """
    The C{send_message} method should take a flag indicating that
    the client should be put into urgent exchange mode.
    """
    result = self.remote_service.send_message(
        byte_array(dumps({"type": "test"})), True,
        dbus_interface=IFACE_NAME)

    def got_result(result):
        messages = self.broker_service.message_store.get_pending_messages()
        self.assertEquals(len(messages), 1)
        self.assertMessage(messages[0], {"type": "test"})
        self.assertTrue(self.broker_service.exchanger.is_urgent())

    result.addCallback(got_result)
    # Return the Deferred so the runner waits for the assertions above
    # instead of finishing the test before the remote call completes.
    return result
82
def test_is_message_pending_true(self):
    """
    The remote C{is_message_pending} method reports C{True} for the id of
    a message that was just added to the message store.
    """
    store = self.broker_service.message_store
    pending_id = store.add({"type": "test"})
    deferred = self.remote_service.is_message_pending(pending_id)
    # The callback receives the remote answer as its first argument, so
    # this compares that answer against True.
    deferred.addCallback(self.assertEquals, True)
    return deferred
94
def test_is_message_pending_false(self):
    """
    The remote C{is_message_pending} method reports C{False} for a message
    once the store's pending offset has been advanced by one.
    """
    store = self.broker_service.message_store
    stale_id = store.add({"type": "test"})
    store.add_pending_offset(1)
    deferred = self.remote_service.is_message_pending(stale_id)
    # The callback receives the remote answer as its first argument, so
    # this compares that answer against False.
    deferred.addCallback(self.assertEquals, False)
    return deferred
107
def test_exchange_notification(self):
    """
    The BrokerDBusObject will broadcast a C{impending_exchange} signal
    before exchanging, to give plugins a chance to send messages to get
    into the next exchange. It does this by hooking in to the
    C{impending-exchange} event.
    """
    plugin_service = SampleSignalReceiver(self.remote_service,
                                          self.broker_service.bus)
    result = plugin_service.wait_for_signal("impending_exchange")
    self.broker_service.reactor.fire("impending-exchange")
    # The typical failure case for this test is to hang until timeout :\
    # That can only be observed if the runner waits on the Deferred, so it
    # has to be returned.
    return result
test_exchange_notification.timeout = 4
122
def test_exchange_failed_notification(self):
    """
    The BrokerService will broadcast a C{exchange_failed} signal
    if the exchange fails.
    """
    plugin_service = SampleSignalReceiver(self.remote_service,
                                          self.broker_service.bus)
    result = plugin_service.wait_for_signal("exchange_failed")
    self.broker_service.reactor.fire("exchange-failed")
    # The typical failure case for this test is to hang until timeout :\
    # That can only be observed if the runner waits on the Deferred, so it
    # has to be returned.
    return result
test_exchange_failed_notification.timeout = 4
135
def test_resynchronize_clients(self):
    """
    The exchange broadcasts the reactor event 'resynchronize-clients'; in
    this case the BrokerDBusObject should broadcast a dbus signal
    'resynchronize'.
    """
    plugin_service = SampleSignalReceiver(self.remote_service,
                                          self.broker_service.bus)
    result = plugin_service.wait_for_signal("resynchronize")
    self.broker_service.reactor.fire("resynchronize-clients")
    # The typical failure case for this test is to hang until timeout :\
    # That can only be observed if the runner waits on the Deferred, so it
    # has to be returned.
    return result
test_resynchronize_clients.timeout = 4
149
def test_broadcast_messages(self):
151
The DBus service calls the 'message' method on all registered plugins
152
when messages are received from the server. The message is passed as a
156
final_message = Deferred()
157
class MyService(Object):
158
bus_name = "my.service.name"
159
object_path = "/my/service/name"
161
def message(self, message):
162
final_message.callback(message)
164
my_service = MyService(self.broker_service.bus)
166
registration = self.remote.register_plugin(
167
"my.service.name", "/my/service/name")
169
def registered(result):
170
transport = self.broker_service.transport
171
transport.responses.append([{"type": "foobar", "value": 42}])
172
self.broker_service.exchanger.exchange()
173
registration.addCallback(registered)
176
message = array_to_string(message)
177
message = loads(message)
178
self.assertEquals(message,
179
{"type": "foobar", "value": 42})
181
final_message.addCallback(ready)
184
test_broadcast_messages.timeout = 4
186
def test_failed_operation_without_plugins(self):
188
When there are no broker plugins available to handle a message, an
189
operation-result message should be sent back to the server indicating a
192
self.log_helper.ignore_errors("Nobody handled the foobar message.")
193
self.broker_service.message_store.set_accepted_types(
194
["operation-result"])
195
result = self.broker_service.reactor.fire("message",
198
result = [result for result in result if result is not None][0]
199
class Startswith(object):
200
def __eq__(self, other):
201
return other.startswith(
202
"Landscape client failed to handle this request (foobar)")
203
def broadcasted(ignored):
205
self.broker_service.message_store.get_pending_messages(),
206
[{"type": "operation-result",
208
"result-text": Startswith(),
210
result.addCallback(broadcasted)
214
def test_failed_operation_with_plugins_not_handling(self):
216
When no broker plugins handle a message (i.e., they return False from
217
the message() call), an operation-result message should be sent back to
218
the server indicating a failure.
220
self.log_helper.ignore_errors("Nobody handled the foobar message.")
221
class MyService(Object):
222
bus_name = "my.service.name"
223
object_path = "/my/service/name"
225
def message(self, message):
229
self.broker_service.message_store.set_accepted_types(
230
["operation-result"])
232
my_service = MyService(self.broker_service.bus)
234
result = self.remote.register_plugin(
235
"my.service.name", "/my/service/name")
236
def registered(ignored):
237
result = self.broker_service.reactor.fire("message",
240
return [result for result in result if result is not None][0]
242
class Startswith(object):
243
def __eq__(self, other):
244
return other.startswith(
245
"Landscape client failed to handle this request (foobar)")
246
def broadcasted(ignored):
247
self.assertTrue(my_service.called)
249
self.broker_service.message_store.get_pending_messages(),
250
[{"type": "operation-result",
252
"result-text": Startswith(),
254
result.addCallback(registered)
255
result.addCallback(broadcasted)
259
def test_resynchronize_not_handled_by_plugins(self):
261
*resynchronize* operations are special, in that we know the broker
262
handles them in a special way. If none of the broker-plugins respond
263
to a resynchronize event, we should not send back a failure, because
264
the broker itself will respond to those.
266
self.broker_service.message_store.set_accepted_types(
267
["operation-result"])
268
result = self.broker_service.reactor.fire("message",
269
{"type": "resynchronize",
271
result = [result for result in result if result is not None][0]
272
def broadcasted(ignored):
274
self.broker_service.message_store.get_pending_messages(),
276
result.addCallback(broadcasted)
280
def test_register(self):
    """
    Remote parties can request a registration to be made with the server.
    """
    identity = self.broker_service.identity

    def register_done(deferred_result):
        # Nothing has answered the registration yet.
        self.assertEquals(deferred_result.called, False)

        self.broker_service.reactor.fire("message",
                                         {"type": "set-id", "id": "SECURE",
                                          "insecure-id": "INSECURE"})

        # The "set-id" message completes the registration.
        self.assertEquals(deferred_result.called, True)

    # Hook register_done() to be called after register() returns.  We
    # must only fire the "set-id" message after this method returns,
    # since that's when the deferred is created and hooked on the
    # result.
    registration_mock = self.mocker.patch(self.broker_service.registration)
    registration_mock.register()
    self.mocker.passthrough(register_done)
    # Expectations recorded above only take effect in replay mode; without
    # this call register_done() would never run.
    self.mocker.replay()

    return self.remote_service.register()
306
def test_registration_done_event_becomes_signal(self):
    """
    Firing the C{registration-done} reactor event should result in a DBus
    signal being received on the broker's bus.
    """
    waiter = Deferred()

    def got_signal(*args):
        # Any payload is ignored; receiving the signal is what matters.
        waiter.callback("We got it!")

    # NOTE(review): the signal name is inferred from the reactor event
    # name ("registration-done") and this test's name -- confirm.
    self.broker_service.bus.add_signal_receiver(got_signal,
                                                "registration_done")
    self.broker_service.reactor.fire("registration-done")
    # Returned so the runner waits until the signal actually arrives.
    return waiter
315
def test_registration_failed_event_becomes_signal(self):
    """
    Firing the C{registration-failed} reactor event should result in a
    C{registration_failed} DBus signal being received on the broker's bus.
    """
    waiter = Deferred()

    def got_signal(*args):
        # Any payload is ignored; receiving the signal is what matters.
        waiter.callback("We got it!")

    self.broker_service.bus.add_signal_receiver(got_signal,
                                                "registration_failed")
    self.broker_service.reactor.fire("registration-failed")
    # Returned so the runner waits until the signal actually arrives.
    return waiter
324
def test_reload_configuration(self):
    """
    After C{reload_configuration} is called remotely, the broker's config
    reflects a value appended to the configuration file.
    """
    # Close the file explicitly so the new setting is flushed to disk
    # before the broker is asked to re-read it.
    config_file = open(self.config_filename, "a")
    try:
        config_file.write("computer_title = New Title")
    finally:
        config_file.close()
    result = self.remote_service.reload_configuration()

    def got_result(result):
        self.assertEquals(self.broker_service.config.computer_title,
                          "New Title")

    return result.addCallback(got_result)
332
def test_reload_configuration_stops_plugins(self):
334
Reloading the configuration must stop all clients (by calling C{exit}
335
on them) so that they can be restarted by the watchdog and see the new
336
changes in the config file.
338
class MyService(Object):
339
bus_name = "my.service.name"
340
object_path = "/my/service/name"
341
def __init__(self, *args, **kw):
342
Object.__init__(self, *args, **kw)
347
self.stash.append(True)
348
my_service = MyService(self.broker_service.bus)
349
def got_result(result):
350
self.assertEquals(my_service.stash, [True])
351
self.remote.register_plugin("my.service.name", "/my/service/name")
352
result = self.remote.reload_configuration()
353
return result.addCallback(got_result)
355
def test_get_accepted_types_empty(self):
    """
    With no accepted types set on the message store, the remote
    C{get_accepted_message_types} call yields an empty list.
    """
    store = self.broker_service.message_store
    store.set_accepted_types([])
    accepted_types = self.remote_service.get_accepted_message_types()
    # The remote answer is passed as the first argument of the callback.
    accepted_types.addCallback(self.assertEquals, [])
    return accepted_types
362
def test_get_accepted_message_types(self):
    """
    The remote C{get_accepted_message_types} call yields the types set on
    the message store; ordering is not significant.
    """
    expected = set(["foo", "bar"])
    store = self.broker_service.message_store
    store.set_accepted_types(["foo", "bar"])
    accepted_types = self.remote_service.get_accepted_message_types()

    def check_types(types_received):
        # Compare as sets so the order of the reply does not matter.
        self.assertEquals(set(types_received), expected)

    accepted_types.addCallback(check_types)
    return accepted_types
369
def test_message_type_acceptance_changed_event_becomes_signal(self):
    """
    Firing the C{message-type-acceptance-changed} reactor event should
    result in a C{message_type_acceptance_changed} DBus signal carrying the
    message type and its new acceptance state.
    """
    waiter = Deferred()

    def got_signal(message_type, accepted):
        # Only forward the payload here: an assertion failing inside the
        # signal handler would not be reported to the test runner.
        waiter.callback((message_type, accepted))

    def check_signal(signal_args):
        message_type, accepted = signal_args
        self.assertEquals(message_type, "some-type")
        self.assertEquals(accepted, True)

    waiter.addCallback(check_signal)
    self.broker_service.bus.add_signal_receiver(
        got_signal, "message_type_acceptance_changed")
    self.broker_service.reactor.fire("message-type-acceptance-changed",
                                     "some-type", True)
    return waiter
383
def test_register_and_get_plugins(self):
    """
    A plugin registered through C{register_plugin} is listed by
    C{get_registered_plugins}.
    """
    result = self.remote.register_plugin("service.name", "/Path")

    def got_result(result):
        result = self.remote.get_registered_plugins()
        result.addCallback(self.assertEquals, [("service.name", "/Path")])
        # Returning the inner Deferred chains it, so the outer one only
        # fires after the assertion has run.
        return result

    result.addCallback(got_result)
    return result
392
def test_no_duplicate_plugins(self):
    """
    Adding the same plugin data twice does not cause duplicate entries.
    """
    result = self.remote.register_plugin("service.name", "/Path")
    result.addCallback(lambda ign: self.remote.register_plugin(
        "service.name", "/Path"))
    result.addCallback(lambda ign: self.remote.get_registered_plugins())
    result.addCallback(self.assertEquals, [("service.name", "/Path")])
    # Return the chain so the runner waits for the final assertion.
    return result
405
class MyService(Object):
406
bus_name = "my.service.name"
407
object_path = "/my/service/name"
410
# We'll actually change the stash in a bit instead of right
411
# now. The idea is that the broker's exit method should wait
412
# for us to do our whole thing before it returns.
413
from twisted.internet import reactor
414
deferred = Deferred()
417
deferred.callback(None)
418
reactor.callLater(0.2, change_stash)
420
self.my_service = MyService(self.broker_service.bus)
421
def got_result(result):
422
self.assertEquals(stash, [True])
423
self.remote.register_plugin("my.service.name", "/my/service/name")
424
result = self.remote.exit()
425
return result.addCallback(got_result)
427
def test_exit_runs_quickly_with_missing_services(self):
429
If other daemons die, the Broker won't retry them for ages.
431
self.log_helper.ignore_errors(ZeroDivisionError)
433
self.remote.register_plugin("my.service.name", "/my/service/name")
436
self.broker_service.reactor.call_on("post-exit",
437
lambda: post_exits.append(True))
440
result.errback(Exception("It took too long!"))
442
def cancel_delayed(result):
445
from twisted.internet import reactor
446
delayed = reactor.callLater(5, took_too_long)
448
result = self.remote.exit()
449
result.addCallback(cancel_delayed)
452
def test_exit_exits_when_other_daemons_blow_up(self):
454
If other daemons blow up in their exit() methods, exit should ignore
455
the error and exit anyway.
457
self.log_helper.ignore_errors(ZeroDivisionError)
459
class MyService(Object):
460
bus_name = "my.service.name"
461
object_path = "/my/service/name"
465
self.my_service = MyService(self.broker_service.bus)
466
self.remote.register_plugin("my.service.name", "/my/service/name")
469
self.broker_service.reactor.call_on("post-exit",
470
lambda: post_exits.append(True))
472
def got_result(result):
473
# The actual exit happens a second after the dbus response.
474
self.broker_service.reactor.advance(1)
475
self.assertEquals(post_exits, [True])
477
result = self.remote.exit()
478
return result.addCallback(got_result)
480
def test_exit_fires_reactor_events(self):
    """
    Calling C{exit} remotely fires the C{pre-exit} and then the
    C{post-exit} reactor events.
    """
    # Records the order in which the two events fire.
    stash = []

    self.broker_service.reactor.call_on("pre-exit",
                                        lambda: stash.append("pre"))
    self.broker_service.reactor.call_on("post-exit",
                                        lambda: stash.append("post"))

    def got_result(result):
        # Advance the broker reactor's clock by one second before checking
        # which events fired.
        self.broker_service.reactor.advance(1)
        self.assertEquals(stash, ["pre", "post"])

    result = self.remote.exit()
    result.addCallback(got_result)
    # Return the Deferred so the runner waits for the assertion above.
    return result
496
def test_call_if_accepted(self):
    """
    If a plugins message type is accepted, call a given function.
    """
    self.broker_service.message_store.set_accepted_types(["foo"])
    # Collects the arguments the callable was invoked with.
    calls = []

    deferred = self.remote.call_if_accepted("foo", calls.append, True)

    def got_accepted(result):
        self.assertEquals(calls, [True])

    deferred.addCallback(got_accepted)
    # Return the Deferred so the runner waits for the assertion above.
    return deferred
510
def test_not_called_if_not_accepted(self):
    """
    If a plugins message type is not accepted, don't call a given
    function.
    """
    # Collects the arguments the callable was invoked with; it is expected
    # to stay empty because "foo" is never added to the accepted types.
    calls = []

    deferred = self.remote.call_if_accepted("foo", calls.append, True)

    def got_accepted(result):
        self.assertEquals(calls, [])

    deferred.addCallback(got_accepted)
    # Return the Deferred so the runner waits for the assertion above.
    return deferred
524
def test_value_of_called_if_accepted(self):
526
If a plugins message type is not accepted, don't call a given
529
self.broker_service.message_store.set_accepted_types(["foo"])
530
deferred = self.remote.call_if_accepted("foo", lambda: "hi")
531
def got_accepted(result):
532
self.assertEquals(result, "hi")
534
deferred.addCallback(got_accepted)