1
# test_auth - Tests for canonical.ubuntuone.oauthdesktop.auth module
3
# Author: Stuart Langridge <stuart.langridge@canonical.com>
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
"""Tests for the OAuth client code for StorageFS."""
21
from contrib.mocker import ANY, IN, MockerTestCase
24
from canonical.ubuntuone.storage.protocol import oauth
25
from canonical.ubuntuone.oauthdesktop.auth import (AuthorisationClient,
29
class AuthorisationClientTests(MockerTestCase):
30
"""Test the GNOME keyring integration portions of the auth code."""
33
"""Sets up a mock keyring."""
34
MockerTestCase.setUp(self)
35
self.keyring = self.mocker.mock()
37
self.item = self.mocker.mock(gnomekeyring.Found)
41
ex = self.expect(self.item.item_id)
42
ex.result(self.item_id)
45
ex = self.expect(self.item.secret)
46
ex.result('oauth_token=access_key&oauth_token_secret=access_secret')
49
def expect_token_query(self):
50
"""Expects the keyring to be queried for a token."""
52
self.keyring.find_items_sync(
53
gnomekeyring.ITEM_GENERIC_SECRET,
54
{'ubuntuone-realm': 'realm',
55
'oauth-consumer-key': 'consumer_key'})
58
def expect_token_store(self):
59
"""Expects the token to be stored in the keyring."""
60
self.expect(self.keyring.item_create_sync(
61
None, gnomekeyring.ITEM_GENERIC_SECRET,
62
'UbuntuOne token for realm',
63
{'ubuntuone-realm': 'realm', 'oauth-consumer-key': 'consumer_key'},
64
# Either order for the token and secret is valid in the item secret
65
IN(['oauth_token=access_key&oauth_token_secret=access_secret',
66
'oauth_token_secret=access_secret&oauth_token=access_key']),
67
True)).result(self.item_id)
69
def mock_has_token(self):
70
"""Mocks a cached token in the keyring."""
71
self.expect_token_query().result([self.item])
73
def mock_no_token(self, exception):
74
"""Mocks no token in the keyring."""
75
self.expect_token_query().throw(exception)
77
def replay(self, callback_parent=None, callback_denied=None,
79
"""Starts the replay phase and sets up a client object to be tested,
80
wired up to the mock keyring.
84
self.client = AuthorisationClient(
85
'realm', 'request_token_url', 'user_authorisation_url',
86
'access_token_url', 'consumer_key', 'consumer_secret',
87
callback_parent, callback_denied, do_login, keyring=self.keyring)
89
def test_get_access_token(self):
90
"""The get_access_token method returns the access token"""
93
token = self.client.get_access_token()
94
self.assertTrue(isinstance(token, oauth.OAuthToken))
95
self.assertEqual(token.key, 'access_key')
96
self.assertEqual(token.secret, 'access_secret')
98
def test_get_access_token_no_match(self):
99
"""The get_access_token method fails if there are no matching items"""
100
self.mock_no_token(gnomekeyring.NoMatchError)
102
self.assertRaises(NoAccessToken, self.client.get_access_token)
104
def test_get_access_token_denied(self):
105
"""The get_access_token method fails if access is denied"""
106
self.mock_no_token(gnomekeyring.DeniedError)
108
self.assertRaises(NoAccessToken, self.client.get_access_token)
110
def test_have_access_token(self):
111
"""The `have_access_token` method returns True if a the
112
keyring contains a token."""
113
self.mock_has_token()
115
self.assertEqual(self.client.have_access_token(), True)
117
def test_have_access_token_fail(self):
118
"""The `have_access_token` method returns False if the keyring
119
does not contain a token."""
120
self.mock_no_token(gnomekeyring.NoMatchError)
122
self.assertEqual(self.client.have_access_token(), False)
124
def test_store_token(self):
125
"""The store_token method correctly stores an item in the keyring,
126
and correctly sets an ACL on it."""
127
self.expect_token_store()
128
saka = self.mocker.replace(
129
"canonical.ubuntuone.oauthdesktop.key_acls.set_all_key_acls")
130
saka(item_id=self.item_id)
131
self.mocker.result(None)
133
sleep = self.mocker.replace("time.sleep")
135
self.mocker.result(None)
138
self.client.store_token(oauth.OAuthToken('access_key', 'access_secret'))
140
def test_clear_existing_token(self):
141
"""Makes sure that clear token clears an existing token."""
142
self.mock_has_token()
143
self.expect(self.keyring.item_delete_sync(None, self.item_id))
145
self.client.clear_token()
147
def test_clear_no_existing_token(self):
148
"""Makes sure that clear with no existing token still works."""
149
self.mock_no_token(gnomekeyring.NoMatchError)
151
self.client.clear_token()
153
def test_ensure_access_token(self):
154
"""If the user already has a token, no new token is requested."""
155
self.mock_has_token()
156
callback_function = self.mocker.mock()
157
callback_function(ANY)
158
self.replay(callback_parent=callback_function)
159
self.client.ensure_access_token()
161
def test_ensure_access_token_no_token(self):
162
"""If the user has no token, a new one is requested and stored, via
163
an OAuth callback to the internal webserver."""
165
self.mock_no_token(gnomekeyring.NoMatchError)
167
request_token = self.mocker.mock()
168
self.expect(request_token.key).result('access_key').count(0, None)
169
ex = self.expect(request_token.secret)
170
ex.result('access_secret').count(0, None)
171
make_token_request = self.mocker.mock()
172
self.expect(make_token_request(ANY)).result(request_token)
174
open_in_browser = self.mocker.mock()
177
ri = self.mocker.replace("random.randint")
178
ri(1000000, 10000000)
179
self.mocker.result(12345678)
180
get_temporary_httpd = self.mocker.mock()
181
get_temporary_httpd(12345678, ANY, True)
182
self.mocker.result("http://callbackurl")
184
self.replay(callback_parent=lambda a: None)
186
self.client.make_token_request = make_token_request
187
self.client.open_in_browser = open_in_browser
188
self.client.get_temporary_httpd = get_temporary_httpd
189
# skip the "are we online, via networkmanager" bit
190
self.client.acquire_access_token_if_online = \
191
self.client.acquire_access_token
192
self.client.ensure_access_token()
195
class AcquireAccessTokenTests(testresources.ResourcedTestCase):
196
"""OAuth token acquisition tests."""
199
super(AcquireAccessTokenTests, self).setUp()
202
super(AcquireAccessTokenTests, self).tearDown()
204
def test_acquire_access_token(self):
205
"""Test that acquire_access_token() can acquire the access token."""
207
def make_token_request(oauth_request):
208
"""Make an OAuth token request via the test browser."""
209
return oauth.OAuthToken.from_string("oauth_token=access_token&" +
210
"oauth_token_secret=" +
213
def open_in_browser(url):
214
"""Just return as we aren't subscribed to the page."""
217
def got_token(token):
218
"""Called with the token once auth is completed"""
219
self.assertTrue(isinstance(token, oauth.OAuthToken))
221
def get_temporary_httpd(nonce, retrieve_function, store):
222
"""Mock the temporary httpd and return a callback URL"""
223
# returns callback URL; this is an invalid URL, of course,
224
# (port is too high) but we check later that the mechanize
225
# browser tries to navigate there
226
return "http://127.0.0.1:99999/?nonce=99999"
228
def store_token(token):
229
"""Don't use the keyring; do nothing"""
232
client = AuthorisationClient(
234
'http://ubuntuone/oauth/request/',
235
'http://ubuntuone/oauth/authorize/',
236
'http://ubuntuone/oauth/access/',
237
'consumer_key', 'consumer_secret',
238
callback_parent=got_token)
239
client.make_token_request = make_token_request
240
client.open_in_browser = open_in_browser
241
client.get_temporary_httpd = get_temporary_httpd
242
client.store_token = store_token
244
client.acquire_access_token('token description')