~facundo/magicicada-client/changes-for-xenial

« back to all changes in this revision

Viewing changes to ubuntuone/utils/webclient/tests/test_webclient.py

  • Committer: Magicicada Bot
  • Author(s): Natalia
  • Date: 2016-05-30 15:43:30 UTC
  • mfrom: (1418.1.21 no-sso-client)
  • Revision ID: magicicada_bot-20160530154330-b4his4s3wlucu7zv
[r=facundo] - Decouple client code from ubuntu-sso-client code. Copied and made an initial cleanup on the networkstate, utils and keyring modules.
- Completely removed the dependencies on oauthlibs.
- Moved tests/ folder to inside ubuntuone/ proper folders.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
#
 
3
# Copyright 2011-2013 Canonical Ltd.
 
4
# Copyright 2015-2016 Chicharreros (https://launchpad.net/~chicharreros)
 
5
#
 
6
# This program is free software: you can redistribute it and/or modify it
 
7
# under the terms of the GNU General Public License version 3, as published
 
8
# by the Free Software Foundation.
 
9
#
 
10
# This program is distributed in the hope that it will be useful, but
 
11
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
12
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
13
# PURPOSE.  See the GNU General Public License for more details.
 
14
#
 
15
# You should have received a copy of the GNU General Public License along
 
16
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
17
#
 
18
# In addition, as a special exception, the copyright holders give
 
19
# permission to link the code of portions of this program with the
 
20
# OpenSSL library under certain conditions as described in each
 
21
# individual source file, and distribute linked combinations
 
22
# including the two.
 
23
# You must obey the GNU General Public License in all respects
 
24
# for all of the code used other than OpenSSL.  If you modify
 
25
# file(s) with this exception, you may extend this exception to your
 
26
# version of the file(s), but you are not obligated to do so.  If you
 
27
# do not wish to do so, delete this exception statement from your
 
28
# version.  If you delete this exception statement from all source
 
29
# files in the program, then also delete it here.
 
30
 
 
31
"""Integration tests for the proxy-enabled webclient."""
 
32
 
 
33
import logging
 
34
import os
 
35
import shutil
 
36
import sys
 
37
 
 
38
try:
 
39
    from urllib.parse import (urlencode, unquote, urlparse,
 
40
                              urljoin, parse_qsl)
 
41
except ImportError:
 
42
    from urllib import urlencode, unquote
 
43
    from urlparse import urlparse, urljoin, parse_qsl
 
44
 
 
45
from OpenSSL import crypto
 
46
from socket import gethostname
 
47
from twisted.cred import checkers, portal
 
48
from twisted.internet import defer
 
49
from twisted.web import guard, http, resource
 
50
from ubuntuone.devtools.handlers import MementoHandler
 
51
from ubuntuone.devtools.testcases import TestCase, skipIfOS
 
52
from ubuntuone.devtools.testcases.squid import SquidTestCase
 
53
from ubuntuone.devtools.testing.txwebserver import (
 
54
    HTTPWebServer,
 
55
    HTTPSWebServer,
 
56
)
 
57
 
 
58
 
 
59
from ubuntuone import keyring
 
60
from ubuntuone.utils import webclient
 
61
from ubuntuone.utils.webclient import gsettings, txweb
 
62
from ubuntuone.utils.webclient.common import (
 
63
    BaseWebClient,
 
64
    HeaderDict,
 
65
    UnauthorizedError,
 
66
    WebClientError,
 
67
)
 
68
 
 
69
# A unique sentinel object; it is not referenced elsewhere in the visible code.
ANY_VALUE = object()

# Canned payload returned by the mock resources, and the header pair
# (SAMPLE_KEY -> SAMPLE_VALUE) that VerifyHeadersResource checks and echoes.
SAMPLE_KEY = "result"
SAMPLE_VALUE = "sample result"
SAMPLE_RESOURCE = '{"%s": "%s"}' % (SAMPLE_KEY, SAMPLE_VALUE)
SAMPLE_HEADERS = {SAMPLE_KEY: SAMPLE_VALUE}

# User registered in the credentials database behind the GUARDED resource.
SAMPLE_USERNAME = "peddro"
SAMPLE_PASSWORD = "cantropus"

# Credentials passed as ``auth_credentials`` to webclient requests.
SAMPLE_CREDENTIALS = {"username": "username", "password": "password"}

# Form fields sent to (and expected by) the VERIFYPOSTPARAMS resource.
SAMPLE_POST_PARAMS = {"param1": "value1", "param2": "value2"}

# Binary payload that contains a NUL byte in the middle.
SAMPLE_JPEG_HEADER = '\xff\xd8\xff\xe0\x00\x10JFIF'

# Child path names registered on the root resource of the mock server.
SIMPLERESOURCE = "simpleresource"
BYTEZERORESOURCE = "bytezeroresource"
POSTABLERESOURCE = "postableresource"
THROWERROR = "throwerror"
UNAUTHORIZED = "unauthorized"
HEADONLY = "headonly"
VERIFYHEADERS = "verifyheaders"
VERIFYPOSTPARAMS = "verifypostparams"
GUARDED = "guarded"
AUTHRESOURCE = "authresource"

# Dotted name of the webclient backend module selected at import time; the
# test cases below use its suffix to decide which tests to skip.
WEBCLIENT_MODULE_NAME = webclient.webclient_module().__name__
 
92
 
 
93
 
 
94
def sample_get_credentials():
    """Return an already-fired deferred carrying SAMPLE_CREDENTIALS."""
    fired = defer.succeed(SAMPLE_CREDENTIALS)
    return fired
 
97
 
 
98
 
 
99
class SimpleResource(resource.Resource):
    """Mock resource that answers GET with the canned sample payload."""

    def render_GET(self, request):
        """Return SAMPLE_RESOURCE regardless of the request contents."""
        body = SAMPLE_RESOURCE
        return body
 
105
 
 
106
 
 
107
class ByteZeroResource(resource.Resource):
    """Mock resource whose GET body contains an embedded NUL byte."""

    def render_GET(self, request):
        """Return SAMPLE_JPEG_HEADER, the binary sample payload."""
        body = SAMPLE_JPEG_HEADER
        return body
 
113
 
 
114
 
 
115
class PostableResource(resource.Resource):
    """Mock resource that only defines a POST handler."""

    def render_POST(self, request):
        """Return SAMPLE_RESOURCE regardless of the posted contents."""
        body = SAMPLE_RESOURCE
        return body
 
121
 
 
122
 
 
123
class HeadOnlyResource(resource.Resource):
    """Mock resource that only defines a HEAD handler."""

    def render_HEAD(self, request):
        """Return the literal body "OK" for a HEAD request."""
        body = "OK"
        return body
 
129
 
 
130
 
 
131
class VerifyHeadersResource(resource.Resource):
    """Mock resource that checks for the sample header and echoes it back."""

    def render_GET(self, request):
        """Return the sample payload when the sample header was received.

        The request must carry exactly one SAMPLE_KEY header whose value is
        SAMPLE_VALUE; the same header is then set on the response. Any other
        case answers BAD_REQUEST with an error message.
        """
        received = request.requestHeaders.getRawHeaders(SAMPLE_KEY)
        if received == [SAMPLE_VALUE]:
            request.setHeader(SAMPLE_KEY, SAMPLE_VALUE)
            return SAMPLE_RESOURCE
        request.setResponseCode(http.BAD_REQUEST)
        return "ERROR: Expected header not present."
 
142
 
 
143
 
 
144
class VerifyPostParameters(resource.Resource):
    """Mock resource that validates the parameters of a POST request."""

    def fetch_post_args_only(self, request):
        """Re-run request parsing and return the resulting ``request.args``.

        ``request.process`` is replaced with a no-op before calling
        ``requestReceived`` again.
        NOTE(review): presumably this re-parses the body without dispatching
        the request a second time, leaving only the POST arguments in
        ``request.args`` -- confirm against the Twisted version in use.
        """
        request.process = lambda: None
        request.requestReceived(
            request.method, request.path, request.clientproto)
        return request.args

    def render_POST(self, request):
        """Return the sample payload if the POST args match the sample ones.

        Each expected value is wrapped in a one-element list before the
        comparison. On mismatch the response code is BAD_REQUEST and the body
        describes both sides.
        """
        received = self.fetch_post_args_only(request)
        wanted = {}
        for name, value in SAMPLE_POST_PARAMS.items():
            wanted[name] = [value]
        if received == wanted:
            return SAMPLE_RESOURCE
        request.setResponseCode(http.BAD_REQUEST)
        return "ERROR: Expected arguments not present, %r != %r" % (
            received, wanted)
 
164
 
 
165
 
 
166
class SimpleRealm(object):
    """Realm that hands out a fresh SimpleResource to every avatar."""

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return an ``(IResource, SimpleResource(), logout)`` triple.

        The logout callable does nothing. Raises NotImplementedError when
        ``resource.IResource`` is not among the requested interfaces.
        """
        if resource.IResource not in interfaces:
            raise NotImplementedError()
        return (resource.IResource, SimpleResource(), lambda: None)
 
174
 
 
175
 
 
176
class AuthCheckerResource(resource.Resource):
    """A resource that verifies the request was auth signed."""

    def render_GET(self, request):
        """Return the sample payload if the request carries an Auth header.

        The first ``Authorization`` header value must start with ``"Auth "``.
        Otherwise, including when the header is absent altogether, the
        response code is BAD_REQUEST and an error message is returned.
        """
        # getRawHeaders() returns None when the header is missing; fall back
        # to an empty value so an unsigned request gets the intended 400
        # answer instead of blowing up with a TypeError.
        values = request.requestHeaders.getRawHeaders("Authorization") or [""]
        if values[0].startswith("Auth "):
            return SAMPLE_RESOURCE
        request.setResponseCode(http.BAD_REQUEST)
        return "ERROR: Expected Auth header not present."
 
186
 
 
187
 
 
188
def get_root_resource():
    """Build the root resource of the mock server with all its children.

    The children cover the plain, binary, POST-only, HEAD-only, error,
    header/parameter-verifying, auth-checking and HTTP-basic-auth guarded
    cases used by the tests in this module.
    """
    unauthorized_page = resource.ErrorPage(
        http.UNAUTHORIZED, "Unauthorized", "Unauthorized")

    # The guarded child sits behind HTTP basic auth backed by an in-memory
    # database that knows only the sample user.
    users = checkers.InMemoryUsernamePasswordDatabaseDontUse()
    users.addUser(SAMPLE_USERNAME, SAMPLE_PASSWORD)
    guarded_page = guard.HTTPAuthSessionWrapper(
        portal.Portal(SimpleRealm(), [users]),
        [guard.BasicCredentialFactory("example.org")])

    children = (
        (SIMPLERESOURCE, SimpleResource()),
        (BYTEZERORESOURCE, ByteZeroResource()),
        (POSTABLERESOURCE, PostableResource()),
        (THROWERROR, resource.NoResource()),
        (UNAUTHORIZED, unauthorized_page),
        (HEADONLY, HeadOnlyResource()),
        (VERIFYHEADERS, VerifyHeadersResource()),
        (VERIFYPOSTPARAMS, VerifyPostParameters()),
        (AUTHRESOURCE, AuthCheckerResource()),
        (GUARDED, guarded_page),
    )

    root = resource.Resource()
    for path, child in children:
        root.putChild(path, child)
    return root
 
213
 
 
214
 
 
215
class HTTPMockWebServer(HTTPWebServer):
    """Plain-HTTP mock web server serving the test resources."""

    def __init__(self):
        """Initialize the server with the tree from get_root_resource()."""
        super(HTTPMockWebServer, self).__init__(get_root_resource())
 
222
 
 
223
 
 
224
class HTTPSMockWebServer(HTTPSWebServer):
    """HTTPS mock web server serving the test resources."""

    def __init__(self, ssl_settings):
        """Initialize the server with the test tree and *ssl_settings*.

        ``ssl_settings`` is forwarded unchanged to the base class.
        """
        super(HTTPSMockWebServer, self).__init__(
            get_root_resource(), ssl_settings)
 
231
 
 
232
 
 
233
class ModuleSelectionTestCase(TestCase):
    """Check which backend module webclient_module() selects."""

    def assert_module_name(self, module, expected_name):
        """Assert that *module*'s file name, minus extension, is as expected."""
        filename = os.path.basename(module.__file__)
        stem, _extension = os.path.splitext(filename)
        self.assertEqual(stem, expected_name)

    def test_webclient_module_libsoup(self):
        """The selected backend module is the libsoup one."""
        selected = webclient.webclient_module()
        self.assert_module_name(selected, "libsoup")
 
246
 
 
247
 
 
248
class WebClientTestCase(TestCase):
    """Exercise a webclient against a local mock web server.

    Subclasses may override ``webclient_factory`` to run the same tests
    against a different backend.
    """

    timeout = 1
    webclient_factory = webclient.webclient_factory

    @defer.inlineCallbacks
    def setUp(self):
        """Create the webclient under test and start the mock server."""
        yield super(WebClientTestCase, self).setUp()
        self.wc = self.webclient_factory()
        self.addCleanup(self.wc.shutdown)
        self.ws = HTTPMockWebServer()
        self.ws.start()
        self.addCleanup(self.ws.stop)
        self.base_iri = self.ws.get_iri()

    @defer.inlineCallbacks
    def test_request_takes_an_iri(self):
        """Passing a non-unicode iri fails."""
        # Encode explicitly: bytes(text) only produces a byte string on
        # Python 2; on Python 3 it raises before request() is even called.
        bytes_iri = (self.base_iri + SIMPLERESOURCE).encode("utf-8")
        d = self.wc.request(bytes_iri)
        yield self.assertFailure(d, TypeError)

    @defer.inlineCallbacks
    def test_get_iri(self):
        """Passing in a unicode iri works fine."""
        result = yield self.wc.request(self.base_iri + SIMPLERESOURCE)
        self.assertEqual(SAMPLE_RESOURCE, result.content)

    @defer.inlineCallbacks
    def test_get_iri_error(self):
        """The errback is called when there's some error."""
        yield self.assertFailure(self.wc.request(self.base_iri + THROWERROR),
                                 WebClientError)

    @defer.inlineCallbacks
    def test_zero_byte_in_content(self):
        """Test a reply with a nul byte in the middle of it."""
        result = yield self.wc.request(self.base_iri + BYTEZERORESOURCE)
        self.assertEqual(SAMPLE_JPEG_HEADER, result.content)

    @defer.inlineCallbacks
    def test_post(self):
        """Test a post request."""
        result = yield self.wc.request(self.base_iri + POSTABLERESOURCE,
                                       method="POST")
        self.assertEqual(SAMPLE_RESOURCE, result.content)

    @defer.inlineCallbacks
    def test_post_with_args(self):
        """Test a post request with arguments.

        The same urlencoded parameters are sent both in the query string and
        as the form-encoded body.
        """
        args = urlencode(SAMPLE_POST_PARAMS)
        iri = self.base_iri + VERIFYPOSTPARAMS + "?" + args
        headers = {
            "content-type": "application/x-www-form-urlencoded",
        }
        result = yield self.wc.request(iri, method="POST",
                                       extra_headers=headers,
                                       post_content=args)
        self.assertEqual(SAMPLE_RESOURCE, result.content)

    @defer.inlineCallbacks
    def test_unauthorized(self):
        """Detect when a request failed with the UNAUTHORIZED http code."""
        yield self.assertFailure(self.wc.request(self.base_iri + UNAUTHORIZED),
                                 UnauthorizedError)

    @defer.inlineCallbacks
    def test_method_head(self):
        """The HTTP method is used."""
        result = yield self.wc.request(self.base_iri + HEADONLY, method="HEAD")
        self.assertEqual("", result.content)

    @defer.inlineCallbacks
    def test_send_extra_headers(self):
        """The extra_headers are sent to the server."""
        result = yield self.wc.request(self.base_iri + VERIFYHEADERS,
                                       extra_headers=SAMPLE_HEADERS)
        self.assertIn(SAMPLE_KEY, result.headers)
        self.assertEqual(result.headers[SAMPLE_KEY], [SAMPLE_VALUE])

    @defer.inlineCallbacks
    def test_send_basic_auth(self):
        """The basic authentication headers are sent."""
        other_wc = self.webclient_factory(username=SAMPLE_USERNAME,
                                          password=SAMPLE_PASSWORD)
        self.addCleanup(other_wc.shutdown)
        result = yield other_wc.request(self.base_iri + GUARDED)
        self.assertEqual(SAMPLE_RESOURCE, result.content)

    @defer.inlineCallbacks
    def test_send_basic_auth_wrong_credentials(self):
        """Wrong credentials returns a webclient error."""
        other_wc = self.webclient_factory(username=SAMPLE_USERNAME,
                                          password="wrong password!")
        self.addCleanup(other_wc.shutdown)
        yield self.assertFailure(other_wc.request(self.base_iri + GUARDED),
                                 UnauthorizedError)

    @defer.inlineCallbacks
    def test_request_is_auth_signed(self):
        """The request is auth signed."""
        tsc = self.wc.get_timestamp_checker()
        # Stub the timestamp lookup with an already-fired deferred.
        self.patch(tsc, "get_faithful_time", lambda: defer.succeed('1'))
        result = yield self.wc.request(self.base_iri + AUTHRESOURCE,
                                       auth_credentials=SAMPLE_CREDENTIALS)
        self.assertEqual(SAMPLE_RESOURCE, result.content)

    @defer.inlineCallbacks
    def test_auth_signing_uses_timestamp(self):
        """Auth signing uses the timestamp."""
        called = []

        def fake_get_faithful_time():
            """Record the call and return a canned timestamp."""
            called.append(True)
            return defer.succeed('1')

        tsc = self.wc.get_timestamp_checker()
        self.patch(tsc, "get_faithful_time", fake_get_faithful_time)
        yield self.wc.request(self.base_iri + AUTHRESOURCE,
                              auth_credentials=SAMPLE_CREDENTIALS)
        self.assertTrue(called, "The timestamp must be retrieved.")

    @defer.inlineCallbacks
    def test_returned_content_are_bytes(self):
        """The returned content are bytes."""
        tsc = self.wc.get_timestamp_checker()
        self.patch(tsc, "get_faithful_time", lambda: defer.succeed('1'))
        result = yield self.wc.request(self.base_iri + AUTHRESOURCE,
                                       auth_credentials=SAMPLE_CREDENTIALS)
        self.assertTrue(isinstance(result.content, bytes),
                        "The type of %r must be bytes" % result.content)

    @defer.inlineCallbacks
    def test_webclienterror_not_string(self):
        """Every argument of the returned exception is a string."""
        deferred = self.wc.request(self.base_iri + THROWERROR)
        failure = yield self.assertFailure(deferred, WebClientError)
        # (bytes, unicode-text) is what ``basestring`` covers on Python 2,
        # and it also resolves on Python 3 where ``basestring`` is gone.
        string_types = (bytes, type(u""))
        for error in failure.args:
            self.assertTrue(isinstance(error, string_types))
 
387
 
 
388
 
 
389
class FakeSavingReactor(object):
    """Reactor stand-in that records connection attempts instead of connecting."""

    def __init__(self):
        """Start with an empty list of recorded connections."""
        self.connections = []

    def _fake_connect(self, host, port, factory, args):
        """Record the attempt and prime *factory* with a canned response."""
        self.connections.append((host, port, args))
        factory.response_headers = {}
        factory.deferred = defer.succeed("response content")

    def connectTCP(self, host, port, factory, *args):
        """Pretend to open a TCP connection."""
        self._fake_connect(host, port, factory, args)

    def connectSSL(self, host, port, factory, *args):
        """Pretend to open an SSL connection."""
        self._fake_connect(host, port, factory, args)
 
407
 
 
408
 
 
409
class TxWebClientTestCase(WebClientTestCase):
    """Run the generic webclient tests against the txweb backend."""

    webclient_factory = txweb.WebClient

    @defer.inlineCallbacks
    def setUp(self):
        """Hook the client factory so each built protocol is tracked.

        ``HTTPClientFactory.buildProtocol`` is replaced with a wrapper that
        remembers the protocol it builds and registers a cleanup that closes
        its transport and waits for the connection to be lost. The original
        ``buildProtocol`` is restored on cleanup.
        """
        # delay import, otherwise a default reactor gets installed
        from twisted.web import client
        self.factory = client.HTTPClientFactory
        # Keep the original so it can be called by the wrapper and restored.
        original_build_protocol = self.factory.buildProtocol
        self.serverProtocol = None

        def remember_protocol_instance(my_self, addr):
            """Build the protocol, remember it and schedule its cleanup."""
            built = original_build_protocol(my_self, addr)
            self.serverProtocol = built
            lost_deferred = defer.Deferred()
            original_connection_lost = built.connectionLost

            def defer_connection_lost(protocol, *a):
                """Fire the tracking deferred once, then delegate."""
                if not lost_deferred.called:
                    lost_deferred.callback(None)
                original_connection_lost(protocol, *a)

            self.patch(built, 'connectionLost', defer_connection_lost)

            def cleanup():
                """Close the transport, if any, and wait for the loss."""
                transport = self.serverProtocol.transport
                if transport is not None:
                    transport.loseConnection()
                return lost_deferred

            self.addCleanup(cleanup)
            return built

        self.factory.buildProtocol = remember_protocol_instance
        self.addCleanup(self.set_build_protocol, original_build_protocol)
        txweb.WebClient.client_factory = self.factory

        yield super(TxWebClientTestCase, self).setUp()

    def set_build_protocol(self, method):
        """Install *method* as the factory's ``buildProtocol``."""
        self.factory.buildProtocol = method
 
459
 
 
460
 
 
461
class TxWebClientReactorReplaceableTestCase(TestCase):
    """In the txweb client the reactor is replaceable."""

    timeout = 3
    FAKE_HOST = u"fake"
    # The escaped ``%%s`` survives the first interpolation so the scheme can
    # be filled in later by each test.
    FAKE_IRI_TEMPLATE = u"%%s://%s/fake_page" % FAKE_HOST

    @defer.inlineCallbacks
    def _test_replaceable_reactor(self, iri):
        """Request *iri* through a fake reactor and check the host it used.

        The fake reactor records every connection attempt; the first one
        must target FAKE_HOST.
        """
        fake_reactor = FakeSavingReactor()
        wc = txweb.WebClient(fake_reactor)
        response = yield wc.request(iri)
        # Use the test framework's assertion: a bare ``assert`` statement is
        # stripped when Python runs with -O.
        self.assertTrue(response)
        host, _port, _args = fake_reactor.connections[0]
        self.assertEqual(host, self.FAKE_HOST)

    def test_replaceable_reactor_http(self):
        """Test the replaceable reactor with an http iri."""
        return self._test_replaceable_reactor(self.FAKE_IRI_TEMPLATE % "http")

    def test_replaceable_reactor_https(self):
        """Test the replaceable reactor with an https iri."""
        return self._test_replaceable_reactor(self.FAKE_IRI_TEMPLATE % "https")
 
485
 
 
486
 
 
487
class TimestampCheckerTestCase(TestCase):
    """Check how webclients create and share their timestamp checker."""

    @defer.inlineCallbacks
    def setUp(self):
        """Create a webclient and reset its class-level timestamp checker."""
        yield super(TimestampCheckerTestCase, self).setUp()
        self.wc = webclient.webclient_factory()
        # Patch the class attribute to None for the duration of each test.
        client_class = self.wc.__class__
        self.patch(client_class, "timestamp_checker", None)

    def test_timestamp_checker_has_the_same_class_as_the_creator(self):
        """The checker's webclient_class is the class of the webclient."""
        checker = self.wc.get_timestamp_checker()
        self.assertEqual(checker.webclient_class, self.wc.__class__)

    def test_timestamp_checker_is_the_same_for_all_webclients(self):
        """Two webclients return the very same checker object."""
        first_checker = self.wc.get_timestamp_checker()
        second_checker = webclient.webclient_factory().get_timestamp_checker()
        self.assertIs(first_checker, second_checker)
 
508
 
 
509
 
 
510
class BasicProxyTestCase(SquidTestCase):
    """Test that the proxy works at all."""

    timeout = 3

    @defer.inlineCallbacks
    def setUp(self):
        """Start the mock web server and create the webclient under test."""
        yield super(BasicProxyTestCase, self).setUp()
        self.ws = HTTPMockWebServer()
        self.ws.start()
        self.addCleanup(self.ws.stop)
        self.base_iri = self.ws.get_iri()
        self.wc = webclient.webclient_factory()
        self.addCleanup(self.wc.shutdown)

    def assert_header_contains(self, headers, expected):
        """At least one of the given header values must contain *expected*."""
        self.assertTrue(any(expected in value for value in headers))

    @defer.inlineCallbacks
    def test_anonymous_proxy_is_used(self):
        """The anonymous proxy is used by the webclient."""
        settings = self.get_nonauth_proxy_settings()
        self.wc.force_use_proxy(settings)
        result = yield self.wc.request(self.base_iri + SIMPLERESOURCE)
        self.assert_header_contains(result.headers["Via"], "squid")

    @skipIfOS('linux2',
              'LP: #1111880 - ncsa_auth crashing for auth proxy tests.')
    @defer.inlineCallbacks
    def test_authenticated_proxy_is_used(self):
        """The authenticated proxy is used by the webclient."""
        settings = self.get_auth_proxy_settings()
        self.wc.force_use_proxy(settings)
        result = yield self.wc.request(self.base_iri + SIMPLERESOURCE)
        self.assert_header_contains(result.headers["Via"], "squid")

    # Mark both proxy tests as skipped when the txweb backend is selected.
    # Only tests defined in this class may be referenced here: a name that
    # does not exist would raise NameError while the class body executes.
    if WEBCLIENT_MODULE_NAME.endswith(".txweb"):
        reason = "txweb does not support proxies."
        test_anonymous_proxy_is_used.skip = reason
        test_authenticated_proxy_is_used.skip = reason
 
552
 
 
553
 
 
554
class HeaderDictTestCase(TestCase):
    """Check that HeaderDict treats its keys case-insensitively."""

    def test_constructor_handles_keys(self):
        """Keys given to the constructor are found in another casing."""
        headers = HeaderDict({"ClAvE": "value"})
        self.assertIn("clave", headers)

    def test_can_set_get_items(self):
        """Setting the same key in another casing overwrites the value."""
        headers = HeaderDict()
        headers["key"] = "value"
        headers["KEY"] = "value2"
        self.assertEqual(headers["key"], "value2")

    def test_can_test_presence(self):
        """Membership tests and deletion ignore the key casing."""
        headers = HeaderDict()
        self.assertNotIn("cLaVe", headers)
        headers["CLAVE"] = "value1"
        self.assertIn("cLaVe", headers)
        del headers["cLAVe"]
        self.assertNotIn("cLaVe", headers)
 
577
 
 
578
 
 
579
class FakeKeyring(object):
    """Keyring stand-in that answers with preset credentials or an error."""

    def __init__(self, creds):
        """Store *creds*: the credentials to return, or an exception to fail with."""
        self.creds = creds

    def __call__(self):
        """Return this same instance, so the object can replace a class."""
        return self

    def get_credentials(self, domain):
        """Return a fired deferred with the stored credentials.

        If the stored value is an Exception instance the deferred fails with
        it instead. *domain* is ignored.
        """
        stored = self.creds
        if not isinstance(stored, Exception):
            return defer.succeed(stored)
        return defer.fail(stored)
 
595
 
 
596
 
 
597
class BaseSSLTestCase(SquidTestCase):
    """Base test that allows to use ssl connections."""

    @defer.inlineCallbacks
    def setUp(self):
        """Generate a self-signed certificate and start the HTTPS mock server."""
        yield super(BaseSSLTestCase, self).setUp()
        self.cert_dir = os.path.join(self.tmpdir, 'cert')
        self.cert_details = dict(organization='Canonical',
                                 common_name=gethostname(),
                                 locality_name='London',
                                 unit='Ubuntu One',
                                 country_name='UK',
                                 state_name='London',)
        self.ssl_settings = self._generate_self_signed_certificate(
            self.cert_dir,
            self.cert_details)
        self.addCleanup(self._clean_ssl_certificate_files)

        self.ws = HTTPSMockWebServer(self.ssl_settings)
        self.ws.start()
        self.addCleanup(self.ws.stop)
        self.base_iri = self.ws.get_iri()

    def _clean_ssl_certificate_files(self):
        """Remove the certificate directory, if it exists."""
        if os.path.exists(self.cert_dir):
            shutil.rmtree(self.cert_dir)

    def _generate_self_signed_certificate(self, cert_dir, cert_details):
        """Generate a self-signed certificate and key under *cert_dir*.

        ``cert_details`` must provide the keys ``country_name``,
        ``state_name``, ``locality_name``, ``organization``, ``unit`` and
        ``common_name``. Any previous ``cert.crt`` / ``cert.key`` files in the
        directory are replaced.

        Returns a dict with the paths of the written files under the keys
        ``key`` and ``cert``.
        """
        if not os.path.exists(cert_dir):
            os.makedirs(cert_dir)
        cert_path = os.path.join(cert_dir, 'cert.crt')
        key_path = os.path.join(cert_dir, 'cert.key')

        for stale_path in (cert_path, key_path):
            if os.path.exists(stale_path):
                os.unlink(stale_path)

        # create a key pair; 2048 bits because current OpenSSL default
        # security levels refuse to load smaller RSA keys
        key = crypto.PKey()
        key.generate_key(crypto.TYPE_RSA, 2048)

        # create a self-signed cert
        cert = crypto.X509()
        subject = cert.get_subject()
        subject.C = cert_details['country_name']
        subject.ST = cert_details['state_name']
        subject.L = cert_details['locality_name']
        subject.O = cert_details['organization']
        subject.OU = cert_details['unit']
        subject.CN = cert_details['common_name']
        cert.set_serial_number(1000)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # about ten years
        cert.set_issuer(cert.get_subject())
        cert.set_pubkey(key)
        # SHA-1 signatures are rejected by current OpenSSL defaults as well
        cert.sign(key, 'sha256')

        # The PEM dumps are byte strings, so write the files in binary mode.
        with open(cert_path, 'wb') as fd:
            fd.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))

        with open(key_path, 'wb') as fd:
            fd.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))

        return dict(key=key_path, cert=cert_path)
 
664
 
 
665
 
 
666
class CorrectProxyTestCase(BaseSSLTestCase):
    """Test the interaction with a SSL enabled proxy."""

    @defer.inlineCallbacks
    def setUp(self):
        """Patch gsettings with different http and https proxy settings."""
        yield super(CorrectProxyTestCase, self).setUp()

        # fake the gsettings to have diff settings for https and http
        http_settings = self.get_auth_proxy_settings()

        # delete the username and password so that we get a 407 for testing
        del http_settings['username']
        del http_settings['password']

        https_settings = self.get_nonauth_proxy_settings()

        proxy_settings = dict(http=http_settings, https=https_settings)
        self.patch(gsettings, "get_proxy_settings", lambda: proxy_settings)

        self.wc = webclient.webclient_factory()
        self.addCleanup(self.wc.shutdown)

        # Nothing in this class appends to this list; test_https_request
        # asserts that it stays empty.
        self.called = []

    def assert_header_contains(self, headers, expected):
        """At least one of the given header values must contain *expected*."""
        self.assertTrue(any(expected in value for value in headers))

    @defer.inlineCallbacks
    def test_https_request(self):
        """Test using the correct proxy for the ssl request.

        In order to assert that the correct proxy is used we expect not to call
        the auth dialog since we set the https proxy not to use the auth proxy
        and to fail because we are reaching a https page with bad self-signed
        certs.
        """
        # we fail due to the fake ssl cert
        yield self.assertFailure(
            self.wc.request(self.base_iri + SIMPLERESOURCE),
            WebClientError)
        # https requests do not use the auth proxy therefore called should be
        # empty. This asserts that we are using the correct settings for the
        # request.
        self.assertEqual([], self.called)

    @defer.inlineCallbacks
    def test_http_request(self):
        """Test using the correct proxy for the plain request.

        This tests does the opposite to the https tests. We did set the auth
        proxy for the http request therefore we expect the proxy dialog to be
        used and not to get an error since we are not visiting a https with bad
        self-signed certs.
        """
        # we do not fail since we are not going to the https page
        result = yield self.wc.request(self.base_iri + SIMPLERESOURCE)
        self.assert_header_contains(result.headers["Via"], "squid")

    # Both tests are skipped for either backend, each with its own reason.
    if WEBCLIENT_MODULE_NAME.endswith(".txweb"):
        reason = 'Multiple proxy settings is not supported.'
        test_https_request.skip = reason
        test_http_request.skip = reason

    if WEBCLIENT_MODULE_NAME.endswith(".libsoup"):
        reason = 'Hard to test since we need to fully mock gsettings.'
        test_https_request.skip = reason
        test_http_request.skip = reason
 
739
 
 
740
 
 
741
class SSLTestCase(BaseSSLTestCase):
    """Test error handling when dealing with ssl."""

    @defer.inlineCallbacks
    def setUp(self):
        """Attach a memento log handler and create the webclient under test."""
        yield super(SSLTestCase, self).setUp()

        # Capture everything the selected webclient backend logs.
        self.memento = MementoHandler()
        self.memento.setLevel(logging.DEBUG)
        logger = webclient.webclient_module().logger
        logger.addHandler(self.memento)
        self.addCleanup(logger.removeHandler, self.memento)

        self.wc = webclient.webclient_factory()
        self.addCleanup(self.wc.shutdown)

        self.called = []

    @defer.inlineCallbacks
    def test_ssl_fail(self):
        """A request to the self-signed server fails and logs the SSL errors."""
        # Wait for the request to actually fail before looking at the log;
        # otherwise the check runs while the request may still be in flight
        # and the failure assertion is never awaited by the test runner.
        yield self.assertFailure(
            self.wc.request(self.base_iri + SIMPLERESOURCE), WebClientError)
        self.assertNotEqual(None, self.memento.check_error('SSL errors'))

    if (WEBCLIENT_MODULE_NAME.endswith(".txweb") or
            WEBCLIENT_MODULE_NAME.endswith(".libsoup")):
        reason = 'SSL support has not yet been implemented.'
        test_ssl_fail.skip = reason