~free.ekanayaka/landscape-client/karmic-updates-1.4.4-0ubuntu0.9.10

« back to all changes in this revision

Viewing changes to landscape/lib/tests/test_dbus_util.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
import sys
 
3
import time
 
4
import signal
 
5
 
 
6
from dbus.service import method
 
7
import dbus.glib
 
8
from dbus import DBusException, Array, Byte
 
9
 
 
10
from twisted.internet.defer import Deferred
 
11
 
 
12
from landscape.lib.dbus_util import (get_bus, get_object,
 
13
                                     method as async_method,
 
14
                                     byte_array, array_to_string,
 
15
                                     Object, ServiceUnknownError,
 
16
                                     SecurityError)
 
17
from landscape.tests.helpers import (
 
18
    LandscapeIsolatedTest, DBusHelper, LandscapeTest)
 
19
from landscape.tests.mocker import ARGS, KWARGS
 
20
 
 
21
 
 
22
class BoringService(Object):
 
23
 
 
24
    bus_name = "sample.service"
 
25
    object_path = "/com/example/BoringService"
 
26
    iface_name = "com.example.BoringService"
 
27
 
 
28
    @method(iface_name)
 
29
    def return1(self):
 
30
        return 1
 
31
 
 
32
    @method(iface_name)
 
33
    def error(self):
 
34
        1 / 0
 
35
 
 
36
    @method(iface_name)
 
37
    def return_none(self):
 
38
        return
 
39
 
 
40
 
 
41
class AsynchronousWrapperTests(LandscapeIsolatedTest):
 
42
 
 
43
    helpers = [DBusHelper]
 
44
 
 
45
    def setUp(self):
 
46
        super(AsynchronousWrapperTests, self).setUp()
 
47
        self.service = BoringService(self.bus)
 
48
        self.remote_service = get_object(self.bus,
 
49
                                         self.service.bus_name,
 
50
                                         self.service.object_path,
 
51
                                         self.service.iface_name)
 
52
 
 
53
    def test_get_bus(self):
 
54
        self.assertEquals(type(get_bus("session")), dbus.SessionBus)
 
55
        self.assertEquals(type(get_bus("system")), dbus.SystemBus)
 
56
        self.assertRaises(ValueError, get_bus, "nope")
 
57
 
 
58
    def test_get_object_returns_deferred(self):
 
59
        """
 
60
        There is a L{dbus.Bus.get_object} replacement, L{get_object}, which
 
61
        returns an object which returns Deferreds on method calls
 
62
        """
 
63
        result = self.remote_service.return1()
 
64
        self.assertTrue(isinstance(result, Deferred))
 
65
        result.addCallback(self.assertEquals, 1)
 
66
        return result
 
67
 
 
68
    def test_get_object_returns_failing_deferred(self):
 
69
        """
 
70
        The asynchronous method wrapper deals with errors appropriately, by
 
71
        converting them to errbacks on a Deferred.
 
72
        """
 
73
        result = self.remote_service.error()
 
74
        self.assertTrue(isinstance(result, Deferred))
 
75
        self.assertFailure(result, DBusException)
 
76
        return result
 
77
 
 
78
    def test_return_none(self):
 
79
        """
 
80
        L{get_object} has no problems with methods that don't return values.
 
81
        """
 
82
        result = self.remote_service.return_none()
 
83
        result.addCallback(self.assertEquals, ())
 
84
        return result
 
85
 
 
86
    def test_helper_methods(self):
 
87
        """The wrapper shouldn't get in the way of standard methods."""
 
88
        self.remote_service.connect_to_signal("nononono", lambda: None)
 
89
 
 
90
    def test_default_interface_name(self):
 
91
        """
 
92
        When an interface isn't provided to get_object(), one is automatically
 
93
        generated from the object_path.  This allows us to work with older
 
94
        versions of Python dbus without too much pain.
 
95
        """
 
96
        class MyService(Object):
 
97
            bus_name = "my.bus.name"
 
98
            object_path = "/my/Service"
 
99
            @method("my.Service")
 
100
            def return2(self):
 
101
                return 2
 
102
        service = MyService(self.bus)
 
103
        remote_service = get_object(self.bus, MyService.bus_name,
 
104
                                    MyService.object_path)
 
105
 
 
106
        result = remote_service.return2()
 
107
        result.addCallback(self.assertEquals, 2)
 
108
        return result
 
109
 
 
110
 
 
111
class HalfSynchronousService(Object):
 
112
 
 
113
    bus_name = "sample.service"
 
114
    object_path = "/com/example/UnitedStatesOfWhatever"
 
115
    iface_name = "com.example.UnitedStatesOfWhateverIface"
 
116
 
 
117
    def __init__(self, bus):
 
118
        super(HalfSynchronousService, self).__init__(bus)
 
119
        self.deferred = Deferred()
 
120
 
 
121
    @async_method(iface_name)
 
122
    def add1(self, i):
 
123
        return i + 1
 
124
 
 
125
    @async_method(iface_name)
 
126
    def return_none(self):
 
127
        return
 
128
 
 
129
    @async_method(iface_name)
 
130
    def sync_error(self):
 
131
        1 / 0
 
132
 
 
133
    @async_method(iface_name)
 
134
    def async(self):
 
135
        return self.deferred
 
136
 
 
137
 
 
138
class AsynchronousMethodTests(LandscapeIsolatedTest):
 
139
 
 
140
    helpers = [DBusHelper]
 
141
 
 
142
    def setUp(self):
 
143
        super(AsynchronousMethodTests, self).setUp()
 
144
        self.service = HalfSynchronousService(self.bus)
 
145
        self.remote_service = get_object(self.bus,
 
146
                                         self.service.bus_name,
 
147
                                         self.service.object_path,
 
148
                                         self.service.iface_name)
 
149
 
 
150
    def test_synchronous_method(self):
 
151
        """
 
152
        Using the L{landscape.lib.dbus_util.method} decorator to declare a
 
153
        method basically works the same as L{dbus.service.method}, if you
 
154
        return a value synchronously.
 
155
        """
 
156
        return self.remote_service.add1(3).addCallback(self.assertEquals, 4)
 
157
 
 
158
    def test_return_None(self):
 
159
        """
 
160
        Methods should be able to return None, and this will be translated to a
 
161
        return of no values.
 
162
        """
 
163
        d = self.remote_service.return_none()
 
164
        return d.addCallback(self.assertEquals, ())
 
165
 
 
166
    def test_asynchronous_method(self):
 
167
        """
 
168
        However, when a Deferred is returned, it will be handled so that the
 
169
        return value of the method is the ultimate result of the Deferred.
 
170
        """
 
171
        d = self.remote_service.async()
 
172
        d.addCallback(self.assertEquals, "hi")
 
173
        self.service.deferred.callback("hi")
 
174
        return d
 
175
 
 
176
    def test_synchronous_error(self):
 
177
        """
 
178
        Synchronous exceptions are propagated as normal.
 
179
        """
 
180
        self.log_helper.ignore_errors(ZeroDivisionError)
 
181
        d = self.remote_service.sync_error()
 
182
        def got_error(dbus_exception):
 
183
            # This is pretty much the best we can do, afaict; it doesn't
 
184
            # include any more information but the type.
 
185
            self.assertTrue("ZeroDivisionError" in str(dbus_exception))
 
186
        # assertFailure to make sure it *actually* fails
 
187
        # assertFailure causes the next callback to get the *exception* object
 
188
        # (not a Failure). That means we should add a callback to check stuff
 
189
        # about the exception, not an errback.
 
190
        self.assertFailure(d, DBusException)
 
191
        d.addCallback(got_error)
 
192
        return d
 
193
 
 
194
    def test_asynchronous_error(self):
 
195
        """
 
196
        Returning a Deferred which fails is propagated in the same way as a
 
197
        synchronous exception is.
 
198
        """
 
199
        self.log_helper.ignore_errors(ZeroDivisionError)
 
200
        # ignore the result of the method and convert it to an exception
 
201
        self.service.deferred.addCallback(lambda ignored: 1/0)
 
202
        d = self.remote_service.async()
 
203
 
 
204
        def got_error(dbus_exception):
 
205
            # This is pretty much the best we can do, afaict; it doesn't
 
206
            # include any more information but the type.
 
207
            self.assertTrue("ZeroDivisionError" in str(dbus_exception),
 
208
                            str(dbus_exception))
 
209
 
 
210
        # fire off the result of the async method call
 
211
        self.service.deferred.callback("ignored")
 
212
 
 
213
        # assertFailure to make sure it *actually* fails
 
214
        # assertFailure causes the next callback to get the *exception* object
 
215
        # (not a Failure). That means we should add a callback to check stuff
 
216
        # about the exception, not an errback.
 
217
        self.assertFailure(d, DBusException)
 
218
        d.addCallback(got_error)
 
219
        return d
 
220
 
 
221
    def test_errors_get_logged(self):
 
222
        """
 
223
        An exception raised during processing of a method call should be
 
224
        logged.
 
225
        """
 
226
        self.log_helper.ignore_errors(ZeroDivisionError)
 
227
        d = self.remote_service.sync_error()
 
228
        def got_error(failure):
 
229
            log = self.logfile.getvalue()
 
230
            self.assertTrue("Traceback" in log)
 
231
            self.assertTrue("ZeroDivisionError" in log)
 
232
        return d.addErrback(got_error)
 
233
 
 
234
 
 
235
class ErrorHandlingTests(LandscapeIsolatedTest):
 
236
 
 
237
    helpers = [DBusHelper]
 
238
 
 
239
    def test_service_unknown(self):
 
240
        remote_service = get_object(self.bus, "com.foo", "/com/foo/Bar",
 
241
                                    "com.foo", retry_timeout=0)
 
242
        d = remote_service.foo()
 
243
        self.assertFailure(d, ServiceUnknownError)
 
244
        return d
 
245
 
 
246
 
 
247
 
 
248
class SecurityErrorTests(LandscapeTest):
 
249
    """Tests for cases that SecurityError is raised."""
 
250
 
 
251
    def setUp(self):
 
252
        super(SecurityErrorTests, self).setUp()
 
253
        self.bus = self.mocker.mock()
 
254
        self.service = self.mocker.mock()
 
255
        self.bus.get_object("com.foo", "/com/foo", introspect=False)
 
256
        self.mocker.result(self.service)
 
257
        self.mocker.count(0, None)
 
258
        self.remote = get_object(self.bus, "com.foo", "/com/foo")
 
259
 
 
260
    def _test_security_error(self, error_message):
 
261
        def raise_dbus_error(*args, **kw):
 
262
            kw["error_handler"](DBusException(error_message))
 
263
 
 
264
        self.service.send_message(ARGS, KWARGS)
 
265
        self.mocker.call(raise_dbus_error)
 
266
        self.mocker.replay()
 
267
 
 
268
        d = self.remote.send_message({"type": "text-message",
 
269
                                      "message": "hello"})
 
270
        self.assertFailure(d, SecurityError)
 
271
        return d
 
272
 
 
273
    def test_feisty_security_error(self):
 
274
        """
 
275
        When an exception that looks like a security error from DBUS
 
276
        0.80.x is raised, this should be translated to a
 
277
        L{SecurityError}.
 
278
        """
 
279
        return self._test_security_error(
 
280
            "A security policy in place prevents this sender from sending "
 
281
            "this message to this recipient, see message bus configuration "
 
282
            "file (rejected message had interface "
 
283
            '"com.canonical.landscape" member "send_message" error name '
 
284
            '"(unset)" destination ":1.107")')
 
285
 
 
286
    def test_gutsy_security_error(self):
 
287
        """
 
288
        When an exception that looks like a security error on DBUS 0.82.x
 
289
        is raised, this should be translated to a L{SecurityError}.
 
290
        """
 
291
        return self._test_security_error(
 
292
            "org.freedesktop.DBus.Error.AccessDenied: A security "
 
293
            "policy in place prevents this sender from sending this "
 
294
            "message to this recipient, see message bus configuration "
 
295
            "file (rejected message had interface "
 
296
            '"com.canonical.landscape" member "send_message" error '
 
297
            'name "(unset)" destination ":1.15")')
 
298
 
 
299
 
 
300
 
 
301
 
 
302
class RetryTests(LandscapeIsolatedTest):
 
303
 
 
304
    helpers = [DBusHelper]
 
305
 
 
306
    def setUp(self):
 
307
        super(RetryTests, self).setUp()
 
308
        self.pids = []
 
309
        self.remote_service = get_object(self.bus,
 
310
                                         HalfSynchronousService.bus_name,
 
311
                                         HalfSynchronousService.object_path,
 
312
                                         HalfSynchronousService.iface_name)
 
313
 
 
314
    def tearDown(self):
 
315
        super(RetryTests, self).tearDown()
 
316
        for pid in self.pids:
 
317
            try:
 
318
                os.kill(pid, signal.SIGKILL)
 
319
                os.waitpid(pid, 0)
 
320
            except OSError:
 
321
                pass #LOL!
 
322
 
 
323
    def test_retry_on_first_call(self):
 
324
        """
 
325
        If an object is unavailable when a method is called,
 
326
        AsynchronousProxyMethod will continue trying to get it for a while.
 
327
        """
 
328
        d = self.remote_service.add1(0)
 
329
        HalfSynchronousService(self.bus)
 
330
        return d.addCallback(self.assertEquals, 1)
 
331
 
 
332
    def _start_service_in_subprocess(self):
 
333
        executable = self.make_path("""\
 
334
#!%s
 
335
from twisted.internet.glib2reactor import install
 
336
install()
 
337
from twisted.internet import reactor
 
338
 
 
339
from dbus import SessionBus
 
340
 
 
341
import sys
 
342
sys.path = %r
 
343
 
 
344
from landscape.lib.tests.test_dbus_util import HalfSynchronousService
 
345
 
 
346
bus = SessionBus()
 
347
HalfSynchronousService(bus)
 
348
reactor.run()
 
349
""" % (sys.executable, sys.path))
 
350
        os.chmod(executable, 0755)
 
351
        pid = os.fork()
 
352
        if pid == 0:
 
353
            os.execlp(executable, executable)
 
354
        self.pids.append(pid)
 
355
 
 
356
    def _stop_service_in_subprocess(self):
 
357
        os.kill(self.pids[-1], signal.SIGKILL)
 
358
        os.waitpid(self.pids[-1], 0)
 
359
 
 
360
    def test_retry_on_second_call(self):
 
361
        """
 
362
        Either get_object or an actual method call may raise an exception about
 
363
        an object not being available. This test ensures that the exception
 
364
        raised from the method call is handled to retry, by causing the object
 
365
        to be cached with an initial successful call.
 
366
        """
 
367
        self._start_service_in_subprocess()
 
368
        result = self.remote_service.add1(0)
 
369
        def got_first_result(result):
 
370
            self.assertEquals(result, 1)
 
371
            self._stop_service_in_subprocess()
 
372
            second_result = self.remote_service.add1(1)
 
373
            self._start_service_in_subprocess()
 
374
            return second_result
 
375
        result.addCallback(got_first_result)
 
376
        result.addCallback(self.assertEquals, 2)
 
377
        return result
 
378
 
 
379
    def test_timeout_time(self):
 
380
        """
 
381
        It's possible to specify the retry timout to use, and when the timeout
 
382
        is reached, the underlying dbus error will be raised.
 
383
        """
 
384
        remote_service = get_object(self.bus,
 
385
                                    HalfSynchronousService.bus_name,
 
386
                                    HalfSynchronousService.object_path,
 
387
                                    retry_timeout=1)
 
388
 
 
389
        start_time = time.time()
 
390
        result = remote_service.add1(0)
 
391
        self.assertFailure(result, ServiceUnknownError)
 
392
 
 
393
        def got_error(exception):
 
394
            self.assertTrue(time.time() - start_time > 1)
 
395
        return result.addCallback(got_error)
 
396
 
 
397
 
 
398
    def test_catch_synchronous_errors_from_method(self):
 
399
        """
 
400
        DBus sometimes raises synchronous errors from calling the method, for
 
401
        example, when the value passed does not match the signature. The
 
402
        asynchronous call wrapper should handle this case and fail the
 
403
        deferred.
 
404
        """
 
405
        self.log_helper.ignore_errors(".*Unable to set arguments.*")
 
406
        HalfSynchronousService(self.bus)
 
407
        d = self.remote_service.add1(None)
 
408
        self.assertFailure(d, TypeError)
 
409
        return d
 
410
 
 
411
 
 
412
class UtilityTests(LandscapeTest):
 
413
    def test_array_to_string(self):
 
414
        self.assertEquals(array_to_string([102, 111, 111]), "foo")
 
415
        self.assertEquals(
 
416
            array_to_string(Array([Byte(102), Byte(111), Byte(111)])),
 
417
            "foo")
 
418
 
 
419
    def test_byte_array(self):
 
420
        self.assertEquals(byte_array("foo"), [102, 111, 111])
 
421
        self.assertEquals(byte_array("foo"),
 
422
                          Array([Byte(102), Byte(111), Byte(111)]))
 
423
 
 
424
    def test_array_to_string_with_horrible_dapper_signing_bug(self):
 
425
        """
 
426
        In older versions of dbus, bytes would be deserialized incorrectly as
 
427
        signed. array_to_string compensates for this.
 
428
        """
 
429
        self.assertEquals(array_to_string([-56, -127]), chr(200) + chr(129))