1
# -*- coding: utf-8 -*-
3
# Author: Natalia B. Bidart <natalia.bidart@canonical.com>
5
# Copyright 2010 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
19
"""Tests for the Ubuntu One credentials management dbus service."""
21
from functools import wraps
23
from twisted.internet.defer import Deferred, inlineCallbacks
24
from ubuntuone.devtools.testcase import DBusTestCase as TestCase
25
from ubuntuone.devtools.handlers import MementoHandler
27
from ubuntuone.credentials import (dbus, logger, logging, ubuntu_sso,
28
CredentialsManagement, TIMEOUT_INTERVAL,
29
DBUS_BUS_NAME, DBUS_CREDENTIALS_PATH, DBUS_CREDENTIALS_IFACE,
30
APP_NAME, HELP_TEXT_KEY, DESCRIPTION, TC_URL_KEY, TC_URL,
31
PING_URL_KEY, PING_URL, WINDOW_ID_KEY,
35
'consumer_key': 'faked_consumer_key',
36
'consumer_secret': 'faked_consumer_secret',
37
'token': 'faked_token',
38
'token_secret': 'faked_token_secret',
39
'token_name': 'Woohoo test',
43
class FakedSSOService(dbus.service.Object):
44
"""Faked DBus object that manages credentials."""
49
def __init__(self, *args, **kwargs):
50
super(FakedSSOService, self).__init__(*args, **kwargs)
51
self._credentials = {}
55
def maybe_emit_error(f):
56
"""Decorator to fake a CredentialsError signal."""
59
def inner(self, *args, **kwargs):
60
"""Fake a CredentialsError signal."""
61
if FakedSSOService.error_dict is not None:
62
self.CredentialsError(FakedSSOService.app_name,
63
FakedSSOService.error_dict)
65
return f(self, *args, **kwargs)
70
"""Decorator to store arguments to check correct calls."""
73
def inner(self, app_name, args):
74
"""Store arguments to check correct calls."""
75
self._app_name = app_name
77
return f(self, app_name, args)
81
@dbus.service.signal(ubuntu_sso.DBUS_CREDENTIALS_IFACE, signature='s')
82
def AuthorizationDenied(self, app_name):
83
"""Signal thrown when the user denies the authorization."""
85
@dbus.service.signal(ubuntu_sso.DBUS_CREDENTIALS_IFACE, signature='sa{ss}')
86
def CredentialsFound(self, app_name, credentials):
87
"""Signal thrown when the credentials are found."""
89
@dbus.service.signal(ubuntu_sso.DBUS_CREDENTIALS_IFACE, signature='s')
90
def CredentialsNotFound(self, app_name):
91
"""Signal thrown when the credentials are not found."""
93
@dbus.service.signal(ubuntu_sso.DBUS_CREDENTIALS_IFACE, signature='s')
94
def CredentialsCleared(self, app_name):
95
"""Signal thrown when the credentials were cleared."""
97
@dbus.service.signal(ubuntu_sso.DBUS_CREDENTIALS_IFACE, signature='s')
98
def CredentialsStored(self, app_name):
99
"""Signal thrown when the credentials were cleared."""
101
@dbus.service.signal(ubuntu_sso.DBUS_CREDENTIALS_IFACE, signature='sa{ss}')
102
def CredentialsError(self, app_name, error_dict):
103
"""Signal thrown when there is a problem getting the credentials."""
107
@dbus.service.method(dbus_interface=ubuntu_sso.DBUS_CREDENTIALS_IFACE,
108
in_signature='sa{ss}', out_signature='')
109
def find_credentials(self, app_name, args):
110
"""Look for the credentials for an application."""
111
creds = self._credentials.get(FakedSSOService.app_name, None)
112
if creds is not None:
113
self.CredentialsFound(FakedSSOService.app_name, creds)
115
self.CredentialsNotFound(FakedSSOService.app_name)
119
@dbus.service.method(dbus_interface=ubuntu_sso.DBUS_CREDENTIALS_IFACE,
120
in_signature='sa{ss}', out_signature='')
121
def clear_credentials(self, app_name, args):
122
"""Clear the credentials for an application."""
123
self._credentials.pop(FakedSSOService.app_name, None)
124
self.CredentialsCleared(FakedSSOService.app_name)
128
@dbus.service.method(dbus_interface=ubuntu_sso.DBUS_CREDENTIALS_IFACE,
129
in_signature='sa{ss}', out_signature='')
130
def store_credentials(self, app_name, args):
131
"""Store the token for an application."""
132
self._credentials[FakedSSOService.app_name] = args
133
self.CredentialsStored(FakedSSOService.app_name)
137
@dbus.service.method(dbus_interface=ubuntu_sso.DBUS_CREDENTIALS_IFACE,
138
in_signature='sa{ss}', out_signature='')
139
def register(self, app_name, args):
140
"""Get credentials if found else prompt GUI to register."""
141
creds = self._credentials.get(FakedSSOService.app_name, None)
142
if creds is not None and len(creds) > 0:
143
self.CredentialsFound(FakedSSOService.app_name, creds)
145
# fake an AuthorizationDenied
146
self.AuthorizationDenied(FakedSSOService.app_name)
148
# fake the adding of the credentials, in reality this will bring
149
# a GUI that the user will interact with.
150
self._credentials[FakedSSOService.app_name] = FAKED_CREDENTIALS
151
self.CredentialsFound(FakedSSOService.app_name, FAKED_CREDENTIALS)
155
@dbus.service.method(dbus_interface=ubuntu_sso.DBUS_CREDENTIALS_IFACE,
156
in_signature='sa{ss}', out_signature='')
157
def login(self, app_name, args):
158
"""Get credentials if found else prompt GUI to login."""
159
self.register(app_name, args)
162
class BaseTestCase(TestCase):
163
"""Base test case."""
170
super(BaseTestCase, self).setUp()
171
FakedSSOService.app_name = self.app_name
172
FakedSSOService.error_dict = self.error_dict
174
self.memento = MementoHandler()
175
self.memento.setLevel(logging.DEBUG)
176
logger.addHandler(self.memento)
178
self.bus = dbus.SessionBus()
179
self.sso_server = self.register_server(ubuntu_sso.DBUS_BUS_NAME,
180
ubuntu_sso.DBUS_CREDENTIALS_PATH,
181
FakedSSOService) # faked SSO server
183
def register_server(self, bus_name, object_path, service_class):
184
"""Register a service on the session bus."""
185
name = self.bus.request_name(bus_name, dbus.bus.NAME_FLAG_DO_NOT_QUEUE)
186
self.assertNotEqual(name, dbus.bus.REQUEST_NAME_REPLY_EXISTS,
187
'Service %s should not be running.' % bus_name)
188
mock = service_class(object_path=object_path, conn=self.bus)
189
self.addCleanup(mock.remove_from_connection)
193
def get_proxy(self, bus_name, object_path, dbus_interface):
194
obj = self.bus.get_object(bus_name=bus_name, object_path=object_path,
195
follow_name_owner_changes=True)
196
proxy = dbus.Interface(object=obj, dbus_interface=dbus_interface)
199
def get_sso_proxy(self):
200
return self.get_proxy(bus_name=ubuntu_sso.DBUS_BUS_NAME,
201
object_path=ubuntu_sso.DBUS_CREDENTIALS_PATH,
202
dbus_interface=ubuntu_sso.DBUS_CREDENTIALS_IFACE)
205
class CredentialsManagementTestCase(BaseTestCase):
206
"""Test case for the DBus object that manages Ubuntu One credentials."""
208
signals = ('CredentialsFound', 'CredentialsNotFound', 'CredentialsCleared',
209
'CredentialsStored', 'CredentialsError', 'AuthorizationDenied')
212
super(CredentialsManagementTestCase, self).setUp()
213
self.creds_server = self.register_server(DBUS_BUS_NAME,
214
DBUS_CREDENTIALS_PATH,
215
CredentialsManagement) # real service
217
self.deferred = Deferred()
218
self.proxy = self.get_creds_proxy()
220
def get_creds_proxy(self):
221
return self.get_proxy(bus_name=DBUS_BUS_NAME,
222
object_path=DBUS_CREDENTIALS_PATH,
223
dbus_interface=DBUS_CREDENTIALS_IFACE)
225
def connect_signals(self, callback=None):
226
"""Connect every signal accordingly to fire self.deferred.
228
If 'callback' is not None, it will be used as a tuple (sig_name,
229
function) and 'sig_name' will be connected to 'function', which should
230
fire self.deferred properly.
233
success_sig_name, success_function = None, None
234
if callback is not None:
235
success_sig_name, success_function = callback
238
"""Decorator to fire self.deferred."""
239
def inner(*args, **kwargs):
240
"""Fire self.deferred."""
241
msg = 'Received an unexpected signal (%r).' % sig_name
242
self.deferred.errback(TypeError(msg))
245
for sig_name in self.signals:
246
if sig_name == success_sig_name:
247
sig = self.proxy.connect_to_signal(sig_name, success_function)
249
sig = self.proxy.connect_to_signal(sig_name, fail(sig_name))
250
self.addCleanup(sig.remove)
253
def add_credentials(self, creds=FAKED_CREDENTIALS):
254
"""Add some fake credentials for 'self.app_name'."""
256
sso_proxy = self.get_sso_proxy()
257
sso_proxy.store_credentials(self.app_name, creds,
258
reply_handler=lambda: d.callback(None),
259
error_handler=d.errback)
264
"""Perform the test itself."""
267
def test_get_sso_proxy(self):
268
"""The SSO dbus proxy is properly retrieved."""
269
sso_proxy = CredentialsManagement().sso_proxy
270
self.assertEqual(sso_proxy.bus_name, ubuntu_sso.DBUS_BUS_NAME)
271
self.assertEqual(sso_proxy.object_path,
272
ubuntu_sso.DBUS_CREDENTIALS_PATH)
273
self.assertEqual(sso_proxy.dbus_interface,
274
ubuntu_sso.DBUS_CREDENTIALS_IFACE)
277
def test_shutdown(self):
278
"""On shutdown, SSO backend signals are disconnected."""
281
self.proxy.shutdown(reply_handler=lambda: d.callback(None),
282
error_handler=d.errback)
285
# TODO: assert over params passed to remove_signal_receiver
288
class ArgsTestCase(CredentialsManagementTestCase):
289
"""Test case to check that proper arguments are passed to SSO backend."""
292
def test_find_credentials(self):
293
"""The find_credentials method calls ubuntu_sso's method."""
295
self.proxy.find_credentials(reply_handler=lambda: d.callback(None),
296
error_handler=d.errback)
299
self.assertEqual(self.sso_server._app_name, APP_NAME)
300
self.assertEqual(self.sso_server._args, {})
303
def test_clear_credentials(self):
304
"""The clear_credentials method calls ubuntu_sso's method."""
306
self.proxy.clear_credentials(reply_handler=lambda: d.callback(None),
307
error_handler=d.errback)
310
self.assertEqual(self.sso_server._app_name, APP_NAME)
311
self.assertEqual(self.sso_server._args, {})
314
def test_store_credentials(self):
315
"""The store_credentials method calls ubuntu_sso's method."""
317
self.proxy.store_credentials(FAKED_CREDENTIALS,
318
reply_handler=lambda: d.callback(None),
319
error_handler=d.errback)
322
self.assertEqual(self.sso_server._app_name, APP_NAME)
323
self.assertEqual(self.sso_server._args, FAKED_CREDENTIALS)
326
def test_register(self):
327
"""The register method calls ubuntu_sso's method."""
329
self.proxy.register(reply_handler=lambda: d.callback(None),
330
error_handler=d.errback)
333
self.assertEqual(self.sso_server._app_name, APP_NAME)
334
params = {HELP_TEXT_KEY: DESCRIPTION, TC_URL_KEY: TC_URL,
335
PING_URL_KEY: PING_URL, WINDOW_ID_KEY: '0'}
336
self.assertEqual(self.sso_server._args, params)
339
def test_login(self):
340
"""The login method calls ubuntu_sso's method."""
342
self.proxy.login(reply_handler=lambda: d.callback(None),
343
error_handler=d.errback)
346
self.assertEqual(self.sso_server._app_name, APP_NAME)
347
params = {HELP_TEXT_KEY: DESCRIPTION, TC_URL_KEY: TC_URL,
348
PING_URL_KEY: PING_URL, WINDOW_ID_KEY: '0'}
349
self.assertEqual(self.sso_server._args, params)
352
class SameAppNoErrorTestCase(CredentialsManagementTestCase):
353
"""Test case when the app_name matches APP_NAME and there was no error."""
356
def test_find_credentials(self):
357
"""The find_credentials method calls ubuntu_sso's method."""
359
yield self.add_credentials()
361
def verify(credentials):
363
self.assertEqual(credentials, FAKED_CREDENTIALS)
364
self.deferred.callback(None)
366
self.connect_signals(callback=('CredentialsFound', verify))
368
self.proxy.find_credentials(reply_handler=lambda: d.callback(None),
369
error_handler=d.errback)
374
def test_find_credentials_without_credentials(self):
375
"""The find_credentials method calls ubuntu_sso's method."""
380
self.deferred.callback(None)
382
self.connect_signals(callback=('CredentialsNotFound', verify))
384
self.proxy.find_credentials(reply_handler=lambda: d.callback(None),
385
error_handler=d.errback)
390
def test_clear_credentials(self):
391
"""The clear_credentials method calls ubuntu_sso's method."""
393
yield self.add_credentials()
397
self.deferred.callback(None)
399
self.connect_signals(callback=('CredentialsCleared', verify))
401
self.proxy.clear_credentials(reply_handler=lambda: d.callback(None),
402
error_handler=d.errback)
407
def test_clear_credentials_without_credentials(self):
408
"""The clear_credentials method calls ubuntu_sso's method."""
413
self.deferred.callback(None)
415
self.connect_signals(callback=('CredentialsCleared', verify))
417
self.proxy.clear_credentials(reply_handler=lambda: d.callback(None),
418
error_handler=d.errback)
423
def test_store_credentials(self):
424
"""The store_credentials method calls ubuntu_sso's method."""
429
self.deferred.callback(None)
431
self.connect_signals(callback=('CredentialsStored', verify))
433
self.proxy.store_credentials(FAKED_CREDENTIALS,
434
reply_handler=lambda: d.callback(None),
435
error_handler=d.errback)
440
def test_register_with_credentials(self):
441
"""The register method calls ubuntu_sso's method."""
443
yield self.add_credentials()
445
def verify(credentials):
447
self.assertEqual(credentials, FAKED_CREDENTIALS)
448
self.deferred.callback(None)
450
self.connect_signals(callback=('CredentialsFound', verify))
452
self.proxy.register(reply_handler=lambda: d.callback(None),
453
error_handler=d.errback)
458
def test_register_without_credentials(self):
459
"""The register method calls ubuntu_sso's method."""
462
def verify(credentials):
464
self.assertEqual(credentials, FAKED_CREDENTIALS)
465
self.deferred.callback(None)
467
self.connect_signals(callback=('CredentialsFound', verify))
469
self.proxy.register(reply_handler=lambda: d.callback(None),
470
error_handler=d.errback)
475
def test_register_authorization_denied(self):
476
"""The register method calls ubuntu_sso's method."""
478
yield self.add_credentials(creds={})
482
self.deferred.callback(None)
484
self.connect_signals(callback=('AuthorizationDenied', verify))
486
self.proxy.register(reply_handler=lambda: d.callback(None),
487
error_handler=d.errback)
492
def test_login_with_credentials(self):
493
"""The login method calls ubuntu_sso's method."""
495
yield self.add_credentials()
497
def verify(credentials):
499
self.assertEqual(credentials, FAKED_CREDENTIALS)
500
self.deferred.callback(None)
502
self.connect_signals(callback=('CredentialsFound', verify))
504
self.proxy.login(reply_handler=lambda: d.callback(None),
505
error_handler=d.errback)
510
def test_login_without_credentials(self):
511
"""The login method calls ubuntu_sso's method."""
514
def verify(credentials):
516
self.assertEqual(credentials, FAKED_CREDENTIALS)
517
self.deferred.callback(None)
519
self.connect_signals(callback=('CredentialsFound', verify))
521
self.proxy.login(reply_handler=lambda: d.callback(None),
522
error_handler=d.errback)
527
def test_login_authorization_denied(self):
528
"""The login method calls ubuntu_sso's method."""
530
yield self.add_credentials(creds={})
534
self.deferred.callback(None)
536
self.connect_signals(callback=('AuthorizationDenied', verify))
538
self.proxy.login(reply_handler=lambda: d.callback(None),
539
error_handler=d.errback)
544
class SameAppWithErrorTestCase(SameAppNoErrorTestCase):
545
"""Test case when the app_name matches APP_NAME and there was an error."""
547
error_dict = {'error_type': 'Test'}
549
def connect_signals(self, callback=None):
550
"""CredentialsError is the success signals in this suite."""
552
def verify(error_dict):
554
self.assertEqual(error_dict, self.error_dict)
555
self.deferred.callback(error_dict)
557
args = ('CredentialsError', verify)
558
super(SameAppWithErrorTestCase, self).connect_signals(callback=args)
561
class OtherAppNoErrorTestCase(SameAppNoErrorTestCase):
562
"""Test case when the app_name is not APP_NAME and there was no error."""
564
app_name = APP_NAME * 2
566
def connect_signals(self, callback=None):
567
"""No signal should be received in this suite."""
568
# ignore all success connection, self.deferred should always errback
569
super(OtherAppNoErrorTestCase, self).connect_signals(callback=None)
573
"""Perform the test itself."""
574
if not self.deferred.called:
575
msg = 'does not match %r, exiting.' % APP_NAME
576
if self.memento.check_info(self.app_name, msg):
577
self.deferred.callback(None)
579
self.deferred.errback('Log should be present.')
584
class OtherAppWithErrorTestCase(OtherAppNoErrorTestCase):
585
"""Test case when the app_name is not APP_NAME and there was an error."""
587
app_name = APP_NAME * 2
588
error_dict = {'error_type': 'Test'}
591
class RefCountingTestCase(BaseTestCase):
592
"""Tests for the CredentialsManagement ref counting."""
595
super(RefCountingTestCase, self).setUp()
597
self.client = CredentialsManagement()
599
def _set_called(self, *args, **kwargs):
600
"""Keep track of method calls."""
601
self._called = (args, kwargs)
603
def test_ref_counting(self):
604
"""Ref counting is in place."""
605
self.assertEqual(self.client.ref_count, 0)
607
def test_find_credentials(self):
608
"""Keep proper track of on going requests."""
609
self.client.find_credentials()
611
self.assertEqual(self.client.ref_count, 1)
613
def test_clear_credentials(self):
614
"""Keep proper track of on going requests."""
615
self.client.clear_credentials()
617
self.assertEqual(self.client.ref_count, 1)
619
def test_store_credentials(self):
620
"""Keep proper track of on going requests."""
621
self.client.store_credentials(FAKED_CREDENTIALS)
623
self.assertEqual(self.client.ref_count, 1)
625
def test_register(self):
626
"""Keep proper track of on going requests."""
627
self.client.register()
629
self.assertEqual(self.client.ref_count, 1)
631
def test_login(self):
632
"""Keep proper track of on going requests."""
635
self.assertEqual(self.client.ref_count, 1)
637
def test_several_requests(self):
638
"""Requests can be nested."""
640
self.client.clear_credentials()
641
self.client.find_credentials()
642
self.client.register()
643
self.client.store_credentials(FAKED_CREDENTIALS)
645
self.assertEqual(self.client.ref_count, 5)
647
def test_credentials_found(self):
648
"""Ref counter is decreased when a signal is sent."""
649
self.client.ref_count = 3
650
self.client.CredentialsFound(FAKED_CREDENTIALS)
652
self.assertEqual(self.client.ref_count, 2)
654
def test_credentials_not_found(self):
655
"""Ref counter is decreased when a signal is sent."""
656
self.client.ref_count = 3
657
self.client.CredentialsNotFound()
659
self.assertEqual(self.client.ref_count, 2)
661
def test_credentials_cleared(self):
662
"""Ref counter is decreased when a signal is sent."""
663
self.client.ref_count = 3
664
self.client.CredentialsCleared()
666
self.assertEqual(self.client.ref_count, 2)
668
def test_credentials_stored(self):
669
"""Ref counter is decreased when a signal is sent."""
670
self.client.ref_count = 3
671
self.client.CredentialsStored()
673
self.assertEqual(self.client.ref_count, 2)
675
def test_credentials_error(self):
676
"""Ref counter is decreased when a signal is sent."""
677
self.client.ref_count = 3
678
self.client.CredentialsError({'error_type': 'test'})
680
self.assertEqual(self.client.ref_count, 2)
682
def test_authorization_denied(self):
683
"""Ref counter is decreased when a signal is sent."""
684
self.client.ref_count = 3
685
self.client.AuthorizationDenied()
687
self.assertEqual(self.client.ref_count, 2)
689
def test_credentials_found_when_ref_count_is_not_positive(self):
690
"""Ref counter is decreased when a signal is sent."""
691
self.client._ref_count = -3
692
self.client.CredentialsFound(FAKED_CREDENTIALS)
694
self.assertEqual(self.client.ref_count, 0)
695
msg = 'Attempting to decrease ref_count to a negative value (-4).'
696
self.assertTrue(self.memento.check_warning(msg))
698
def test_credentials_not_found_when_ref_count_is_not_positive(self):
699
"""Ref counter is decreased when a signal is sent."""
700
self.client._ref_count = -3
701
self.client.CredentialsNotFound()
703
self.assertEqual(self.client.ref_count, 0)
704
msg = 'Attempting to decrease ref_count to a negative value (-4).'
705
self.assertTrue(self.memento.check_warning(msg))
707
def test_credentials_cleared_when_ref_count_is_not_positive(self):
708
"""Ref counter is decreased when a signal is sent."""
709
self.client._ref_count = -3
710
self.client.CredentialsCleared()
712
self.assertEqual(self.client.ref_count, 0)
713
msg = 'Attempting to decrease ref_count to a negative value (-4).'
714
self.assertTrue(self.memento.check_warning(msg))
716
def test_credentials_stored_when_ref_count_is_not_positive(self):
717
"""Ref counter is decreased when a signal is sent."""
718
self.client._ref_count = -3
719
self.client.CredentialsStored()
721
self.assertEqual(self.client.ref_count, 0)
722
msg = 'Attempting to decrease ref_count to a negative value (-4).'
723
self.assertTrue(self.memento.check_warning(msg))
725
def test_credentials_error_when_ref_count_is_not_positive(self):
726
"""Ref counter is decreased when a signal is sent."""
727
self.client._ref_count = -3
728
self.client.CredentialsError({'error_type': 'test'})
730
self.assertEqual(self.client.ref_count, 0)
731
msg = 'Attempting to decrease ref_count to a negative value (-4).'
732
self.assertTrue(self.memento.check_warning(msg))
734
def test_autorization_denied_when_ref_count_is_not_positive(self):
735
"""Ref counter is decreased when a signal is sent."""
736
self.client._ref_count = -3
737
self.client.AuthorizationDenied()
739
self.assertEqual(self.client.ref_count, 0)
740
msg = 'Attempting to decrease ref_count to a negative value (-4).'
741
self.assertTrue(self.memento.check_warning(msg))
743
def test_on_zero_ref_count_shutdown(self):
744
"""When ref count reaches 0, queue shutdown op."""
745
self.client.timeout_func = self._set_called
746
self.client.find_credentials()
747
self.client.CredentialsFound(FAKED_CREDENTIALS)
749
self.assertEqual(self._called,
750
((TIMEOUT_INTERVAL, self.client.shutdown_func), {}))
752
def test_on_non_zero_ref_count_do_not_shutdown(self):
753
"""If ref count is not 0, do not queue shutdown op."""
754
self.client.timeout_func = self._set_called
755
self.client.find_credentials()
757
self.assertEqual(self._called, False)