1
# -*- coding: utf-8 -*-
3
# Copyright 2011-2013 Canonical Ltd.
4
# Copyright 2015-2016 Chicharreros (https://launchpad.net/~chicharreros)
6
# This program is free software: you can redistribute it and/or modify it
7
# under the terms of the GNU General Public License version 3, as published
8
# by the Free Software Foundation.
10
# This program is distributed in the hope that it will be useful, but
11
# WITHOUT ANY WARRANTY; without even the implied warranties of
12
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13
# PURPOSE. See the GNU General Public License for more details.
15
# You should have received a copy of the GNU General Public License along
16
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
# In addition, as a special exception, the copyright holders give
19
# permission to link the code of portions of this program with the
20
# OpenSSL library under certain conditions as described in each
21
# individual source file, and distribute linked combinations
23
# You must obey the GNU General Public License in all respects
24
# for all of the code used other than OpenSSL. If you modify
25
# file(s) with this exception, you may extend this exception to your
26
# version of the file(s), but you are not obligated to do so. If you
27
# do not wish to do so, delete this exception statement from your
28
# version. If you delete this exception statement from all source
29
# files in the program, then also delete it here.
31
"""Integration tests for the proxy-enabled webclient."""
39
from urllib.parse import (urlencode, unquote, urlparse,
42
from urllib import urlencode, unquote
43
from urlparse import urlparse, urljoin, parse_qsl
45
from OpenSSL import crypto
46
from socket import gethostname
47
from twisted.cred import checkers, portal
48
from twisted.internet import defer
49
from twisted.web import guard, http, resource
50
from ubuntuone.devtools.handlers import MementoHandler
51
from ubuntuone.devtools.testcases import TestCase, skipIfOS
52
from ubuntuone.devtools.testcases.squid import SquidTestCase
53
from ubuntuone.devtools.testing.txwebserver import (
59
from ubuntuone import keyring
60
from ubuntuone.utils import webclient
61
from ubuntuone.utils.webclient import gsettings, txweb
62
from ubuntuone.utils.webclient.common import (
71
# Canned payloads and credentials shared by the test resources and tests.
SAMPLE_VALUE = "sample result"
SAMPLE_RESOURCE = '{"%s": "%s"}' % (SAMPLE_KEY, SAMPLE_VALUE)
SAMPLE_USERNAME = "peddro"
SAMPLE_PASSWORD = "cantropus"
SAMPLE_CREDENTIALS = {"username": "username", "password": "password"}
SAMPLE_HEADERS = {SAMPLE_KEY: SAMPLE_VALUE}
SAMPLE_POST_PARAMS = {"param1": "value1", "param2": "value2"}
# Start of a JPEG/JFIF file; note the NUL byte in the middle of it.
SAMPLE_JPEG_HEADER = '\xff\xd8\xff\xe0\x00\x10JFIF'

# Path segments appended to the server's base iri to reach each resource.
SIMPLERESOURCE = "simpleresource"
BYTEZERORESOURCE = "bytezeroresource"
POSTABLERESOURCE = "postableresource"
THROWERROR = "throwerror"
UNAUTHORIZED = "unauthorized"
VERIFYHEADERS = "verifyheaders"
VERIFYPOSTPARAMS = "verifypostparams"
AUTHRESOURCE = "authresource"

# Dotted name of the backend module chosen by the webclient package.
WEBCLIENT_MODULE_NAME = webclient.webclient_module().__name__
94
def sample_get_credentials():
    """Return an already-fired Deferred carrying SAMPLE_CREDENTIALS."""
    already_fired = defer.succeed(SAMPLE_CREDENTIALS)
    return already_fired
99
class SimpleResource(resource.Resource):
    """Web resource that serves the canned sample document."""

    def render_GET(self, request):
        """Answer a GET with SAMPLE_RESOURCE; *request* is not inspected."""
        body = SAMPLE_RESOURCE
        return body
107
class ByteZeroResource(resource.Resource):
    """Web resource whose body has a NUL byte in the middle of it."""

    def render_GET(self, request):
        """Answer a GET with SAMPLE_JPEG_HEADER; *request* is not inspected."""
        body = SAMPLE_JPEG_HEADER
        return body
115
class PostableResource(resource.Resource):
    """Web resource that defines a handler for POST requests only."""

    def render_POST(self, request):
        """Answer a POST with SAMPLE_RESOURCE; *request* is not inspected."""
        body = SAMPLE_RESOURCE
        return body
123
class HeadOnlyResource(resource.Resource):
124
"""A resource that fails if called with a method other than HEAD."""
126
def render_HEAD(self, request):
127
"""Return a bit of html."""
131
class VerifyHeadersResource(resource.Resource):
    """Web resource that checks the request carries the sample header."""

    def render_GET(self, request):
        """Echo the sample header and document when the request is valid.

        The request is valid when its SAMPLE_KEY header values are exactly
        [SAMPLE_VALUE]; anything else gets a 400 code and an error message.
        """
        received = request.requestHeaders.getRawHeaders(SAMPLE_KEY)
        if received == [SAMPLE_VALUE]:
            request.setHeader(SAMPLE_KEY, SAMPLE_VALUE)
            return SAMPLE_RESOURCE
        request.setResponseCode(http.BAD_REQUEST)
        return "ERROR: Expected header not present."
144
class VerifyPostParameters(resource.Resource):
145
"""A resource that answers to POST requests with some parameters."""
147
def fetch_post_args_only(self, request):
148
"""Fetch only the POST arguments, not the args in the url."""
149
request.process = lambda: None
150
request.requestReceived(request.method, request.path,
154
def render_POST(self, request):
    """Check the POST body arguments against SAMPLE_POST_PARAMS.

    Returns SAMPLE_RESOURCE when they match; otherwise sets a 400 response
    code and returns a message showing the received and expected values.
    """
    received = self.fetch_post_args_only(request)
    wanted = {}
    for name, value in SAMPLE_POST_PARAMS.items():
        # every expected value is compared as a one-item list
        wanted[name] = [value]
    if received == wanted:
        return SAMPLE_RESOURCE
    request.setResponseCode(http.BAD_REQUEST)
    return "ERROR: Expected arguments not present, %r != %r" % (
        received, wanted)
166
class SimpleRealm(object):
    """Realm that hands every user a fresh SimpleResource."""

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return an (interface, avatar, logout) triple for IResource.

        Raises NotImplementedError when IResource is not among *interfaces*.
        """
        if resource.IResource not in interfaces:
            raise NotImplementedError()
        # the third item is a logout callable that does nothing
        return (resource.IResource, SimpleResource(), lambda: None)
176
class AuthCheckerResource(resource.Resource):
    """A resource that verifies the request was auth signed."""

    def render_GET(self, request):
        """Return the sample document if the request is auth signed.

        A request counts as signed when its first Authorization header value
        starts with "Auth ". Any other request, including one with no
        Authorization header at all, gets a 400 code and an error message.
        """
        # getRawHeaders hands back the given default when the header is
        # absent; falling back to [""] avoids indexing None, so an unsigned
        # request is rejected with BAD_REQUEST instead of raising TypeError.
        values = request.requestHeaders.getRawHeaders("Authorization", [""])
        if values[0].startswith("Auth "):
            return SAMPLE_RESOURCE
        request.setResponseCode(http.BAD_REQUEST)
        return "ERROR: Expected Auth header not present."
188
def get_root_resource():
189
"""Get the root resource with all the children."""
190
root = resource.Resource()
191
root.putChild(SIMPLERESOURCE, SimpleResource())
192
root.putChild(BYTEZERORESOURCE, ByteZeroResource())
193
root.putChild(POSTABLERESOURCE, PostableResource())
195
root.putChild(THROWERROR, resource.NoResource())
197
unauthorized_resource = resource.ErrorPage(http.UNAUTHORIZED,
198
"Unauthorized", "Unauthorized")
199
root.putChild(UNAUTHORIZED, unauthorized_resource)
200
root.putChild(HEADONLY, HeadOnlyResource())
201
root.putChild(VERIFYHEADERS, VerifyHeadersResource())
202
root.putChild(VERIFYPOSTPARAMS, VerifyPostParameters())
203
root.putChild(AUTHRESOURCE, AuthCheckerResource())
205
db = checkers.InMemoryUsernamePasswordDatabaseDontUse()
206
db.addUser(SAMPLE_USERNAME, SAMPLE_PASSWORD)
207
test_portal = portal.Portal(SimpleRealm(), [db])
208
cred_factory = guard.BasicCredentialFactory("example.org")
209
guarded_resource = guard.HTTPAuthSessionWrapper(test_portal,
211
root.putChild(GUARDED, guarded_resource)
215
class HTTPMockWebServer(HTTPWebServer):
216
"""A mock webserver for the webclient tests."""
219
"""Create a new instance."""
220
root = get_root_resource()
221
super(HTTPMockWebServer, self).__init__(root)
224
class HTTPSMockWebServer(HTTPSWebServer):
    """HTTPS flavour of the mock webserver used by the webclient tests."""

    def __init__(self, ssl_settings):
        """Serve the shared root resource tree using *ssl_settings*."""
        resource_tree = get_root_resource()
        parent = super(HTTPSMockWebServer, self)
        parent.__init__(resource_tree, ssl_settings)
233
class ModuleSelectionTestCase(TestCase):
    """Checks on which webclient backend module gets selected."""

    def assert_module_name(self, module, expected_name):
        """Assert *module*'s file name, minus extension, is *expected_name*."""
        file_name = os.path.basename(module.__file__)
        stem, _extension = os.path.splitext(file_name)
        self.assertEqual(stem, expected_name)

    def test_webclient_module_libsoup(self):
        """The selected webclient module is the libsoup one."""
        chosen = webclient.webclient_module()
        self.assert_module_name(chosen, "libsoup")
248
class WebClientTestCase(TestCase):
249
"""Test for the webclient."""
252
webclient_factory = webclient.webclient_factory
254
@defer.inlineCallbacks
256
yield super(WebClientTestCase, self).setUp()
257
self.wc = self.webclient_factory()
258
self.addCleanup(self.wc.shutdown)
259
self.ws = HTTPMockWebServer()
261
self.addCleanup(self.ws.stop)
262
self.base_iri = self.ws.get_iri()
264
@defer.inlineCallbacks
def test_request_takes_an_iri(self):
    """Requesting with a bytes (non-unicode) iri fails with TypeError."""
    bytes_iri = bytes(self.base_iri + SIMPLERESOURCE)
    pending = self.wc.request(bytes_iri)
    yield self.assertFailure(pending, TypeError)
270
@defer.inlineCallbacks
def test_get_iri(self):
    """Requesting the simple resource iri yields the sample document."""
    iri = self.base_iri + SIMPLERESOURCE
    response = yield self.wc.request(iri)
    self.assertEqual(SAMPLE_RESOURCE, response.content)
276
@defer.inlineCallbacks
277
def test_get_iri_error(self):
278
"""The errback is called when there's some error."""
279
yield self.assertFailure(self.wc.request(self.base_iri + THROWERROR),
282
@defer.inlineCallbacks
def test_zero_byte_in_content(self):
    """A body containing a NUL byte is returned unchanged."""
    iri = self.base_iri + BYTEZERORESOURCE
    response = yield self.wc.request(iri)
    self.assertEqual(SAMPLE_JPEG_HEADER, response.content)
288
@defer.inlineCallbacks
290
"""Test a post request."""
291
result = yield self.wc.request(self.base_iri + POSTABLERESOURCE,
293
self.assertEqual(SAMPLE_RESOURCE, result.content)
295
@defer.inlineCallbacks
296
def test_post_with_args(self):
297
"""Test a post request with arguments."""
298
args = urlencode(SAMPLE_POST_PARAMS)
299
iri = self.base_iri + VERIFYPOSTPARAMS + "?" + args
301
"content-type": "application/x-www-form-urlencoded",
303
result = yield self.wc.request(iri, method="POST",
304
extra_headers=headers, post_content=args)
305
self.assertEqual(SAMPLE_RESOURCE, result.content)
307
@defer.inlineCallbacks
308
def test_unauthorized(self):
309
"""Detect when a request failed with the UNAUTHORIZED http code."""
310
yield self.assertFailure(self.wc.request(self.base_iri + UNAUTHORIZED),
313
@defer.inlineCallbacks
def test_method_head(self):
    """A request made with method HEAD comes back with an empty body."""
    head_iri = self.base_iri + HEADONLY
    response = yield self.wc.request(head_iri, method="HEAD")
    self.assertEqual("", response.content)
319
@defer.inlineCallbacks
def test_send_extra_headers(self):
    """Headers passed as extra_headers reach the server and are echoed."""
    iri = self.base_iri + VERIFYHEADERS
    response = yield self.wc.request(iri, extra_headers=SAMPLE_HEADERS)
    echoed = response.headers
    self.assertIn(SAMPLE_KEY, echoed)
    self.assertEqual(echoed[SAMPLE_KEY], [SAMPLE_VALUE])
327
@defer.inlineCallbacks
def test_send_basic_auth(self):
    """A client built with the sample login can fetch the guarded iri."""
    login = dict(username=SAMPLE_USERNAME, password=SAMPLE_PASSWORD)
    authed_wc = self.webclient_factory(**login)
    self.addCleanup(authed_wc.shutdown)
    response = yield authed_wc.request(self.base_iri + GUARDED)
    self.assertEqual(SAMPLE_RESOURCE, response.content)
336
@defer.inlineCallbacks
337
def test_send_basic_auth_wrong_credentials(self):
338
"""Wrong credentials returns a webclient error."""
339
other_wc = self.webclient_factory(username=SAMPLE_USERNAME,
340
password="wrong password!")
341
self.addCleanup(other_wc.shutdown)
342
yield self.assertFailure(other_wc.request(self.base_iri + GUARDED),
345
@defer.inlineCallbacks
def test_request_is_auth_signed(self):
    """A request made with auth_credentials is accepted as auth signed."""
    checker = self.wc.get_timestamp_checker()
    # pin the timestamp source so no real time lookup is needed
    self.patch(checker, "get_faithful_time", lambda: defer.succeed('1'))
    signed_iri = self.base_iri + AUTHRESOURCE
    response = yield self.wc.request(
        signed_iri, auth_credentials=SAMPLE_CREDENTIALS)
    self.assertEqual(SAMPLE_RESOURCE, response.content)
354
@defer.inlineCallbacks
355
def test_auth_signing_uses_timestamp(self):
356
"""Auth signing uses the timestamp."""
359
def fake_get_faithful_time():
360
"""A fake get_timestamp"""
362
return defer.succeed('1')
364
tsc = self.wc.get_timestamp_checker()
365
self.patch(tsc, "get_faithful_time", fake_get_faithful_time)
366
yield self.wc.request(self.base_iri + AUTHRESOURCE,
367
auth_credentials=SAMPLE_CREDENTIALS)
368
self.assertTrue(called, "The timestamp must be retrieved.")
370
@defer.inlineCallbacks
def test_returned_content_are_bytes(self):
    """The content of a response is a bytes object."""
    checker = self.wc.get_timestamp_checker()
    # pin the timestamp source so no real time lookup is needed
    self.patch(checker, "get_faithful_time", lambda: defer.succeed('1'))
    signed_iri = self.base_iri + AUTHRESOURCE
    response = yield self.wc.request(
        signed_iri, auth_credentials=SAMPLE_CREDENTIALS)
    is_bytes = isinstance(response.content, bytes)
    self.assertTrue(
        is_bytes, "The type of %r must be bytes" % response.content)
380
@defer.inlineCallbacks
def test_webclienterror_not_string(self):
    """Every argument of the raised WebClientError is a basestring."""
    failing_request = self.wc.request(self.base_iri + THROWERROR)
    raised = yield self.assertFailure(failing_request, WebClientError)
    for argument in raised.args:
        argument_is_string = isinstance(argument, basestring)
        self.assertTrue(argument_is_string)
389
class FakeSavingReactor(object):
390
"""A fake reactor that saves connection attempts."""
393
"""Initialize this fake instance."""
394
self.connections = []
396
def connectTCP(self, host, port, factory, *args):
    """Record the TCP attempt and prime *factory* with a canned response.

    No connection is made: the factory gets empty response headers and an
    already-fired Deferred holding a fixed body.
    """
    attempt = (host, port, args)
    self.connections.append(attempt)
    factory.response_headers = {}
    factory.deferred = defer.succeed("response content")
402
def connectSSL(self, host, port, factory, *args):
    """Record the SSL attempt and prime *factory* with a canned response.

    No connection is made: the factory gets empty response headers and an
    already-fired Deferred holding a fixed body.
    """
    attempt = (host, port, args)
    self.connections.append(attempt)
    factory.response_headers = {}
    factory.deferred = defer.succeed("response content")
409
class TxWebClientTestCase(WebClientTestCase):
410
"""Test case for txweb."""
412
webclient_factory = txweb.WebClient
414
@defer.inlineCallbacks
416
"""Set the diff tests."""
417
# delay import, otherwise a default reactor gets installed
418
from twisted.web import client
419
self.factory = client.HTTPClientFactory
420
# set the factory to be used
421
# Hook the server's buildProtocol to make the protocol instance
422
# accessible to tests and ensure that the reactor is clean!
423
build_protocol = self.factory.buildProtocol
424
self.serverProtocol = None
426
def remember_protocol_instance(my_self, addr):
427
"""Remember the protocol used in the test."""
428
protocol = build_protocol(my_self, addr)
429
self.serverProtocol = protocol
430
on_connection_lost = defer.Deferred()
431
connection_lost = protocol.connectionLost
433
def defer_connection_lost(protocol, *a):
434
"""Lost connection."""
435
if not on_connection_lost.called:
436
on_connection_lost.callback(None)
437
connection_lost(protocol, *a)
439
self.patch(protocol, 'connectionLost', defer_connection_lost)
442
"""Clean the connection."""
443
if self.serverProtocol.transport is not None:
444
self.serverProtocol.transport.loseConnection()
445
return on_connection_lost
447
self.addCleanup(cleanup)
450
self.factory.buildProtocol = remember_protocol_instance
451
self.addCleanup(self.set_build_protocol, build_protocol)
452
txweb.WebClient.client_factory = self.factory
454
yield super(TxWebClientTestCase, self).setUp()
456
def set_build_protocol(self, method):
    """Install *method* as ``buildProtocol`` on ``self.factory``."""
    setattr(self.factory, "buildProtocol", method)
461
class TxWebClientReactorReplaceableTestCase(TestCase):
462
"""In the txweb client the reactor is replaceable."""
466
FAKE_IRI_TEMPLATE = u"%%s://%s/fake_page" % FAKE_HOST
468
@defer.inlineCallbacks
def _test_replaceable_reactor(self, iri):
    """Request *iri* through a fake reactor and check the host it dialled."""
    recording_reactor = FakeSavingReactor()
    client = txweb.WebClient(recording_reactor)
    yield client.request(iri)

    first_attempt = recording_reactor.connections[0]
    dialled_host, _port, _args = first_attempt
    self.assertEqual(dialled_host, self.FAKE_HOST)
478
def test_replaceable_reactor_http(self):
    """The replaceable reactor check, run against an http iri."""
    http_iri = self.FAKE_IRI_TEMPLATE % "http"
    return self._test_replaceable_reactor(http_iri)
482
def test_replaceable_reactor_https(self):
    """The replaceable reactor check, run against an https iri."""
    https_iri = self.FAKE_IRI_TEMPLATE % "https"
    return self._test_replaceable_reactor(https_iri)
487
class TimestampCheckerTestCase(TestCase):
488
"""Tests for the timestampchecker classmethod."""
490
@defer.inlineCallbacks
492
"""Initialize this testcase."""
493
yield super(TimestampCheckerTestCase, self).setUp()
494
self.wc = webclient.webclient_factory()
495
self.patch(self.wc.__class__, "timestamp_checker", None)
497
def test_timestamp_checker_has_the_same_class_as_the_creator(self):
    """The checker's webclient_class is the class of the client it came from."""
    checker = self.wc.get_timestamp_checker()
    creator_class = self.wc.__class__
    self.assertEqual(checker.webclient_class, creator_class)
502
def test_timestamp_checker_is_the_same_for_all_webclients(self):
    """Two webclients hand out the very same TimestampChecker object."""
    first_checker = self.wc.get_timestamp_checker()
    another_wc = webclient.webclient_factory()
    second_checker = another_wc.get_timestamp_checker()
    self.assertIs(first_checker, second_checker)
510
class BasicProxyTestCase(SquidTestCase):
511
"""Test that the proxy works at all."""
515
@defer.inlineCallbacks
517
yield super(BasicProxyTestCase, self).setUp()
518
self.ws = HTTPMockWebServer()
520
self.addCleanup(self.ws.stop)
521
self.base_iri = self.ws.get_iri()
522
self.wc = webclient.webclient_factory()
523
self.addCleanup(self.wc.shutdown)
525
def assert_header_contains(self, headers, expected):
    """Assert that *expected* occurs in at least one entry of *headers*."""
    found = False
    for value in headers:
        if expected in value:
            found = True
            break
    self.assertTrue(found)
529
@defer.inlineCallbacks
def test_anonymous_proxy_is_used(self):
    """With the non-auth proxy forced, the response has a squid Via header."""
    self.wc.force_use_proxy(self.get_nonauth_proxy_settings())
    response = yield self.wc.request(self.base_iri + SIMPLERESOURCE)
    via_values = response.headers["Via"]
    self.assert_header_contains(via_values, "squid")
538
'LP: #1111880 - ncsa_auth crashing for auth proxy tests.')
539
@defer.inlineCallbacks
540
def test_authenticated_proxy_is_used(self):
541
"""The authenticated proxy is used by the webclient."""
542
settings = self.get_auth_proxy_settings()
543
self.wc.force_use_proxy(settings)
544
result = yield self.wc.request(self.base_iri + SIMPLERESOURCE)
545
self.assert_header_contains(result.headers["Via"], "squid")
547
if WEBCLIENT_MODULE_NAME.endswith(".txweb"):
    # Both proxy tests are marked as skipped under the txweb backend.
    # The marker attribute is ``skip``; the former ``kip`` spelling set an
    # unrelated attribute and left the authenticated test unskipped.
    reason = "txweb does not support proxies."
    test_anonymous_proxy_is_used.skip = reason
    test_authenticated_proxy_is_used.skip = reason
    # NOTE(review): the assignment to
    # test_auth_proxy_is_used_creds_requested.skip was dropped: no test of
    # that name is defined in this class body, so evaluating it raised
    # NameError whenever this branch ran.
554
class HeaderDictTestCase(TestCase):
555
"""Tests for the case insensitive header dictionary."""
557
def test_constructor_handles_keys(self):
    """A mixed-case key given to the constructor is found in lower case."""
    initial = {"ClAvE": "value"}
    headers = HeaderDict(initial)
    self.assertIn("clave", headers)
562
def test_can_set_get_items(self):
563
"""The item is set/getted."""
567
self.assertEqual(hd["key"], "value2")
569
def test_can_test_presence(self):
570
"""The presence of an item is found."""
572
self.assertNotIn("cLaVe", hd)
573
hd["CLAVE"] = "value1"
574
self.assertIn("cLaVe", hd)
576
self.assertNotIn("cLaVe", hd)
579
class FakeKeyring(object):
580
"""A fake keyring."""
582
def __init__(self, creds):
583
"""A fake keyring."""
587
"""Fake instance callable."""
590
def get_credentials(self, domain):
    """Return the stored creds wrapped in a Deferred; *domain* is unused.

    When the stored value is an Exception instance the Deferred fails with
    it; any other value makes the Deferred succeed with that value.
    """
    stored = self.creds
    wrap = defer.fail if isinstance(stored, Exception) else defer.succeed
    return wrap(stored)
597
class BaseSSLTestCase(SquidTestCase):
598
"""Base test that allows to use ssl connections."""
600
@defer.inlineCallbacks
602
"""Set the diff tests."""
603
yield super(BaseSSLTestCase, self).setUp()
604
self.cert_dir = os.path.join(self.tmpdir, 'cert')
605
self.cert_details = dict(organization='Canonical',
606
common_name=gethostname(),
607
locality_name='London',
610
state_name='London',)
611
self.ssl_settings = self._generate_self_signed_certificate(
614
self.addCleanup(self._clean_ssl_certificate_files)
616
self.ws = HTTPSMockWebServer(self.ssl_settings)
618
self.addCleanup(self.ws.stop)
619
self.base_iri = self.ws.get_iri()
621
def _clean_ssl_certificate_files(self):
    """Delete the certificate directory and its contents, if it exists."""
    cert_dir = self.cert_dir
    if not os.path.exists(cert_dir):
        return
    shutil.rmtree(cert_dir)
626
def _generate_self_signed_certificate(self, cert_dir, cert_details):
627
"""Generate the required SSL certificates."""
628
if not os.path.exists(cert_dir):
629
os.makedirs(cert_dir)
630
cert_path = os.path.join(cert_dir, 'cert.crt')
631
key_path = os.path.join(cert_dir, 'cert.key')
633
if os.path.exists(cert_path):
635
if os.path.exists(key_path):
640
key.generate_key(crypto.TYPE_RSA, 1024)
642
# create a self-signed cert
644
cert.get_subject().C = cert_details['country_name']
645
cert.get_subject().ST = cert_details['state_name']
646
cert.get_subject().L = cert_details['locality_name']
647
cert.get_subject().O = cert_details['organization']
648
cert.get_subject().OU = cert_details['unit']
649
cert.get_subject().CN = cert_details['common_name']
650
cert.set_serial_number(1000)
651
cert.gmtime_adj_notBefore(0)
652
cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
653
cert.set_issuer(cert.get_subject())
655
cert.sign(key, 'sha1')
657
with open(cert_path, 'wt') as fd:
658
fd.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
660
with open(key_path, 'wt') as fd:
661
fd.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
663
return dict(key=key_path, cert=cert_path)
666
class CorrectProxyTestCase(BaseSSLTestCase):
667
"""Test the interaction with a SSL enabled proxy."""
669
@defer.inlineCallbacks
672
yield super(CorrectProxyTestCase, self).setUp()
674
# fake the gsettings to have diff settings for https and http
675
http_settings = self.get_auth_proxy_settings()
677
# remember so that we can use them in the creds request
678
proxy_username = http_settings['username']
679
proxy_password = http_settings['password']
681
# delete the username and password so that we get a 407 for testing
682
del http_settings['username']
683
del http_settings['password']
685
https_settings = self.get_nonauth_proxy_settings()
687
proxy_settings = dict(http=http_settings, https=https_settings)
688
self.patch(gsettings, "get_proxy_settings", lambda: proxy_settings)
690
self.wc = webclient.webclient_factory()
691
self.addCleanup(self.wc.shutdown)
695
def assert_header_contains(self, headers, expected):
    """Assert that some entry of *headers* contains *expected*."""
    matched = False
    for entry in headers:
        if expected in entry:
            matched = True
            break
    self.assertTrue(matched)
699
@defer.inlineCallbacks
700
def test_https_request(self):
701
"""Test using the correct proxy for the ssl request.
703
In order to assert that the correct proxy is used we expect not to call
704
the auth dialog since we set the https proxy not to use the auth proxy
705
and to fail because we are reaching a https page with bad self-signed
708
# we fail due to the fake ssl cert
709
yield self.failUnlessFailure(self.wc.request(
710
self.base_iri + SIMPLERESOURCE),
712
# https requests do not use the auth proxy therefore called should be
713
# empty. This asserts that we are using the correct settings for the
715
self.assertEqual([], self.called)
717
@defer.inlineCallbacks
718
def test_http_request(self):
719
"""Test using the correct proxy for the plain request.
721
This tests does the opposite to the https tests. We did set the auth
722
proxy for the http request therefore we expect the proxy dialog to be
723
used and not to get an error since we are not visiting a https with bad
726
# we do not fail since we are not going to the https page
727
result = yield self.wc.request(self.base_iri + SIMPLERESOURCE)
728
self.assert_header_contains(result.headers["Via"], "squid")
730
# Each backend gets both tests marked as skipped, with its own reason.
if WEBCLIENT_MODULE_NAME.endswith(".txweb"):
    reason = 'Multiple proxy settings is not supported.'
    test_https_request.skip = test_http_request.skip = reason

if WEBCLIENT_MODULE_NAME.endswith(".libsoup"):
    reason = 'Hard to test since we need to fully mock gsettings.'
    test_https_request.skip = test_http_request.skip = reason
741
class SSLTestCase(BaseSSLTestCase):
742
"""Test error handling when dealing with ssl."""
744
@defer.inlineCallbacks
746
"""Set the diff tests."""
747
yield super(SSLTestCase, self).setUp()
749
self.memento = MementoHandler()
750
self.memento.setLevel(logging.DEBUG)
751
logger = webclient.webclient_module().logger
752
logger.addHandler(self.memento)
753
self.addCleanup(logger.removeHandler, self.memento)
755
self.wc = webclient.webclient_factory()
756
self.addCleanup(self.wc.shutdown)
760
@defer.inlineCallbacks
def test_ssl_fail(self):
    """The https request fails with WebClientError and SSL errors are logged.

    The Deferred returned by assertFailure is yielded so the log is only
    inspected once the request has actually failed. Previously the
    Deferred was discarded, so the check ran without waiting for it.
    """
    failing_request = self.wc.request(self.base_iri + SIMPLERESOURCE)
    # assertFailure matches the other tests in this file
    yield self.assertFailure(failing_request, WebClientError)
    self.assertNotEqual(None, self.memento.check_error('SSL errors'))
766
# Mark the SSL test as skipped under both the txweb and libsoup backends.
if WEBCLIENT_MODULE_NAME.endswith((".txweb", ".libsoup")):
    reason = 'SSL support has not yet been implemented.'
    test_ssl_fail.skip = reason