~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/broker/tests/test_service.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mto: This revision was merged to the branch mainline in revision 2.
  • Revision ID: james.westby@ubuntu.com-20080908163557-fl0d2oc35hur473w
Tags: upstream-1.0.18
ImportĀ upstreamĀ versionĀ 1.0.18

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from dbus.service import signal
 
2
 
 
3
from twisted.internet.defer import Deferred
 
4
 
 
5
from landscape.schema import Message
 
6
from landscape.broker.broker import IFACE_NAME
 
7
from landscape.tests.helpers import (
 
8
    LandscapeIsolatedTest, RemoteBrokerHelper, LandscapeTest)
 
9
from landscape.lib.bpickle import dumps, loads
 
10
from landscape.lib.dbus_util import (Object, method,
 
11
                                     byte_array, array_to_string)
 
12
from landscape.lib.twisted_util import gather_results
 
13
from landscape.manager.manager import FAILED
 
14
 
 
15
 
 
16
class SampleSignalReceiver(object):
 
17
 
 
18
    def __init__(self, remote_exchange, bus):
 
19
        self.bus = bus
 
20
        self.signal_waiters = {}
 
21
 
 
22
    def got_signal(self, name, *data):
 
23
        if name in self.signal_waiters:
 
24
            self.signal_waiters[name].callback(data)
 
25
 
 
26
    def wait_for_signal(self, name):
 
27
        handler = lambda *args: self.got_signal(name, *args)
 
28
        self.bus.add_signal_receiver(handler, name)
 
29
        self.signal_waiters[name] = Deferred()
 
30
        return self.signal_waiters[name]
 
31
 
 
32
 
 
33
 
 
34
class BrokerDBusObjectTest(LandscapeIsolatedTest):
 
35
 
 
36
    helpers = [RemoteBrokerHelper]
 
37
 
 
38
    def setUp(self):
 
39
        super(BrokerDBusObjectTest, self).setUp()
 
40
        self.broker_service.message_store.set_accepted_types(["test"])
 
41
        self.broker_service.message_store.add_schema(Message("test", {}))
 
42
 
 
43
    def test_ping(self):
 
44
        """
 
45
        The broker can be pinged over DBUS to see if it is alive.
 
46
        """
 
47
        return self.remote_service.ping().addCallback(self.assertEquals, True)
 
48
 
 
49
    def test_send_message(self):
 
50
        """
 
51
        The L{BrokerDBusObject} should expose a remote
 
52
        C{send_message} method which adds a given message to the
 
53
        message store.
 
54
        """
 
55
        result = self.remote_service.send_message(
 
56
            byte_array(dumps({"type": "test"})), dbus_interface=IFACE_NAME)
 
57
        def got_result(message_id):
 
58
            service = self.broker_service
 
59
            self.assertTrue(service.message_store.is_pending(message_id))
 
60
            messages = service.message_store.get_pending_messages()
 
61
            self.assertEquals(len(messages), 1)
 
62
            self.assertMessage(messages[0], {"type": "test"})
 
63
        result.addCallback(got_result)
 
64
        return result
 
65
 
 
66
    def test_send_urgent_message(self):
 
67
        """
 
68
        The C{send_message} method should take a flag indicating that
 
69
        the client should be put into urgent exchange mode.
 
70
        """
 
71
        result = self.remote_service.send_message(
 
72
            byte_array(dumps({"type": "test"})), True,
 
73
            dbus_interface=IFACE_NAME)
 
74
        def got_result(result):
 
75
            messages = self.broker_service.message_store.get_pending_messages()
 
76
            self.assertEquals(len(messages), 1)
 
77
            self.assertMessage(messages[0], {"type": "test"})
 
78
            self.assertTrue(self.broker_service.exchanger.is_urgent())
 
79
        result.addCallback(got_result)
 
80
        return result
 
81
 
 
82
    def test_is_message_pending_true(self):
 
83
        """
 
84
        The L{BrokerDBusObject} should expose a remote
 
85
        C{send_message} method which adds a given message to the
 
86
        message store.
 
87
        """
 
88
        message_id = self.broker_service.message_store.add({"type": "test"})
 
89
        result = self.remote_service.is_message_pending(message_id)
 
90
        def got_result(is_pending):
 
91
            self.assertEquals(is_pending, True)
 
92
        return result.addCallback(got_result)
 
93
 
 
94
    def test_is_message_pending_false(self):
 
95
        """
 
96
        The L{BrokerDBusObject} should expose a remote
 
97
        C{send_message} method which adds a given message to the
 
98
        message store.
 
99
        """
 
100
        message_id = self.broker_service.message_store.add({"type": "test"})
 
101
        self.broker_service.message_store.add_pending_offset(1)
 
102
        result = self.remote_service.is_message_pending(message_id)
 
103
        def got_result(is_pending):
 
104
            self.assertEquals(is_pending, False)
 
105
        return result.addCallback(got_result)
 
106
 
 
107
    def test_exchange_notification(self):
 
108
        """
 
109
        The BrokerDBusObject will broadcast a C{impending_exchange} signal
 
110
        before exchanging, to give plugins a chance to send messages to get
 
111
        into the next exchange. It does this by hooking in to the
 
112
        C{impending-exchange} event.
 
113
        """
 
114
        plugin_service = SampleSignalReceiver(self.remote_service,
 
115
                                              self.broker_service.bus)
 
116
        result = plugin_service.wait_for_signal("impending_exchange")
 
117
        self.broker_service.reactor.fire("impending-exchange")
 
118
        # The typical failure case for this test is to hang until timeout :\
 
119
        return result
 
120
    test_exchange_notification.timeout = 4
 
121
 
 
122
    def test_exchange_failed_notification(self):
 
123
        """
 
124
        The BrokerService will broadcast a C{exchange_failed} signal
 
125
        if the exchange fails.
 
126
        """
 
127
        plugin_service = SampleSignalReceiver(self.remote_service,
 
128
                                              self.broker_service.bus)
 
129
        result = plugin_service.wait_for_signal("exchange_failed")
 
130
        self.broker_service.reactor.fire("exchange-failed")
 
131
        # The typical failure case for this test is to hang until timeout :\
 
132
        return result
 
133
    test_exchange_failed_notification.timeout = 4
 
134
 
 
135
    def test_resynchronize_clients(self):
 
136
        """
 
137
        The exchange broadcasts the reactor event 'resynchronize-clients'; in
 
138
        this case the BrokerDBusObject should broadcast a dbus signal
 
139
        'resynchronize'.
 
140
        """
 
141
        plugin_service = SampleSignalReceiver(self.remote_service,
 
142
                                              self.broker_service.bus)
 
143
        result = plugin_service.wait_for_signal("resynchronize")
 
144
        self.broker_service.reactor.fire("resynchronize-clients")
 
145
        # The typical failure case for this test is to hang until timeout :\
 
146
        return result
 
147
    test_resynchronize_clients.timeout = 4
 
148
 
 
149
    def test_broadcast_messages(self):
 
150
        """
 
151
        The DBus service calls the 'message' method on all registered plugins
 
152
        when messages are received from the server. The message is passed as a
 
153
        bpickle.
 
154
        """
 
155
 
 
156
        final_message = Deferred()
 
157
        class MyService(Object):
 
158
            bus_name = "my.service.name"
 
159
            object_path = "/my/service/name"
 
160
            @method(bus_name)
 
161
            def message(self, message):
 
162
                final_message.callback(message)
 
163
 
 
164
        my_service = MyService(self.broker_service.bus)
 
165
 
 
166
        registration = self.remote.register_plugin(
 
167
            "my.service.name", "/my/service/name")
 
168
 
 
169
        def registered(result):
 
170
            transport = self.broker_service.transport
 
171
            transport.responses.append([{"type": "foobar", "value": 42}])
 
172
            self.broker_service.exchanger.exchange()
 
173
        registration.addCallback(registered)
 
174
 
 
175
        def ready(message):
 
176
            message = array_to_string(message)
 
177
            message = loads(message)
 
178
            self.assertEquals(message,
 
179
                              {"type": "foobar", "value": 42})
 
180
 
 
181
        final_message.addCallback(ready)
 
182
        return final_message
 
183
 
 
184
    test_broadcast_messages.timeout = 4
 
185
 
 
186
    def test_failed_operation_without_plugins(self):
 
187
        """
 
188
        When there are no broker plugins available to handle a message, an
 
189
        operation-result message should be sent back to the server indicating a
 
190
        failure.
 
191
        """
 
192
        self.log_helper.ignore_errors("Nobody handled the foobar message.")
 
193
        self.broker_service.message_store.set_accepted_types(
 
194
            ["operation-result"])
 
195
        result = self.broker_service.reactor.fire("message",
 
196
                                                  {"type": "foobar",
 
197
                                                   "operation-id": 4})
 
198
        result = [result for result in result if result is not None][0]
 
199
        class Startswith(object):
 
200
            def __eq__(self, other):
 
201
                return other.startswith(
 
202
                    "Landscape client failed to handle this request (foobar)")
 
203
        def broadcasted(ignored):
 
204
            self.assertMessages(
 
205
                self.broker_service.message_store.get_pending_messages(),
 
206
                [{"type": "operation-result",
 
207
                  "status": FAILED,
 
208
                  "result-text": Startswith(),
 
209
                  "operation-id": 4}])
 
210
        result.addCallback(broadcasted)
 
211
        return result
 
212
 
 
213
 
 
214
    def test_failed_operation_with_plugins_not_handling(self):
 
215
        """
 
216
        When no broker plugins handle a message (i.e., they return False from
 
217
        the message() call), an operation-result message should be sent back to
 
218
        the server indicating a failure.
 
219
        """
 
220
        self.log_helper.ignore_errors("Nobody handled the foobar message.")
 
221
        class MyService(Object):
 
222
            bus_name = "my.service.name"
 
223
            object_path = "/my/service/name"
 
224
            @method(bus_name)
 
225
            def message(self, message):
 
226
                self.called = True
 
227
                return False
 
228
 
 
229
        self.broker_service.message_store.set_accepted_types(
 
230
            ["operation-result"])
 
231
 
 
232
        my_service = MyService(self.broker_service.bus)
 
233
 
 
234
        result = self.remote.register_plugin(
 
235
            "my.service.name", "/my/service/name")
 
236
        def registered(ignored):
 
237
            result = self.broker_service.reactor.fire("message",
 
238
                                                      {"type": "foobar",
 
239
                                                       "operation-id": 4})
 
240
            return [result for result in result if result is not None][0]
 
241
 
 
242
        class Startswith(object):
 
243
            def __eq__(self, other):
 
244
                return other.startswith(
 
245
                    "Landscape client failed to handle this request (foobar)")
 
246
        def broadcasted(ignored):
 
247
            self.assertTrue(my_service.called)
 
248
            self.assertMessages(
 
249
                self.broker_service.message_store.get_pending_messages(),
 
250
                [{"type": "operation-result",
 
251
                  "status": FAILED,
 
252
                  "result-text": Startswith(),
 
253
                  "operation-id": 4}])
 
254
        result.addCallback(registered)
 
255
        result.addCallback(broadcasted)
 
256
        return result
 
257
 
 
258
 
 
259
    def test_resynchronize_not_handled_by_plugins(self):
 
260
        """
 
261
        *resynchronize* operations are special, in that we know the broker
 
262
        handles them in a special way. If none of the broker-plugins respond
 
263
        to a resynchronize event, we should not send back a failure, because
 
264
        the broker itself will respond to those.
 
265
        """
 
266
        self.broker_service.message_store.set_accepted_types(
 
267
            ["operation-result"])
 
268
        result = self.broker_service.reactor.fire("message",
 
269
                                                  {"type": "resynchronize",
 
270
                                                   "operation-id": 4})
 
271
        result = [result for result in result if result is not None][0]
 
272
        def broadcasted(ignored):
 
273
            self.assertMessages(
 
274
                self.broker_service.message_store.get_pending_messages(),
 
275
                [])
 
276
        result.addCallback(broadcasted)
 
277
        return result
 
278
 
 
279
 
 
280
    def test_register(self):
 
281
        """
 
282
        Remote parties can request a registration to be made with the server.
 
283
        """
 
284
        identity = self.broker_service.identity
 
285
 
 
286
        def register_done(deferred_result):
 
287
            self.assertEquals(deferred_result.called, False)
 
288
 
 
289
            self.broker_service.reactor.fire("message",
 
290
                                     {"type": "set-id", "id": "SECURE",
 
291
                                      "insecure-id": "INSECURE"})
 
292
 
 
293
            self.assertEquals(deferred_result.called, True)
 
294
 
 
295
        # Hook register_done() to be called after register() returns.  We
 
296
        # must only fire the "set-id" message after this method returns,
 
297
        # since that's when the deferred is created and hooked on the
 
298
        # related events.
 
299
        registration_mock = self.mocker.patch(self.broker_service.registration)
 
300
        registration_mock.register()
 
301
        self.mocker.passthrough(register_done)
 
302
        self.mocker.replay()
 
303
 
 
304
        return self.remote_service.register()
 
305
 
 
306
    def test_registration_done_event_becomes_signal(self):
 
307
        waiter = Deferred()
 
308
        def got_signal():
 
309
            waiter.callback("We got it!")
 
310
        self.broker_service.bus.add_signal_receiver(got_signal,
 
311
                                                    "registration_done")
 
312
        self.broker_service.reactor.fire("registration-done")
 
313
        return waiter
 
314
 
 
315
    def test_registration_failed_event_becomes_signal(self):
 
316
        waiter = Deferred()
 
317
        def got_signal():
 
318
            waiter.callback("We got it!")
 
319
        self.broker_service.bus.add_signal_receiver(got_signal,
 
320
                                                    "registration_failed")
 
321
        self.broker_service.reactor.fire("registration-failed")
 
322
        return waiter
 
323
 
 
324
    def test_reload_configuration(self):
 
325
        open(self.config_filename, "a").write("computer_title = New Title")
 
326
        result = self.remote_service.reload_configuration()
 
327
        def got_result(result):
 
328
            self.assertEquals(self.broker_service.config.computer_title,
 
329
                              "New Title")
 
330
        return result.addCallback(got_result)
 
331
 
 
332
    def test_reload_configuration_stops_plugins(self):
 
333
        """
 
334
        Reloading the configuration must stop all clients (by calling C{exit}
 
335
        on them) so that they can be restarted by the watchdog and see the new
 
336
        changes in the config file.
 
337
        """
 
338
        class MyService(Object):
 
339
            bus_name = "my.service.name"
 
340
            object_path = "/my/service/name"
 
341
            def __init__(self, *args, **kw):
 
342
                Object.__init__(self, *args, **kw)
 
343
                self.stash = []
 
344
 
 
345
            @method(bus_name)
 
346
            def exit(self):
 
347
                self.stash.append(True)
 
348
        my_service = MyService(self.broker_service.bus)
 
349
        def got_result(result):
 
350
            self.assertEquals(my_service.stash, [True])
 
351
        self.remote.register_plugin("my.service.name", "/my/service/name")
 
352
        result = self.remote.reload_configuration()
 
353
        return result.addCallback(got_result)
 
354
 
 
355
    def test_get_accepted_types_empty(self):
 
356
        self.broker_service.message_store.set_accepted_types([])
 
357
        deferred = self.remote_service.get_accepted_message_types()
 
358
        def got_result(result):
 
359
            self.assertEquals(result, [])
 
360
        return deferred.addCallback(got_result)
 
361
 
 
362
    def test_get_accepted_message_types(self):
 
363
        self.broker_service.message_store.set_accepted_types(["foo", "bar"])
 
364
        deferred = self.remote_service.get_accepted_message_types()
 
365
        def got_result(result):
 
366
            self.assertEquals(set(result), set(["foo", "bar"]))
 
367
        return deferred.addCallback(got_result)
 
368
 
 
369
    def test_message_type_acceptance_changed_event_becomes_signal(self):
 
370
        waiter = Deferred()
 
371
        def got_signal(type, accepted):
 
372
            waiter.callback("We got it!")
 
373
            self.assertEquals(type, "some-type")
 
374
            self.assertEquals(accepted, True)
 
375
 
 
376
        self.broker_service.bus.add_signal_receiver(
 
377
                                         got_signal,
 
378
                                         "message_type_acceptance_changed")
 
379
        self.broker_service.reactor.fire("message-type-acceptance-changed",
 
380
                                         "some-type", True)
 
381
        return waiter
 
382
 
 
383
    def test_register_and_get_plugins(self):
 
384
        result = self.remote.register_plugin("service.name", "/Path")
 
385
        def got_result(result):
 
386
            result = self.remote.get_registered_plugins()
 
387
            result.addCallback(self.assertEquals, [("service.name", "/Path")])
 
388
            return result
 
389
        result.addCallback(got_result)
 
390
        return result
 
391
 
 
392
    def test_no_duplicate_plugins(self):
 
393
        """
 
394
        Adding the same plugin data twice does not cause duplicate entries.
 
395
        """
 
396
        result = self.remote.register_plugin("service.name", "/Path")
 
397
        result.addCallback(lambda ign: self.remote.register_plugin(
 
398
                "service.name", "/Path"))
 
399
        result.addCallback(lambda ign: self.remote.get_registered_plugins())
 
400
        result.addCallback(self.assertEquals, [("service.name", "/Path")])
 
401
        return result
 
402
 
 
403
    def test_exit(self):
 
404
        stash = []
 
405
        class MyService(Object):
 
406
            bus_name = "my.service.name"
 
407
            object_path = "/my/service/name"
 
408
            @method(bus_name)
 
409
            def exit(self):
 
410
                # We'll actually change the stash in a bit instead of right
 
411
                # now.  The idea is that the broker's exit method should wait
 
412
                # for us to do our whole thing before it returns.
 
413
                from twisted.internet import reactor
 
414
                deferred = Deferred()
 
415
                def change_stash():
 
416
                    stash.append(True)
 
417
                    deferred.callback(None)
 
418
                reactor.callLater(0.2, change_stash)
 
419
                return deferred
 
420
        self.my_service = MyService(self.broker_service.bus)
 
421
        def got_result(result):
 
422
            self.assertEquals(stash, [True])
 
423
        self.remote.register_plugin("my.service.name", "/my/service/name")
 
424
        result = self.remote.exit()
 
425
        return result.addCallback(got_result)
 
426
 
 
427
    def test_exit_runs_quickly_with_missing_services(self):
 
428
        """
 
429
        If other daemons die, the Broker won't retry them for ages.
 
430
        """
 
431
        self.log_helper.ignore_errors(ZeroDivisionError)
 
432
 
 
433
        self.remote.register_plugin("my.service.name", "/my/service/name")
 
434
 
 
435
        post_exits = []
 
436
        self.broker_service.reactor.call_on("post-exit",
 
437
                                            lambda: post_exits.append(True))
 
438
 
 
439
        def took_too_long():
 
440
            result.errback(Exception("It took too long!"))
 
441
 
 
442
        def cancel_delayed(result):
 
443
            delayed.cancel()
 
444
 
 
445
        from twisted.internet import reactor
 
446
        delayed = reactor.callLater(5, took_too_long)
 
447
 
 
448
        result = self.remote.exit()
 
449
        result.addCallback(cancel_delayed)
 
450
        return result
 
451
 
 
452
    def test_exit_exits_when_other_daemons_blow_up(self):
 
453
        """
 
454
        If other daemons blow up in their exit() methods, exit should ignore
 
455
        the error and exit anyway.
 
456
        """
 
457
        self.log_helper.ignore_errors(ZeroDivisionError)
 
458
 
 
459
        class MyService(Object):
 
460
            bus_name = "my.service.name"
 
461
            object_path = "/my/service/name"
 
462
            @method(bus_name)
 
463
            def exit(self):
 
464
                1/0
 
465
        self.my_service = MyService(self.broker_service.bus)
 
466
        self.remote.register_plugin("my.service.name", "/my/service/name")
 
467
 
 
468
        post_exits = []
 
469
        self.broker_service.reactor.call_on("post-exit",
 
470
                                            lambda: post_exits.append(True))
 
471
 
 
472
        def got_result(result):
 
473
            # The actual exit happens a second after the dbus response.
 
474
            self.broker_service.reactor.advance(1)
 
475
            self.assertEquals(post_exits, [True])
 
476
 
 
477
        result = self.remote.exit()
 
478
        return result.addCallback(got_result)
 
479
 
 
480
    def test_exit_fires_reactor_events(self):
 
481
        stash = []
 
482
 
 
483
        self.broker_service.reactor.call_on("pre-exit",
 
484
                                            lambda: stash.append("pre"))
 
485
        self.broker_service.reactor.call_on("post-exit",
 
486
                                            lambda: stash.append("post"))
 
487
 
 
488
        def got_result(result):
 
489
            self.broker_service.reactor.advance(1)
 
490
            self.assertEquals(stash, ["pre", "post"])
 
491
 
 
492
        result = self.remote.exit()
 
493
        result.addCallback(got_result)
 
494
        return result
 
495
 
 
496
    def test_call_if_accepted(self):
 
497
        """
 
498
        If a plugins message type is accepted, call a given function.
 
499
        """
 
500
        self.broker_service.message_store.set_accepted_types(["foo"])
 
501
        l = []
 
502
 
 
503
        deferred = self.remote.call_if_accepted("foo", l.append, True)
 
504
        def got_accepted(result):
 
505
            self.assertEquals(l, [True])
 
506
 
 
507
        deferred.addCallback(got_accepted)
 
508
        return deferred
 
509
 
 
510
    def test_not_called_if_not_accepted(self):
 
511
        """
 
512
        If a plugins message type is not accepted, don't call a given
 
513
        function.
 
514
        """
 
515
        l = []
 
516
 
 
517
        deferred = self.remote.call_if_accepted("foo", l.append, True)
 
518
        def got_accepted(result):
 
519
            self.assertEquals(l, [])
 
520
            
 
521
        deferred.addCallback(got_accepted)
 
522
        return deferred
 
523
 
 
524
    def test_value_of_called_if_accepted(self):
 
525
        """
 
526
        If a plugins message type is not accepted, don't call a given
 
527
        function.
 
528
        """
 
529
        self.broker_service.message_store.set_accepted_types(["foo"])
 
530
        deferred = self.remote.call_if_accepted("foo", lambda: "hi")
 
531
        def got_accepted(result):
 
532
            self.assertEquals(result, "hi")
 
533
 
 
534
        deferred.addCallback(got_accepted)
 
535
        return deferred