~didrocks/ubuntuone-client/dont-suffer-zg-crash

« back to all changes in this revision

Viewing changes to tests/oauthdesktop/test_auth.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodrigo Moya
  • Date: 2010-06-23 23:08:15 UTC
  • mto: This revision was merged to the branch mainline in revision 34.
  • Revision ID: james.westby@ubuntu.com-20100623230815-4m3ugh10u9x9xzw5
Tags: upstream-1.3.2
ImportĀ upstreamĀ versionĀ 1.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# test_auth - Tests for ubuntuone.oauthdesktop.auth module
2
 
#
3
 
# Author: Stuart Langridge <stuart.langridge@canonical.com>
4
 
#
5
 
# Copyright 2009 Canonical Ltd.
6
 
#
7
 
# This program is free software: you can redistribute it and/or modify it
8
 
# under the terms of the GNU General Public License version 3, as published
9
 
# by the Free Software Foundation.
10
 
#
11
 
# This program is distributed in the hope that it will be useful, but
12
 
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
 
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
 
# PURPOSE.  See the GNU General Public License for more details.
15
 
#
16
 
# You should have received a copy of the GNU General Public License along
17
 
# with this program.  If not, see <http://www.gnu.org/licenses/>.
18
 
"""Tests for the OAuth client code for StorageFS."""
19
 
 
20
 
import gnomekeyring
21
 
from contrib.mocker import ANY, IN, MockerTestCase
22
 
import testresources
23
 
 
24
 
from oauth import oauth
25
 
from twisted.trial.unittest import TestCase as TwistedTestCase
26
 
from ubuntuone.oauthdesktop.auth import (AuthorisationClient,
27
 
                                                NoAccessToken)
28
 
 
29
 
 
30
 
# Adding TwistedTestCase as parent to be able to skip
31
 
# test_ensure_access_token_no_token
32
 
class AuthorisationClientTests(MockerTestCase, TwistedTestCase):
33
 
    """Test the GNOME keyring integration portions of the auth code."""
34
 
 
35
 
    def setUp(self):
36
 
        """Sets up a mock keyring."""
37
 
        MockerTestCase.setUp(self)
38
 
        self.keyring = self.mocker.mock()
39
 
        self.client = None
40
 
        self.item = self.mocker.mock(gnomekeyring.Found)
41
 
 
42
 
        self.item_id = 999
43
 
 
44
 
        ex = self.expect(self.item.item_id)
45
 
        ex.result(self.item_id)
46
 
        ex.count(0, None)
47
 
 
48
 
        ex = self.expect(self.item.secret)
49
 
        ex.result('oauth_token=access_key&oauth_token_secret=access_secret')
50
 
        ex.count(0, None)
51
 
 
52
 
    def expect_token_query(self):
53
 
        """Expects the keyring to be queried for a token."""
54
 
        return self.expect(
55
 
            self.keyring.find_items_sync(
56
 
                gnomekeyring.ITEM_GENERIC_SECRET,
57
 
                {'ubuntuone-realm': 'realm',
58
 
                 'oauth-consumer-key': 'consumer_key'})
59
 
            )
60
 
 
61
 
    def expect_token_store(self):
62
 
        """Expects the token to be stored in the keyring."""
63
 
        return self.expect(self.keyring.item_create_sync(
64
 
                None, gnomekeyring.ITEM_GENERIC_SECRET,
65
 
                'UbuntuOne token for realm',
66
 
                {'ubuntuone-realm': 'realm',
67
 
                 'oauth-consumer-key': 'consumer_key'},
68
 
                # Either order for the token and secret is valid
69
 
                IN(['oauth_token=access_key&oauth_token_secret=access_secret',
70
 
                    'oauth_token_secret=access_secret&oauth_token=access_key']),
71
 
                True))
72
 
 
73
 
    def expect_token_store_denied(self):
74
 
        """Expects the token to be denied for storing in keyring."""
75
 
        self.expect_token_store().throw(gnomekeyring.DeniedError)
76
 
 
77
 
    def mock_has_token(self):
78
 
        """Mocks a cached token in the keyring."""
79
 
        self.expect_token_query().result([self.item])
80
 
 
81
 
    def mock_no_token(self, exception):
82
 
        """Mocks no token in the keyring."""
83
 
        self.expect_token_query().throw(exception)
84
 
 
85
 
    def replay(self, callback_parent=None, callback_denied=None,
86
 
                     do_login=True):
87
 
        """Starts the replay phase and sets up a client object to be tested,
88
 
        wired up to the mock keyring.
89
 
 
90
 
        """
91
 
        self.mocker.replay()
92
 
        self.client = AuthorisationClient(
93
 
            'realm', 'request_token_url', 'user_authorisation_url',
94
 
            'access_token_url', 'consumer_key', 'consumer_secret',
95
 
            callback_parent, callback_denied, do_login, keyring=self.keyring)
96
 
 
97
 
    def test_get_access_token(self):
98
 
        """The get_access_token method returns the access token"""
99
 
        self.mock_has_token()
100
 
        self.replay()
101
 
        token = self.client.get_access_token()
102
 
        self.assertTrue(isinstance(token, oauth.OAuthToken))
103
 
        self.assertEqual(token.key, 'access_key')
104
 
        self.assertEqual(token.secret, 'access_secret')
105
 
 
106
 
    def test_get_access_token_no_match(self):
107
 
        """The get_access_token method fails if there are no matching items"""
108
 
        self.mock_no_token(gnomekeyring.NoMatchError)
109
 
        self.replay()
110
 
        self.assertRaises(NoAccessToken, self.client.get_access_token)
111
 
 
112
 
    def test_get_access_token_denied(self):
113
 
        """The get_access_token method fails if access is denied"""
114
 
        self.mock_no_token(gnomekeyring.DeniedError)
115
 
        self.replay()
116
 
        self.assertRaises(NoAccessToken, self.client.get_access_token)
117
 
 
118
 
    def test_have_access_token(self):
119
 
        """The `have_access_token` method returns True if a the
120
 
        keyring contains a token."""
121
 
        self.mock_has_token()
122
 
        self.replay()
123
 
        self.assertEqual(self.client.have_access_token(), True)
124
 
 
125
 
    def test_have_access_token_fail(self):
126
 
        """The `have_access_token` method returns False if the keyring
127
 
        does not contain a token."""
128
 
        self.mock_no_token(gnomekeyring.NoMatchError)
129
 
        self.replay()
130
 
        self.assertEqual(self.client.have_access_token(), False)
131
 
 
132
 
    def test_store_token(self):
133
 
        """The store_token method correctly stores an item in the keyring,
134
 
           and correctly sets an ACL on it."""
135
 
        self.expect_token_store().result(self.item_id)
136
 
        saka = self.mocker.replace(
137
 
          "ubuntuone.oauthdesktop.key_acls.set_all_key_acls")
138
 
        saka(item_id=self.item_id)
139
 
        self.mocker.result(None)
140
 
 
141
 
        sleep = self.mocker.replace("time.sleep")
142
 
        sleep(4)
143
 
        self.mocker.result(None)
144
 
 
145
 
        self.replay()
146
 
        self.client.store_token(oauth.OAuthToken('access_key', 'access_secret'))
147
 
 
148
 
    def test_store_token_denied(self):
149
 
        """The store_token method correctly stores an item in the keyring,
150
 
           and correctly sets an ACL on it."""
151
 
        self.expect_token_store_denied()
152
 
 
153
 
        self.replay()
154
 
        self.client.store_token(oauth.OAuthToken('access_key', 'access_secret'))
155
 
 
156
 
    def test_clear_existing_token(self):
157
 
        """Makes sure that clear token clears an existing token."""
158
 
        self.mock_has_token()
159
 
        self.expect(self.keyring.item_delete_sync(None, self.item_id))
160
 
        self.replay()
161
 
        self.client.clear_token()
162
 
 
163
 
    def test_clear_no_existing_token(self):
164
 
        """Makes sure that clear with no existing token still works."""
165
 
        self.mock_no_token(gnomekeyring.NoMatchError)
166
 
        self.replay()
167
 
        self.client.clear_token()
168
 
 
169
 
    def test_ensure_access_token(self):
170
 
        """If the user already has a token, no new token is requested."""
171
 
        self.mock_has_token()
172
 
        callback_function = self.mocker.mock()
173
 
        callback_function(ANY)
174
 
        self.replay(callback_parent=callback_function)
175
 
        self.client.ensure_access_token()
176
 
 
177
 
    def test_ensure_access_token_no_token(self):
178
 
        """If the user has no token, a new one is requested and stored, via
179
 
           an OAuth callback to the internal webserver."""
180
 
 
181
 
        self.mock_no_token(gnomekeyring.NoMatchError)
182
 
 
183
 
        request_token = self.mocker.mock()
184
 
        self.expect(request_token.key).result('access_key').count(0, None)
185
 
        ex = self.expect(request_token.secret)
186
 
        ex.result('access_secret').count(0, None)
187
 
        make_token_request = self.mocker.mock()
188
 
        self.expect(make_token_request(ANY)).result(request_token)
189
 
 
190
 
        open_in_browser = self.mocker.mock()
191
 
        open_in_browser(ANY)
192
 
 
193
 
        ri = self.mocker.replace("random.randint")
194
 
        ri(1000000, 10000000)
195
 
        self.mocker.result(12345678)
196
 
        get_temporary_httpd = self.mocker.mock()
197
 
        get_temporary_httpd(12345678, ANY, True)
198
 
        self.mocker.result("http://callbackurl")
199
 
 
200
 
        self.replay(callback_parent=lambda a: None)
201
 
 
202
 
        self.client.make_token_request = make_token_request
203
 
        self.client.open_in_browser = open_in_browser
204
 
        self.client.get_temporary_httpd = get_temporary_httpd
205
 
        # skip the "are we online, via networkmanager" bit
206
 
        self.client.acquire_access_token_if_online = \
207
 
            self.client.acquire_access_token
208
 
        self.client.ensure_access_token()
209
 
    test_ensure_access_token_no_token.skip = \
210
 
        "Fails with exceptions.AssertionError: [Mocker] Unmet expectations, "\
211
 
        "see bug #488933"
212
 
 
213
 
 
214
 
class AcquireAccessTokenTests(testresources.ResourcedTestCase):
215
 
    """OAuth token acquisition tests."""
216
 
 
217
 
    def setUp(self):
218
 
        super(AcquireAccessTokenTests, self).setUp()
219
 
 
220
 
    def tearDown(self):
221
 
        super(AcquireAccessTokenTests, self).tearDown()
222
 
 
223
 
    def test_acquire_access_token(self):
224
 
        """Test that acquire_access_token() can acquire the access token."""
225
 
 
226
 
        def make_token_request(oauth_request):
227
 
            """Make an OAuth token request via the test browser."""
228
 
            return oauth.OAuthToken.from_string("oauth_token=access_token&" +
229
 
                                                "oauth_token_secret=" +
230
 
                                                "access_secret")
231
 
 
232
 
        def open_in_browser(url):
233
 
            """Just return as we aren't subscribed to the page."""
234
 
            return
235
 
 
236
 
        def got_token(token):
237
 
            """Called with the token once auth is completed"""
238
 
            self.assertTrue(isinstance(token, oauth.OAuthToken))
239
 
 
240
 
        def get_temporary_httpd(nonce, retrieve_function, store):
241
 
            """Mock the temporary httpd and return a callback URL"""
242
 
            # returns callback URL; this is an invalid URL, of course,
243
 
            # (port is too high) but we check later that the mechanize
244
 
            # browser tries to navigate there
245
 
            return "http://localhost:99999/?nonce=99999"
246
 
 
247
 
        def store_token(token):
248
 
            """Don't use the keyring; do nothing"""
249
 
            pass
250
 
 
251
 
        client = AuthorisationClient(
252
 
            'http://ubuntuone/',
253
 
            'http://ubuntuone/oauth/request/',
254
 
            'http://ubuntuone/oauth/authorize/',
255
 
            'http://ubuntuone/oauth/access/',
256
 
            'consumer_key', 'consumer_secret',
257
 
            callback_parent=got_token)
258
 
        client.make_token_request = make_token_request
259
 
        client.open_in_browser = open_in_browser
260
 
        client.get_temporary_httpd = get_temporary_httpd
261
 
        client.store_token = store_token
262
 
 
263
 
        client.acquire_access_token('token description')