~mandel/ubuntu-sso-client/pinned-certs

« back to all changes in this revision

Viewing changes to ubuntuone/oauthdesktop/tests/test_auth.py

  • Committer: natalia.bidart at canonical
  • Date: 2010-06-08 11:54:48 UTC
  • Revision ID: natalia.bidart@canonical.com-20100608115448-aqeiwm4ki1nljaw1
Built correct structure.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# test_auth - Tests for ubuntuone.oauthdesktop.auth module
 
2
#
 
3
# Author: Stuart Langridge <stuart.langridge@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
"""Tests for the OAuth client code for StorageFS."""
 
19
 
 
20
import gnomekeyring
 
21
from mocker import ANY, IN, MockerTestCase
 
22
import testresources
 
23
 
 
24
from oauth import oauth
 
25
from twisted.trial.unittest import TestCase as TwistedTestCase
 
26
from ubuntuone.oauthdesktop.auth import (AuthorisationClient,
 
27
                                                NoAccessToken)
 
28
 
 
29
 
 
30
# Adding TwistedTestCase as parent to be able to skip
 
31
# test_ensure_access_token_no_token
 
32
class AuthorisationClientTests(MockerTestCase, TwistedTestCase):
 
33
    """Test the GNOME keyring integration portions of the auth code."""
 
34
 
 
35
    def setUp(self):
 
36
        """Sets up a mock keyring."""
 
37
        MockerTestCase.setUp(self)
 
38
        self.keyring = self.mocker.mock()
 
39
        self.client = None
 
40
        self.item = self.mocker.mock(gnomekeyring.Found)
 
41
 
 
42
        self.item_id = 999
 
43
 
 
44
        ex = self.expect(self.item.item_id)
 
45
        ex.result(self.item_id)
 
46
        ex.count(0, None)
 
47
 
 
48
        ex = self.expect(self.item.secret)
 
49
        ex.result('oauth_token=access_key&oauth_token_secret=access_secret')
 
50
        ex.count(0, None)
 
51
 
 
52
    def expect_token_query(self):
 
53
        """Expects the keyring to be queried for a token."""
 
54
        return self.expect(
 
55
            self.keyring.find_items_sync(
 
56
                gnomekeyring.ITEM_GENERIC_SECRET,
 
57
                {'ubuntuone-realm': 'realm',
 
58
                 'oauth-consumer-key': 'consumer_key'})
 
59
            )
 
60
 
 
61
    def expect_token_store(self):
 
62
        """Expects the token to be stored in the keyring."""
 
63
        return self.expect(self.keyring.item_create_sync(
 
64
                None, gnomekeyring.ITEM_GENERIC_SECRET,
 
65
                'UbuntuOne token for realm',
 
66
                {'ubuntuone-realm': 'realm',
 
67
                 'oauth-consumer-key': 'consumer_key'},
 
68
                # Either order for the token and secret is valid
 
69
                IN(['oauth_token=access_key&oauth_token_secret=access_secret',
 
70
                    'oauth_token_secret=access_secret&oauth_token=access_key']),
 
71
                True))
 
72
 
 
73
    def expect_token_store_denied(self):
 
74
        """Expects the token to be denied for storing in keyring."""
 
75
        self.expect_token_store().throw(gnomekeyring.DeniedError)
 
76
 
 
77
    def mock_has_token(self):
 
78
        """Mocks a cached token in the keyring."""
 
79
        self.expect_token_query().result([self.item])
 
80
 
 
81
    def mock_no_token(self, exception):
 
82
        """Mocks no token in the keyring."""
 
83
        self.expect_token_query().throw(exception)
 
84
 
 
85
    def replay(self, callback_parent=None, callback_denied=None,
 
86
                     do_login=True):
 
87
        """Starts the replay phase and sets up a client object to be tested,
 
88
        wired up to the mock keyring.
 
89
 
 
90
        """
 
91
        self.mocker.replay()
 
92
        self.client = AuthorisationClient(
 
93
            'realm', 'request_token_url', 'user_authorisation_url',
 
94
            'access_token_url', 'consumer_key', 'consumer_secret',
 
95
            callback_parent, callback_denied, do_login, keyring=self.keyring)
 
96
 
 
97
    def test_get_access_token(self):
 
98
        """The get_access_token method returns the access token"""
 
99
        self.mock_has_token()
 
100
        self.replay()
 
101
        token = self.client.get_access_token()
 
102
        self.assertTrue(isinstance(token, oauth.OAuthToken))
 
103
        self.assertEqual(token.key, 'access_key')
 
104
        self.assertEqual(token.secret, 'access_secret')
 
105
 
 
106
    def test_get_access_token_no_match(self):
 
107
        """The get_access_token method fails if there are no matching items"""
 
108
        self.mock_no_token(gnomekeyring.NoMatchError)
 
109
        self.replay()
 
110
        self.assertRaises(NoAccessToken, self.client.get_access_token)
 
111
 
 
112
    def test_get_access_token_denied(self):
 
113
        """The get_access_token method fails if access is denied"""
 
114
        self.mock_no_token(gnomekeyring.DeniedError)
 
115
        self.replay()
 
116
        self.assertRaises(NoAccessToken, self.client.get_access_token)
 
117
 
 
118
    def test_have_access_token(self):
 
119
        """The `have_access_token` method returns True if a the
 
120
        keyring contains a token."""
 
121
        self.mock_has_token()
 
122
        self.replay()
 
123
        self.assertEqual(self.client.have_access_token(), True)
 
124
 
 
125
    def test_have_access_token_fail(self):
 
126
        """The `have_access_token` method returns False if the keyring
 
127
        does not contain a token."""
 
128
        self.mock_no_token(gnomekeyring.NoMatchError)
 
129
        self.replay()
 
130
        self.assertEqual(self.client.have_access_token(), False)
 
131
 
 
132
    def test_store_token(self):
 
133
        """The store_token method correctly stores an item in the keyring,
 
134
           and correctly sets an ACL on it."""
 
135
        self.expect_token_store().result(self.item_id)
 
136
        saka = self.mocker.replace(
 
137
          "ubuntuone.oauthdesktop.key_acls.set_all_key_acls")
 
138
        saka(item_id=self.item_id)
 
139
        self.mocker.result(None)
 
140
 
 
141
        sleep = self.mocker.replace("time.sleep")
 
142
        sleep(4)
 
143
        self.mocker.result(None)
 
144
 
 
145
        self.replay()
 
146
        self.client.store_token(oauth.OAuthToken('access_key', 'access_secret'))
 
147
 
 
148
    def test_store_token_denied(self):
 
149
        """The store_token method correctly stores an item in the keyring,
 
150
           and correctly sets an ACL on it."""
 
151
        self.expect_token_store_denied()
 
152
 
 
153
        self.replay()
 
154
        self.client.store_token(oauth.OAuthToken('access_key', 'access_secret'))
 
155
 
 
156
    def test_clear_existing_token(self):
 
157
        """Makes sure that clear token clears an existing token."""
 
158
        self.mock_has_token()
 
159
        self.expect(self.keyring.item_delete_sync(None, self.item_id))
 
160
        self.replay()
 
161
        self.client.clear_token()
 
162
 
 
163
    def test_clear_no_existing_token(self):
 
164
        """Makes sure that clear with no existing token still works."""
 
165
        self.mock_no_token(gnomekeyring.NoMatchError)
 
166
        self.replay()
 
167
        self.client.clear_token()
 
168
 
 
169
    def test_ensure_access_token(self):
 
170
        """If the user already has a token, no new token is requested."""
 
171
        self.mock_has_token()
 
172
        callback_function = self.mocker.mock()
 
173
        callback_function(ANY)
 
174
        self.replay(callback_parent=callback_function)
 
175
        self.client.ensure_access_token()
 
176
 
 
177
    def test_ensure_access_token_no_token(self):
 
178
        """If the user has no token, a new one is requested and stored, via
 
179
           an OAuth callback to the internal webserver."""
 
180
 
 
181
        self.mock_no_token(gnomekeyring.NoMatchError)
 
182
 
 
183
        request_token = self.mocker.mock()
 
184
        self.expect(request_token.key).result('access_key').count(0, None)
 
185
        ex = self.expect(request_token.secret)
 
186
        ex.result('access_secret').count(0, None)
 
187
        make_token_request = self.mocker.mock()
 
188
        self.expect(make_token_request(ANY)).result(request_token)
 
189
 
 
190
        open_in_browser = self.mocker.mock()
 
191
        open_in_browser(ANY)
 
192
 
 
193
        ri = self.mocker.replace("random.randint")
 
194
        ri(1000000, 10000000)
 
195
        self.mocker.result(12345678)
 
196
        get_temporary_httpd = self.mocker.mock()
 
197
        get_temporary_httpd(12345678, ANY, True)
 
198
        self.mocker.result("http://callbackurl")
 
199
 
 
200
        self.replay(callback_parent=lambda a: None)
 
201
 
 
202
        self.client.make_token_request = make_token_request
 
203
        self.client.open_in_browser = open_in_browser
 
204
        self.client.get_temporary_httpd = get_temporary_httpd
 
205
        # skip the "are we online, via networkmanager" bit
 
206
        self.client.acquire_access_token_if_online = \
 
207
            self.client.acquire_access_token
 
208
        self.client.ensure_access_token()
 
209
    test_ensure_access_token_no_token.skip = \
 
210
        "Fails with exceptions.AssertionError: [Mocker] Unmet expectations, "\
 
211
        "see bug #488933"
 
212
 
 
213
 
 
214
class AcquireAccessTokenTests(testresources.ResourcedTestCase):
 
215
    """OAuth token acquisition tests."""
 
216
 
 
217
    def setUp(self):
 
218
        super(AcquireAccessTokenTests, self).setUp()
 
219
 
 
220
    def tearDown(self):
 
221
        super(AcquireAccessTokenTests, self).tearDown()
 
222
 
 
223
    def test_acquire_access_token(self):
 
224
        """Test that acquire_access_token() can acquire the access token."""
 
225
 
 
226
        def make_token_request(oauth_request):
 
227
            """Make an OAuth token request via the test browser."""
 
228
            return oauth.OAuthToken.from_string("oauth_token=access_token&" +
 
229
                                                "oauth_token_secret=" +
 
230
                                                "access_secret")
 
231
 
 
232
        def open_in_browser(url):
 
233
            """Just return as we aren't subscribed to the page."""
 
234
            return
 
235
 
 
236
        def got_token(token):
 
237
            """Called with the token once auth is completed"""
 
238
            self.assertTrue(isinstance(token, oauth.OAuthToken))
 
239
 
 
240
        def get_temporary_httpd(nonce, retrieve_function, store):
 
241
            """Mock the temporary httpd and return a callback URL"""
 
242
            # returns callback URL; this is an invalid URL, of course,
 
243
            # (port is too high) but we check later that the mechanize
 
244
            # browser tries to navigate there
 
245
            return "http://localhost:99999/?nonce=99999"
 
246
 
 
247
        def store_token(token):
 
248
            """Don't use the keyring; do nothing"""
 
249
            pass
 
250
 
 
251
        client = AuthorisationClient(
 
252
            'http://ubuntuone/',
 
253
            'http://ubuntuone/oauth/request/',
 
254
            'http://ubuntuone/oauth/authorize/',
 
255
            'http://ubuntuone/oauth/access/',
 
256
            'consumer_key', 'consumer_secret',
 
257
            callback_parent=got_token)
 
258
        client.make_token_request = make_token_request
 
259
        client.open_in_browser = open_in_browser
 
260
        client.get_temporary_httpd = get_temporary_httpd
 
261
        client.store_token = store_token
 
262
 
 
263
        client.acquire_access_token('token description')