~myers-1/pyopenssl/npn

132.1.49 by Jean-Paul Calderone
Fix pyflakes
1
# Copyright (C) Jean-Paul Calderone 2008-2010, All rights reserved
42 by Jean-Paul Calderone
Put my copyright on everything I've changed so far, put a more reasonable version number on version.py
2
41 by Jean-Paul Calderone
A couple trivial Context tests
3
"""
4
Unit tests for L{OpenSSL.SSL}.
5
"""
6
135.1.1 by Jean-Paul Calderone
Allow EWOULDBLOCK from connect_ex as well
7
from errno import ECONNREFUSED, EINPROGRESS, EWOULDBLOCK
94 by Jean-Paul Calderone
Fix clearly retarded bugs in previous revision
8
from sys import platform
132.1.7 by Jean-Paul Calderone
test for successful and unsuccessful connect
9
from socket import error, socket
119.1.1 by Jean-Paul Calderone
Avoid using symlink, since we cannot use it on windows
10
from os import makedirs
70.2.3 by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths
11
from os.path import join
109 by Jean-Paul Calderone
Switch from Twisted's TestCase to the stdlib TestCase so there are fewer ways the test suite can be invoked
12
from unittest import main
96 by Jean-Paul Calderone
Let's try using Twisted's TestCase and see how that goes.
13
132.1.49 by Jean-Paul Calderone
Fix pyflakes
14
from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, FILETYPE_ASN1
15
from OpenSSL.crypto import PKey, X509, X509Extension
16
from OpenSSL.crypto import dump_privatekey, load_privatekey
17
from OpenSSL.crypto import dump_certificate, load_certificate
18
132.1.14 by Jean-Paul Calderone
somewhat more coverage in test_shutdown
19
from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
41 by Jean-Paul Calderone
A couple trivial Context tests
20
from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
119 by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely.
21
from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE
98.2.1 by Rick Dean
BIO read/write to allow for socketless SSL Connections
22
from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE
132.1.49 by Jean-Paul Calderone
Fix pyflakes
23
from OpenSSL.SSL import Error, SysCallError, WantReadError, ZeroReturnError
24
from OpenSSL.SSL import Context, ContextType, Connection, ConnectionType
25
132.2.38 by Jean-Paul Calderone
Fix the import_crypto_module helper and factor some str/unicode helpers into the test util module
26
from OpenSSL.test.util import TestCase, bytes, b
110 by Jean-Paul Calderone
Make it easier to be Python 2.3 compatible by making that the default
27
from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM
132.1.49 by Jean-Paul Calderone
Fix pyflakes
28
from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem
29
from OpenSSL.test.test_crypto import server_cert_pem, server_key_pem, root_cert_pem
30
85 by Jean-Paul Calderone
make the tests recognize the optionalness of the DTLS constants
31
try:
32
    from OpenSSL.SSL import OP_NO_QUERY_MTU
33
except ImportError:
34
    OP_NO_QUERY_MTU = None
35
try:
36
    from OpenSSL.SSL import OP_COOKIE_EXCHANGE
37
except ImportError:
38
    OP_COOKIE_EXCHANGE = None
39
try:
40
    from OpenSSL.SSL import OP_NO_TICKET
41
except ImportError:
42
    OP_NO_TICKET = None
63 by Jean-Paul Calderone
Python 2.3 compatibility in test_ssl
43
44
132.1.52 by Jean-Paul Calderone
Sort of add a load_tmp_dh test; also a Context.set_cipher_list test
45
# openssl dhparam 128 -out dh-128.pem (note that 128 is a small number of bits
46
# to use)
47
dhparam = """\
48
-----BEGIN DH PARAMETERS-----
49
MBYCEQCobsg29c9WZP/54oAPcwiDAgEC
50
-----END DH PARAMETERS-----
51
"""
52
53
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
54
def verify_cb(conn, cert, errnum, depth, ok):
55
    return ok
56
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
57
def socket_pair():
112.1.4 by Jean-Paul Calderone
Adjust socket_pair docstring style; also make it simpler to avoid possible undesirable socket behaviors
58
    """
119 by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely.
59
    Establish and return a pair of network sockets connected to each other.
112.1.4 by Jean-Paul Calderone
Adjust socket_pair docstring style; also make it simpler to avoid possible undesirable socket behaviors
60
    """
61
    # Connect a pair of sockets
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
62
    port = socket()
63
    port.bind(('', 0))
64
    port.listen(1)
65
    client = socket()
66
    client.setblocking(False)
119.1.2 by Jean-Paul Calderone
Windows is a poor sport; getsockname is not guaranteed to return the correct result until someone has connected to you; joy
67
    client.connect_ex(("127.0.0.1", port.getsockname()[1]))
116.1.1 by Jean-Paul Calderone
Try doing socket_pair with blocking sockets
68
    client.setblocking(True)
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
69
    server = port.accept()[0]
70
112.1.4 by Jean-Paul Calderone
Adjust socket_pair docstring style; also make it simpler to avoid possible undesirable socket behaviors
71
    # Let's pass some unencrypted data to make sure our socket connection is
72
    # fine.  Just one byte, so we don't have to worry about buffers getting
73
    # filled up or fragmentation.
132.2.38 by Jean-Paul Calderone
Fix the import_crypto_module helper and factor some str/unicode helpers into the test util module
74
    server.send(b("x"))
75
    assert client.recv(1024) == b("x")
76
    client.send(b("y"))
77
    assert server.recv(1024) == b("y")
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
78
132.1.1 by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall
79
    # Most of our callers want non-blocking sockets, make it easy for them.
116.1.1 by Jean-Paul Calderone
Try doing socket_pair with blocking sockets
80
    server.setblocking(False)
81
    client.setblocking(False)
82
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
83
    return (server, client)
84
85
116.1.1 by Jean-Paul Calderone
Try doing socket_pair with blocking sockets
86
132.3.5 by Jean-Paul Calderone
Convert the verify location tests to do the handshake properly
87
def handshake(client, server):
88
    conns = [client, server]
89
    while conns:
90
        for conn in conns:
91
            try:
92
                conn.do_handshake()
93
            except WantReadError:
94
                pass
95
            else:
96
                conns.remove(conn)
97
98
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
99
class _LoopbackMixin:
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
100
    """
101
    Helper mixin which defines methods for creating a connected socket pair and
102
    for forcing two connected SSL sockets to talk to each other via memory BIOs.
103
    """
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
104
    def _loopback(self):
105
        (server, client) = socket_pair()
106
107
        ctx = Context(TLSv1_METHOD)
108
        ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
109
        ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
110
        server = Connection(ctx, server)
111
        server.set_accept_state()
112
        client = Connection(Context(TLSv1_METHOD), client)
113
        client.set_connect_state()
114
132.3.5 by Jean-Paul Calderone
Convert the verify location tests to do the handshake properly
115
        handshake(client, server)
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
116
117
        server.setblocking(True)
118
        client.setblocking(True)
119
        return server, client
120
121
122
    def _interactInMemory(self, client_conn, server_conn):
123
        """
124
        Try to read application bytes from each of the two L{Connection}
125
        objects.  Copy bytes back and forth between their send/receive buffers
126
        for as long as there is anything to copy.  When there is nothing more
127
        to copy, return C{None}.  If one of them actually manages to deliver
128
        some application bytes, return a two-tuple of the connection from which
129
        the bytes were read and the bytes themselves.
130
        """
131
        wrote = True
132
        while wrote:
133
            # Loop until neither side has anything to say
134
            wrote = False
135
136
            # Copy stuff from each side's send buffer to the other side's
137
            # receive buffer.
138
            for (read, write) in [(client_conn, server_conn),
139
                                  (server_conn, client_conn)]:
140
141
                # Give the side a chance to generate some more bytes, or
142
                # succeed.
143
                try:
144
                    bytes = read.recv(2 ** 16)
145
                except WantReadError:
146
                    # It didn't succeed, so we'll hope it generated some
147
                    # output.
148
                    pass
149
                else:
150
                    # It did succeed, so we'll stop now and let the caller deal
151
                    # with it.
152
                    return (read, bytes)
153
154
                while True:
155
                    # Keep copying as long as there's more stuff there.
156
                    try:
157
                        dirty = read.bio_read(4096)
158
                    except WantReadError:
159
                        # Okay, nothing more waiting to be sent.  Stop
160
                        # processing this send buffer.
161
                        break
162
                    else:
163
                        # Keep track of the fact that someone generated some
164
                        # output.
165
                        wrote = True
166
                        write.bio_write(dirty)
167
168
169
170
class ContextTests(TestCase, _LoopbackMixin):
41 by Jean-Paul Calderone
A couple trivial Context tests
171
    """
172
    Unit tests for L{OpenSSL.SSL.Context}.
173
    """
174
    def test_method(self):
175
        """
176
        L{Context} can be instantiated with one of L{SSLv2_METHOD},
177
        L{SSLv3_METHOD}, L{SSLv23_METHOD}, or L{TLSv1_METHOD}.
178
        """
179
        for meth in [SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD]:
180
            Context(meth)
181
        self.assertRaises(TypeError, Context, "")
182
        self.assertRaises(ValueError, Context, 10)
183
184
112.5.1 by Rick Dean
More unit tests about types
185
    def test_type(self):
186
        """
119 by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely.
187
        L{Context} and L{ContextType} refer to the same type object and can be
188
        used to create instances of that type.
112.5.1 by Rick Dean
More unit tests about types
189
        """
119 by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely.
190
        self.assertIdentical(Context, ContextType)
191
        self.assertConsistentType(Context, 'Context', TLSv1_METHOD)
112.5.1 by Rick Dean
More unit tests about types
192
193
41 by Jean-Paul Calderone
A couple trivial Context tests
194
    def test_use_privatekey(self):
195
        """
196
        L{Context.use_privatekey} takes an L{OpenSSL.crypto.PKey} instance.
197
        """
198
        key = PKey()
199
        key.generate_key(TYPE_RSA, 128)
200
        ctx = Context(TLSv1_METHOD)
201
        ctx.use_privatekey(key)
202
        self.assertRaises(TypeError, ctx.use_privatekey, "")
60 by Jean-Paul Calderone
Fix a threading bug in passphrase callback support for context objects.
203
204
132.1.26 by Jean-Paul Calderone
Context app data wrong arg tests
205
    def test_set_app_data_wrong_args(self):
206
        """
207
        L{Context.set_app_data} raises L{TypeError} if called with other than
208
        one argument.
209
        """
210
        context = Context(TLSv1_METHOD)
211
        self.assertRaises(TypeError, context.set_app_data)
212
        self.assertRaises(TypeError, context.set_app_data, None, None)
213
214
215
    def test_get_app_data_wrong_args(self):
216
        """
217
        L{Context.get_app_data} raises L{TypeError} if called with any
218
        arguments.
219
        """
220
        context = Context(TLSv1_METHOD)
221
        self.assertRaises(TypeError, context.get_app_data, None)
222
223
224
    def test_app_data(self):
225
        """
226
        L{Context.set_app_data} stores an object for later retrieval using
227
        L{Context.get_app_data}.
228
        """
229
        app_data = object()
230
        context = Context(TLSv1_METHOD)
231
        context.set_app_data(app_data)
232
        self.assertIdentical(context.get_app_data(), app_data)
233
234
132.1.27 by Jean-Paul Calderone
Context.set_options wrong args test
235
    def test_set_options_wrong_args(self):
236
        """
237
        L{Context.set_options} raises L{TypeError} if called with the wrong
238
        number of arguments or a non-C{int} argument.
239
        """
240
        context = Context(TLSv1_METHOD)
241
        self.assertRaises(TypeError, context.set_options)
242
        self.assertRaises(TypeError, context.set_options, None)
243
        self.assertRaises(TypeError, context.set_options, 1, None)
244
245
132.1.28 by Jean-Paul Calderone
Context.set_timeout and get_timeout tests
246
    def test_set_timeout_wrong_args(self):
247
        """
248
        L{Context.set_timeout} raises L{TypeError} if called with the wrong
249
        number of arguments or a non-C{int} argument.
250
        """
251
        context = Context(TLSv1_METHOD)
252
        self.assertRaises(TypeError, context.set_timeout)
253
        self.assertRaises(TypeError, context.set_timeout, None)
254
        self.assertRaises(TypeError, context.set_timeout, 1, None)
255
256
257
    def test_get_timeout_wrong_args(self):
258
        """
259
        L{Context.get_timeout} raises L{TypeError} if called with any arguments.
260
        """
261
        context = Context(TLSv1_METHOD)
262
        self.assertRaises(TypeError, context.get_timeout, None)
263
264
265
    def test_timeout(self):
266
        """
267
        L{Context.set_timeout} sets the session timeout for all connections
268
        created using the context object.  L{Context.get_timeout} retrieves this
269
        value.
270
        """
271
        context = Context(TLSv1_METHOD)
272
        context.set_timeout(1234)
273
        self.assertEquals(context.get_timeout(), 1234)
274
275
132.1.30 by Jean-Paul Calderone
Tests for set_verify_depth and get_verify_depth
276
    def test_set_verify_depth_wrong_args(self):
277
        """
278
        L{Context.set_verify_depth} raises L{TypeError} if called with the wrong
279
        number of arguments or a non-C{int} argument.
280
        """
281
        context = Context(TLSv1_METHOD)
282
        self.assertRaises(TypeError, context.set_verify_depth)
283
        self.assertRaises(TypeError, context.set_verify_depth, None)
284
        self.assertRaises(TypeError, context.set_verify_depth, 1, None)
285
286
287
    def test_get_verify_depth_wrong_args(self):
288
        """
289
        L{Context.get_verify_depth} raises L{TypeError} if called with any arguments.
290
        """
291
        context = Context(TLSv1_METHOD)
292
        self.assertRaises(TypeError, context.get_verify_depth, None)
293
294
295
    def test_verify_depth(self):
296
        """
297
        L{Context.set_verify_depth} sets the number of certificates in a chain
298
        to follow before giving up.  The value can be retrieved with
299
        L{Context.get_verify_depth}.
300
        """
301
        context = Context(TLSv1_METHOD)
302
        context.set_verify_depth(11)
303
        self.assertEquals(context.get_verify_depth(), 11)
304
305
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
306
    def _write_encrypted_pem(self, passphrase):
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
307
        """
308
        Write a new private key out to a new file, encrypted using the given
309
        passphrase.  Return the path to the new file.
310
        """
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
311
        key = PKey()
312
        key.generate_key(TYPE_RSA, 128)
313
        pemFile = self.mktemp()
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
314
        fObj = open(pemFile, 'w')
315
        pem = dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase)
316
        fObj.write(pem.decode('ascii'))
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
317
        fObj.close()
318
        return pemFile
319
320
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
321
    def test_set_passwd_cb_wrong_args(self):
322
        """
323
        L{Context.set_passwd_cb} raises L{TypeError} if called with the
324
        wrong arguments or with a non-callable first argument.
325
        """
326
        context = Context(TLSv1_METHOD)
327
        self.assertRaises(TypeError, context.set_passwd_cb)
328
        self.assertRaises(TypeError, context.set_passwd_cb, None)
329
        self.assertRaises(TypeError, context.set_passwd_cb, lambda: None, None, None)
330
331
60 by Jean-Paul Calderone
Fix a threading bug in passphrase callback support for context objects.
332
    def test_set_passwd_cb(self):
333
        """
334
        L{Context.set_passwd_cb} accepts a callable which will be invoked when
335
        a private key is loaded from an encrypted PEM.
336
        """
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
337
        passphrase = b("foobar")
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
338
        pemFile = self._write_encrypted_pem(passphrase)
60 by Jean-Paul Calderone
Fix a threading bug in passphrase callback support for context objects.
339
        calledWith = []
340
        def passphraseCallback(maxlen, verify, extra):
341
            calledWith.append((maxlen, verify, extra))
342
            return passphrase
343
        context = Context(TLSv1_METHOD)
344
        context.set_passwd_cb(passphraseCallback)
345
        context.use_privatekey_file(pemFile)
346
        self.assertTrue(len(calledWith), 1)
347
        self.assertTrue(isinstance(calledWith[0][0], int))
348
        self.assertTrue(isinstance(calledWith[0][1], int))
349
        self.assertEqual(calledWith[0][2], None)
61 by Jean-Paul Calderone
Fix a threading bug in the info callback support for context objects.
350
351
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
352
    def test_passwd_callback_exception(self):
353
        """
354
        L{Context.use_privatekey_file} propagates any exception raised by the
355
        passphrase callback.
356
        """
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
357
        pemFile = self._write_encrypted_pem(b("monkeys are nice"))
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
358
        def passphraseCallback(maxlen, verify, extra):
359
            raise RuntimeError("Sorry, I am a fail.")
360
361
        context = Context(TLSv1_METHOD)
362
        context.set_passwd_cb(passphraseCallback)
363
        self.assertRaises(RuntimeError, context.use_privatekey_file, pemFile)
364
365
366
    def test_passwd_callback_false(self):
367
        """
368
        L{Context.use_privatekey_file} raises L{OpenSSL.SSL.Error} if the
369
        passphrase callback returns a false value.
370
        """
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
371
        pemFile = self._write_encrypted_pem(b("monkeys are nice"))
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
372
        def passphraseCallback(maxlen, verify, extra):
373
            return None
374
375
        context = Context(TLSv1_METHOD)
376
        context.set_passwd_cb(passphraseCallback)
377
        self.assertRaises(Error, context.use_privatekey_file, pemFile)
378
379
380
    def test_passwd_callback_non_string(self):
381
        """
382
        L{Context.use_privatekey_file} raises L{OpenSSL.SSL.Error} if the
383
        passphrase callback returns a true non-string value.
384
        """
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
385
        pemFile = self._write_encrypted_pem(b("monkeys are nice"))
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
386
        def passphraseCallback(maxlen, verify, extra):
387
            return 10
388
389
        context = Context(TLSv1_METHOD)
390
        context.set_passwd_cb(passphraseCallback)
391
        self.assertRaises(Error, context.use_privatekey_file, pemFile)
392
393
394
    def test_passwd_callback_too_long(self):
395
        """
396
        If the passphrase returned by the passphrase callback returns a string
397
        longer than the indicated maximum length, it is truncated.
398
        """
399
        # A priori knowledge!
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
400
        passphrase = b("x") * 1024
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
401
        pemFile = self._write_encrypted_pem(passphrase)
402
        def passphraseCallback(maxlen, verify, extra):
403
            assert maxlen == 1024
132.2.41 by Jean-Paul Calderone
Convert some more bytes to text and vice versa
404
            return passphrase + b("y")
132.1.25 by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback
405
406
        context = Context(TLSv1_METHOD)
407
        context.set_passwd_cb(passphraseCallback)
408
        # This shall succeed because the truncated result is the correct
409
        # passphrase.
410
        context.use_privatekey_file(pemFile)
411
412
61 by Jean-Paul Calderone
Fix a threading bug in the info callback support for context objects.
413
    def test_set_info_callback(self):
414
        """
415
        L{Context.set_info_callback} accepts a callable which will be invoked
416
        when certain information about an SSL connection is available.
417
        """
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
418
        (server, client) = socket_pair()
61 by Jean-Paul Calderone
Fix a threading bug in the info callback support for context objects.
419
420
        clientSSL = Connection(Context(TLSv1_METHOD), client)
421
        clientSSL.set_connect_state()
422
423
        called = []
424
        def info(conn, where, ret):
425
            called.append((conn, where, ret))
426
        context = Context(TLSv1_METHOD)
427
        context.set_info_callback(info)
428
        context.use_certificate(
429
            load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
430
        context.use_privatekey(
431
            load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))
432
433
        serverSSL = Connection(context, server)
434
        serverSSL.set_accept_state()
435
436
        while not called:
437
            for ssl in clientSSL, serverSSL:
438
                try:
439
                    ssl.do_handshake()
440
                except WantReadError:
441
                    pass
442
443
        # Kind of lame.  Just make sure it got called somehow.
444
        self.assertTrue(called)
70.2.1 by Jean-Paul Calderone
test for the pre-existing load_verify_locations function
445
446
70.2.3 by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths
447
    def _load_verify_locations_test(self, *args):
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
448
        """
449
        Create a client context which will verify the peer certificate and call
450
        its C{load_verify_locations} method with C{*args}.  Then connect it to a
451
        server and ensure that the handshake succeeds.
452
        """
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
453
        (server, client) = socket_pair()
70.2.1 by Jean-Paul Calderone
test for the pre-existing load_verify_locations function
454
455
        clientContext = Context(TLSv1_METHOD)
70.2.3 by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths
456
        clientContext.load_verify_locations(*args)
70.2.1 by Jean-Paul Calderone
test for the pre-existing load_verify_locations function
457
        # Require that the server certificate verify properly or the
458
        # connection will fail.
459
        clientContext.set_verify(
460
            VERIFY_PEER,
461
            lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
462
463
        clientSSL = Connection(clientContext, client)
464
        clientSSL.set_connect_state()
465
466
        serverContext = Context(TLSv1_METHOD)
467
        serverContext.use_certificate(
468
            load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
469
        serverContext.use_privatekey(
470
            load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))
471
472
        serverSSL = Connection(serverContext, server)
473
        serverSSL.set_accept_state()
474
132.3.5 by Jean-Paul Calderone
Convert the verify location tests to do the handshake properly
475
        # Without load_verify_locations above, the handshake
476
        # will fail:
477
        # Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
478
        #          'certificate verify failed')]
479
        handshake(clientSSL, serverSSL)
70.2.1 by Jean-Paul Calderone
test for the pre-existing load_verify_locations function
480
481
        cert = clientSSL.get_peer_certificate()
98 by Jean-Paul Calderone
Fix the tests by updating the certificate they use so that it is not expired
482
        self.assertEqual(cert.get_subject().CN, 'Testing Root CA')
70.2.2 by Jean-Paul Calderone
a test for the obvious error case of load_verify_locations
483
122.3.17 by Jean-Paul Calderone
Minimal tests for Context.add_extra_chain_cert
484
70.2.3 by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths
485
    def test_load_verify_file(self):
486
        """
487
        L{Context.load_verify_locations} accepts a file name and uses the
488
        certificates within for verification purposes.
489
        """
490
        cafile = self.mktemp()
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
491
        fObj = open(cafile, 'w')
132.2.41 by Jean-Paul Calderone
Convert some more bytes to text and vice versa
492
        fObj.write(cleartextCertificatePEM.decode('ascii'))
70.2.3 by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths
493
        fObj.close()
494
495
        self._load_verify_locations_test(cafile)
496
70.2.2 by Jean-Paul Calderone
a test for the obvious error case of load_verify_locations
497
498
    def test_load_verify_invalid_file(self):
499
        """
500
        L{Context.load_verify_locations} raises L{Error} when passed a
501
        non-existent cafile.
502
        """
503
        clientContext = Context(TLSv1_METHOD)
504
        self.assertRaises(
505
            Error, clientContext.load_verify_locations, self.mktemp())
70.2.3 by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths
506
507
508
    def test_load_verify_directory(self):
509
        """
510
        L{Context.load_verify_locations} accepts a directory name and uses
511
        the certificates within for verification purposes.
512
        """
513
        capath = self.mktemp()
514
        makedirs(capath)
515
        # Hash value computed manually with c_rehash to avoid depending on
516
        # c_rehash in the test suite.
119.1.1 by Jean-Paul Calderone
Avoid using symlink, since we cannot use it on windows
517
        cafile = join(capath, 'c7adac82.0')
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
518
        fObj = open(cafile, 'w')
132.2.41 by Jean-Paul Calderone
Convert some more bytes to text and vice versa
519
        fObj.write(cleartextCertificatePEM.decode('ascii'))
119.1.1 by Jean-Paul Calderone
Avoid using symlink, since we cannot use it on windows
520
        fObj.close()
70.2.3 by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths
521
522
        self._load_verify_locations_test(None, capath)
523
524
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
525
    def test_load_verify_locations_wrong_args(self):
526
        """
527
        L{Context.load_verify_locations} raises L{TypeError} if called with
528
        the wrong number of arguments or with non-C{str} arguments.
529
        """
530
        context = Context(TLSv1_METHOD)
531
        self.assertRaises(TypeError, context.load_verify_locations)
532
        self.assertRaises(TypeError, context.load_verify_locations, object())
533
        self.assertRaises(TypeError, context.load_verify_locations, object(), object())
534
        self.assertRaises(TypeError, context.load_verify_locations, None, None, None)
535
536
132.1.1 by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall
537
    if platform == "win32":
538
        "set_default_verify_paths appears not to work on Windows.  "
119.1.4 by Jean-Paul Calderone
skip on darwin and win32
539
        "See LP#404343 and LP#404344."
540
    else:
541
        def test_set_default_verify_paths(self):
542
            """
543
            L{Context.set_default_verify_paths} causes the platform-specific CA
544
            certificate locations to be used for verification purposes.
545
            """
546
            # Testing this requires a server with a certificate signed by one of
547
            # the CAs in the platform CA location.  Getting one of those costs
548
            # money.  Fortunately (or unfortunately, depending on your
549
            # perspective), it's easy to think of a public server on the
550
            # internet which has such a certificate.  Connecting to the network
551
            # in a unit test is bad, but it's the only way I can think of to
552
            # really test this. -exarkun
553
554
            # Arg, verisign.com doesn't speak TLSv1
555
            context = Context(SSLv3_METHOD)
556
            context.set_default_verify_paths()
557
            context.set_verify(
122.3.1 by Ziga Seilnacht
Types should have a correct __module__ attribute.
558
                VERIFY_PEER,
119.1.4 by Jean-Paul Calderone
skip on darwin and win32
559
                lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
560
561
            client = socket()
562
            client.connect(('verisign.com', 443))
563
            clientSSL = Connection(context, client)
564
            clientSSL.set_connect_state()
565
            clientSSL.do_handshake()
566
            clientSSL.send('GET / HTTP/1.0\r\n\r\n')
567
            self.assertTrue(clientSSL.recv(1024))
70.2.6 by Jean-Paul Calderone
reject arguments to set_default_verify_paths
568
569
570
    def test_set_default_verify_paths_signature(self):
571
        """
572
        L{Context.set_default_verify_paths} takes no arguments and raises
573
        L{TypeError} if given any.
574
        """
575
        context = Context(TLSv1_METHOD)
576
        self.assertRaises(TypeError, context.set_default_verify_paths, None)
577
        self.assertRaises(TypeError, context.set_default_verify_paths, 1)
578
        self.assertRaises(TypeError, context.set_default_verify_paths, "")
83 by Jean-Paul Calderone
Expose some new DTLS-related constants in OpenSSL.SSL - OP_NO_QUERY_MTU, OP_COOKIE_EXCHANGE, and OP_NO_TICKET.
579
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
580
122.3.17 by Jean-Paul Calderone
Minimal tests for Context.add_extra_chain_cert
581
    def test_add_extra_chain_cert_invalid_cert(self):
582
        """
583
        L{Context.add_extra_chain_cert} raises L{TypeError} if called with
584
        other than one argument or if called with an object which is not an
585
        instance of L{X509}.
586
        """
587
        context = Context(TLSv1_METHOD)
588
        self.assertRaises(TypeError, context.add_extra_chain_cert)
589
        self.assertRaises(TypeError, context.add_extra_chain_cert, object())
590
        self.assertRaises(TypeError, context.add_extra_chain_cert, object(), object())
591
592
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
593
    def _create_certificate_chain(self):
122.3.17 by Jean-Paul Calderone
Minimal tests for Context.add_extra_chain_cert
594
        """
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
595
        Construct and return a chain of certificates.
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
596
597
            1. A new self-signed certificate authority certificate (cacert)
598
            2. A new intermediate certificate signed by cacert (icert)
599
            3. A new server certificate signed by icert (scert)
122.3.17 by Jean-Paul Calderone
Minimal tests for Context.add_extra_chain_cert
600
        """
132.2.45 by Jean-Paul Calderone
merge trunk
601
        caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
602
603
        # Step 1
604
        cakey = PKey()
605
        cakey.generate_key(TYPE_RSA, 512)
606
        cacert = X509()
132.1.44 by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...)
607
        cacert.get_subject().commonName = "Authority Certificate"
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
608
        cacert.set_issuer(cacert.get_subject())
609
        cacert.set_pubkey(cakey)
132.2.48 by Jean-Paul Calderone
Make timestamps bytes
610
        cacert.set_notBefore(b("20000101000000Z"))
611
        cacert.set_notAfter(b("20200101000000Z"))
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
612
        cacert.add_extensions([caext])
132.1.44 by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...)
613
        cacert.set_serial_number(0)
614
        cacert.sign(cakey, "sha1")
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
615
616
        # Step 2
617
        ikey = PKey()
618
        ikey.generate_key(TYPE_RSA, 512)
619
        icert = X509()
620
        icert.get_subject().commonName = "Intermediate Certificate"
621
        icert.set_issuer(cacert.get_subject())
622
        icert.set_pubkey(ikey)
132.2.48 by Jean-Paul Calderone
Make timestamps bytes
623
        icert.set_notBefore(b("20000101000000Z"))
624
        icert.set_notAfter(b("20200101000000Z"))
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
625
        icert.add_extensions([caext])
132.1.44 by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...)
626
        icert.set_serial_number(0)
627
        icert.sign(cakey, "sha1")
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
628
629
        # Step 3
630
        skey = PKey()
631
        skey.generate_key(TYPE_RSA, 512)
632
        scert = X509()
633
        scert.get_subject().commonName = "Server Certificate"
634
        scert.set_issuer(icert.get_subject())
635
        scert.set_pubkey(skey)
132.2.48 by Jean-Paul Calderone
Make timestamps bytes
636
        scert.set_notBefore(b("20000101000000Z"))
637
        scert.set_notAfter(b("20200101000000Z"))
132.2.49 by Jean-Paul Calderone
Some fields of X509Extension are bytes, not text
638
        scert.add_extensions([
639
                X509Extension(b('basicConstraints'), True, b('CA:false'))])
132.1.44 by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...)
640
        scert.set_serial_number(0)
641
        scert.sign(ikey, "sha1")
642
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
643
        return [(cakey, cacert), (ikey, icert), (skey, scert)]
644
645
646
    def _handshake_test(self, serverContext, clientContext):
647
        """
648
        Verify that a client and server created with the given contexts can
649
        successfully handshake and communicate.
650
        """
651
        serverSocket, clientSocket = socket_pair()
652
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
653
        server = Connection(serverContext, serverSocket)
132.1.12 by Jean-Paul Calderone
Test for Connection.shutdown
654
        server.set_accept_state()
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
655
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
656
        client = Connection(clientContext, clientSocket)
657
        client.set_connect_state()
658
659
        # Make them talk to each other.
660
        # self._interactInMemory(client, server)
661
        for i in range(3):
662
            for s in [client, server]:
663
                try:
664
                    s.do_handshake()
665
                except WantReadError:
666
                    pass
667
668
669
    def test_add_extra_chain_cert(self):
670
        """
671
        L{Context.add_extra_chain_cert} accepts an L{X509} instance to add to
672
        the certificate chain.
673
674
        See L{_create_certificate_chain} for the details of the certificate
675
        chain tested.
676
677
        The chain is tested by starting a server with scert and connecting
678
        to it with a client which trusts cacert and requires verification to
679
        succeed.
680
        """
681
        chain = self._create_certificate_chain()
682
        [(cakey, cacert), (ikey, icert), (skey, scert)] = chain
683
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
684
        # Dump the CA certificate to a file because that's the only way to load
685
        # it as a trusted CA in the client context.
686
        for cert, name in [(cacert, 'ca.pem'), (icert, 'i.pem'), (scert, 's.pem')]:
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
687
            fObj = open(name, 'w')
132.2.41 by Jean-Paul Calderone
Convert some more bytes to text and vice versa
688
            fObj.write(dump_certificate(FILETYPE_PEM, cert).decode('ascii'))
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
689
            fObj.close()
690
691
        for key, name in [(cakey, 'ca.key'), (ikey, 'i.key'), (skey, 's.key')]:
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
692
            fObj = open(name, 'w')
132.2.41 by Jean-Paul Calderone
Convert some more bytes to text and vice versa
693
            fObj.write(dump_privatekey(FILETYPE_PEM, key).decode('ascii'))
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
694
            fObj.close()
695
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
696
        # Create the server context
697
        serverContext = Context(TLSv1_METHOD)
698
        serverContext.use_privatekey(skey)
699
        serverContext.use_certificate(scert)
132.1.44 by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...)
700
        # The client already has cacert, we only need to give them icert.
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
701
        serverContext.add_extra_chain_cert(icert)
702
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
703
        # Create the client
704
        clientContext = Context(TLSv1_METHOD)
705
        clientContext.set_verify(
706
            VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
707
        clientContext.load_verify_locations('ca.pem')
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
708
709
        # Try it out.
710
        self._handshake_test(serverContext, clientContext)
711
712
713
    def test_use_certificate_chain_file(self):
714
        """
715
        L{Context.use_certificate_chain_file} reads a certificate chain from
716
        the specified file.
717
718
        The chain is tested by starting a server with scert and connecting
719
        to it with a client which trusts cacert and requires verification to
720
        succeed.
721
        """
722
        chain = self._create_certificate_chain()
723
        [(cakey, cacert), (ikey, icert), (skey, scert)] = chain
724
725
        # Write out the chain file.
726
        chainFile = self.mktemp()
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
727
        fObj = open(chainFile, 'w')
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
728
        # Most specific to least general.
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
729
        fObj.write(dump_certificate(FILETYPE_PEM, scert).decode('ascii'))
730
        fObj.write(dump_certificate(FILETYPE_PEM, icert).decode('ascii'))
731
        fObj.write(dump_certificate(FILETYPE_PEM, cacert).decode('ascii'))
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
732
        fObj.close()
733
734
        serverContext = Context(TLSv1_METHOD)
735
        serverContext.use_certificate_chain_file(chainFile)
736
        serverContext.use_privatekey(skey)
737
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
738
        fObj = open('ca.pem', 'w')
739
        fObj.write(dump_certificate(FILETYPE_PEM, cacert).decode('ascii'))
132.1.35 by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert
740
        fObj.close()
741
742
        clientContext = Context(TLSv1_METHOD)
743
        clientContext.set_verify(
744
            VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
745
        clientContext.load_verify_locations('ca.pem')
746
747
        self._handshake_test(serverContext, clientContext)
748
132.1.42 by Jean-Paul Calderone
notes for later
749
    # XXX load_client_ca
750
    # XXX set_session_id
132.1.51 by Jean-Paul Calderone
Add tests for get_verify_mode
751
752
    def test_get_verify_mode_wrong_args(self):
753
        """
754
        L{Context.get_verify_mode} raises L{TypeError} if called with any
755
        arguments.
756
        """
757
        context = Context(TLSv1_METHOD)
758
        self.assertRaises(TypeError, context.get_verify_mode, None)
759
760
761
    def test_get_verify_mode(self):
762
        """
763
        L{Context.get_verify_mode} returns the verify mode flags previously
764
        passed to L{Context.set_verify}.
765
        """
766
        context = Context(TLSv1_METHOD)
767
        self.assertEquals(context.get_verify_mode(), 0)
768
        context.set_verify(
769
            VERIFY_PEER | VERIFY_CLIENT_ONCE, lambda *args: None)
770
        self.assertEquals(
771
            context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE)
772
132.1.52 by Jean-Paul Calderone
Sort of add a load_tmp_dh test; also a Context.set_cipher_list test
773
774
    def test_load_tmp_dh_wrong_args(self):
775
        """
776
        L{Context.load_tmp_dh} raises L{TypeError} if called with the wrong
777
        number of arguments or with a non-C{str} argument.
778
        """
779
        context = Context(TLSv1_METHOD)
780
        self.assertRaises(TypeError, context.load_tmp_dh)
781
        self.assertRaises(TypeError, context.load_tmp_dh, "foo", None)
782
        self.assertRaises(TypeError, context.load_tmp_dh, object())
783
784
785
    def test_load_tmp_dh_missing_file(self):
786
        """
787
        L{Context.load_tmp_dh} raises L{OpenSSL.SSL.Error} if the specified file
788
        does not exist.
789
        """
790
        context = Context(TLSv1_METHOD)
791
        self.assertRaises(Error, context.load_tmp_dh, "hello")
792
793
794
    def test_load_tmp_dh(self):
795
        """
796
        L{Context.load_tmp_dh} loads Diffie-Hellman parameters from the
797
        specified file.
798
        """
799
        context = Context(TLSv1_METHOD)
800
        dhfilename = self.mktemp()
801
        dhfile = open(dhfilename, "w")
802
        dhfile.write(dhparam)
803
        dhfile.close()
804
        context.load_tmp_dh(dhfilename)
805
        # XXX What should I assert here? -exarkun
806
807
808
    def test_set_cipher_list(self):
809
        """
810
        L{Context.set_cipher_list} accepts a C{str} naming the ciphers which
811
        connections created with the context object will be able to choose from.
812
        """
813
        context = Context(TLSv1_METHOD)
814
        context.set_cipher_list("hello world:EXP-RC4-MD5")
815
        conn = Connection(context, None)
816
        self.assertEquals(conn.get_cipher_list(), ["EXP-RC4-MD5"])
132.1.12 by Jean-Paul Calderone
Test for Connection.shutdown
817
818
819
820
class ConnectionTests(TestCase, _LoopbackMixin):
112.5.1 by Rick Dean
More unit tests about types
821
    """
822
    Unit tests for L{OpenSSL.SSL.Connection}.
823
    """
132.1.53 by Jean-Paul Calderone
Notes on further tests to write
824
    # XXX want_write
825
    # XXX want_read
826
    # XXX get_peer_certificate -> None
827
    # XXX sock_shutdown
828
    # XXX master_key -> TypeError
829
    # XXX server_random -> TypeError
830
    # XXX state_string
831
    # XXX connect -> TypeError
832
    # XXX connect_ex -> TypeError
833
    # XXX set_connect_state -> TypeError
834
    # XXX set_accept_state -> TypeError
835
    # XXX renegotiate_pending
836
    # XXX do_handshake -> TypeError
837
    # XXX bio_read -> TypeError
838
    # XXX recv -> TypeError
839
    # XXX send -> TypeError
840
    # XXX bio_write -> TypeError
841
112.5.1 by Rick Dean
More unit tests about types
842
    def test_type(self):
843
        """
119 by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely.
844
        L{Connection} and L{ConnectionType} refer to the same type object and
845
        can be used to create instances of that type.
112.5.1 by Rick Dean
More unit tests about types
846
        """
119 by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely.
847
        self.assertIdentical(Connection, ConnectionType)
112.5.1 by Rick Dean
More unit tests about types
848
        ctx = Context(TLSv1_METHOD)
119 by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely.
849
        self.assertConsistentType(Connection, 'Connection', ctx, None)
850
851
126 by Jean-Paul Calderone
Add a couple tests for Connection.get_context
852
    def test_get_context(self):
853
        """
854
        L{Connection.get_context} returns the L{Context} instance used to
855
        construct the L{Connection} instance.
856
        """
857
        context = Context(TLSv1_METHOD)
858
        connection = Connection(context, None)
859
        self.assertIdentical(connection.get_context(), context)
860
861
862
    def test_get_context_wrong_args(self):
863
        """
864
        L{Connection.get_context} raises L{TypeError} if called with any
865
        arguments.
866
        """
867
        connection = Connection(Context(TLSv1_METHOD), None)
868
        self.assertRaises(TypeError, connection.get_context, None)
869
870
132.1.5 by Jean-Paul Calderone
Tests for Connection.pending
871
    def test_pending(self):
132.1.46 by Jean-Paul Calderone
more docstrings
872
        """
873
        L{Connection.pending} returns the number of bytes available for
874
        immediate read.
875
        """
132.1.5 by Jean-Paul Calderone
Tests for Connection.pending
876
        connection = Connection(Context(TLSv1_METHOD), None)
877
        self.assertEquals(connection.pending(), 0)
878
879
880
    def test_pending_wrong_args(self):
132.1.46 by Jean-Paul Calderone
more docstrings
881
        """
882
        L{Connection.pending} raises L{TypeError} if called with any arguments.
883
        """
132.1.5 by Jean-Paul Calderone
Tests for Connection.pending
884
        connection = Connection(Context(TLSv1_METHOD), None)
885
        self.assertRaises(TypeError, connection.pending, None)
886
887
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
888
    def test_connect_wrong_args(self):
132.1.46 by Jean-Paul Calderone
more docstrings
889
        """
890
        L{Connection.connect} raises L{TypeError} if called with a non-address
891
        argument or with the wrong number of arguments.
892
        """
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
893
        connection = Connection(Context(TLSv1_METHOD), socket())
894
        self.assertRaises(TypeError, connection.connect, None)
132.1.46 by Jean-Paul Calderone
more docstrings
895
        self.assertRaises(TypeError, connection.connect)
896
        self.assertRaises(TypeError, connection.connect, ("127.0.0.1", 1), None)
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
897
898
132.1.7 by Jean-Paul Calderone
test for successful and unsuccessful connect
899
    def test_connect_refused(self):
132.1.46 by Jean-Paul Calderone
more docstrings
900
        """
901
        L{Connection.connect} raises L{socket.error} if the underlying socket
902
        connect method raises it.
903
        """
132.1.7 by Jean-Paul Calderone
test for successful and unsuccessful connect
904
        client = socket()
905
        context = Context(TLSv1_METHOD)
906
        clientSSL = Connection(context, client)
907
        exc = self.assertRaises(error, clientSSL.connect, ("127.0.0.1", 1))
132.1.10 by Jean-Paul Calderone
Compat with pre-2.6
908
        self.assertEquals(exc.args[0], ECONNREFUSED)
132.1.7 by Jean-Paul Calderone
test for successful and unsuccessful connect
909
910
911
    def test_connect(self):
132.1.46 by Jean-Paul Calderone
more docstrings
912
        """
913
        L{Connection.connect} establishes a connection to the specified address.
914
        """
132.1.8 by Jean-Paul Calderone
test for Connection.accept
915
        port = socket()
916
        port.bind(('', 0))
917
        port.listen(3)
918
919
        clientSSL = Connection(Context(TLSv1_METHOD), socket())
135.1.3 by Jean-Paul Calderone
Hard-code loopback address because of Windows
920
        clientSSL.connect(('127.0.0.1', port.getsockname()[1]))
921
        # XXX An assertion?  Or something?
132.1.8 by Jean-Paul Calderone
test for Connection.accept
922
923
132.3.1 by Jean-Paul Calderone
Skip test_connect_ex on OS X
924
    if platform == "darwin":
925
        "connect_ex sometimes causes a kernel panic on OS X 10.6.4"
926
    else:
927
        def test_connect_ex(self):
928
            """
929
            If there is a connection error, L{Connection.connect_ex} returns the
930
            errno instead of raising an exception.
931
            """
932
            port = socket()
933
            port.bind(('', 0))
934
            port.listen(3)
132.1.11 by Jean-Paul Calderone
test for connect_ex
935
132.3.1 by Jean-Paul Calderone
Skip test_connect_ex on OS X
936
            clientSSL = Connection(Context(TLSv1_METHOD), socket())
937
            clientSSL.setblocking(False)
938
            result = clientSSL.connect_ex(port.getsockname())
939
            expected = (EINPROGRESS, EWOULDBLOCK)
940
            self.assertTrue(
941
                    result in expected, "%r not in %r" % (result, expected))
132.1.11 by Jean-Paul Calderone
test for connect_ex
942
943
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
944
    def test_accept_wrong_args(self):
132.1.46 by Jean-Paul Calderone
more docstrings
945
        """
946
        L{Connection.accept} raises L{TypeError} if called with any arguments.
947
        """
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
948
        connection = Connection(Context(TLSv1_METHOD), socket())
949
        self.assertRaises(TypeError, connection.accept, None)
950
951
132.1.8 by Jean-Paul Calderone
test for Connection.accept
952
    def test_accept(self):
132.1.46 by Jean-Paul Calderone
more docstrings
953
        """
954
        L{Connection.accept} accepts a pending connection attempt and returns a
955
        tuple of a new L{Connection} (the accepted client) and the address the
956
        connection originated from.
957
        """
132.1.7 by Jean-Paul Calderone
test for successful and unsuccessful connect
958
        ctx = Context(TLSv1_METHOD)
959
        ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
960
        ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
132.1.8 by Jean-Paul Calderone
test for Connection.accept
961
        port = socket()
962
        portSSL = Connection(ctx, port)
963
        portSSL.bind(('', 0))
964
        portSSL.listen(3)
965
966
        clientSSL = Connection(Context(TLSv1_METHOD), socket())
135.1.3 by Jean-Paul Calderone
Hard-code loopback address because of Windows
967
968
        # Calling portSSL.getsockname() here to get the server IP address sounds
969
        # great, but frequently fails on Windows.
970
        clientSSL.connect(('127.0.0.1', portSSL.getsockname()[1]))
132.1.8 by Jean-Paul Calderone
test for Connection.accept
971
972
        serverSSL, address = portSSL.accept()
973
974
        self.assertTrue(isinstance(serverSSL, Connection))
975
        self.assertIdentical(serverSSL.get_context(), ctx)
976
        self.assertEquals(address, clientSSL.getsockname())
132.1.7 by Jean-Paul Calderone
test for successful and unsuccessful connect
977
978
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
979
    def test_shutdown_wrong_args(self):
132.1.46 by Jean-Paul Calderone
more docstrings
980
        """
981
        L{Connection.shutdown} raises L{TypeError} if called with the wrong
982
        number of arguments or with arguments other than integers.
983
        """
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
984
        connection = Connection(Context(TLSv1_METHOD), None)
985
        self.assertRaises(TypeError, connection.shutdown, None)
132.1.16 by Jean-Paul Calderone
more wrong args tests
986
        self.assertRaises(TypeError, connection.get_shutdown, None)
987
        self.assertRaises(TypeError, connection.set_shutdown)
988
        self.assertRaises(TypeError, connection.set_shutdown, None)
989
        self.assertRaises(TypeError, connection.set_shutdown, 0, 1)
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
990
991
132.1.12 by Jean-Paul Calderone
Test for Connection.shutdown
992
    def test_shutdown(self):
132.1.46 by Jean-Paul Calderone
more docstrings
993
        """
994
        L{Connection.shutdown} performs an SSL-level connection shutdown.
995
        """
132.1.12 by Jean-Paul Calderone
Test for Connection.shutdown
996
        server, client = self._loopback()
132.1.14 by Jean-Paul Calderone
somewhat more coverage in test_shutdown
997
        self.assertFalse(server.shutdown())
998
        self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN)
132.1.12 by Jean-Paul Calderone
Test for Connection.shutdown
999
        self.assertRaises(ZeroReturnError, client.recv, 1024)
132.1.14 by Jean-Paul Calderone
somewhat more coverage in test_shutdown
1000
        self.assertEquals(client.get_shutdown(), RECEIVED_SHUTDOWN)
1001
        client.shutdown()
1002
        self.assertEquals(client.get_shutdown(), SENT_SHUTDOWN|RECEIVED_SHUTDOWN)
1003
        self.assertRaises(ZeroReturnError, server.recv, 1024)
1004
        self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN|RECEIVED_SHUTDOWN)
132.1.12 by Jean-Paul Calderone
Test for Connection.shutdown
1005
1006
132.1.15 by Jean-Paul Calderone
test for Connection.set_shutdown
1007
    def test_set_shutdown(self):
132.1.46 by Jean-Paul Calderone
more docstrings
1008
        """
1009
        L{Connection.set_shutdown} sets the state of the SSL connection shutdown
1010
        process.
1011
        """
132.1.15 by Jean-Paul Calderone
test for Connection.set_shutdown
1012
        connection = Connection(Context(TLSv1_METHOD), socket())
1013
        connection.set_shutdown(RECEIVED_SHUTDOWN)
1014
        self.assertEquals(connection.get_shutdown(), RECEIVED_SHUTDOWN)
1015
1016
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
1017
    def test_app_data_wrong_args(self):
132.1.46 by Jean-Paul Calderone
more docstrings
1018
        """
1019
        L{Connection.set_app_data} raises L{TypeError} if called with other than
1020
        one argument.  L{Connection.get_app_data} raises L{TypeError} if called
1021
        with any arguments.
1022
        """
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
1023
        conn = Connection(Context(TLSv1_METHOD), None)
1024
        self.assertRaises(TypeError, conn.get_app_data, None)
1025
        self.assertRaises(TypeError, conn.set_app_data)
1026
        self.assertRaises(TypeError, conn.set_app_data, None, None)
1027
1028
132.1.9 by Jean-Paul Calderone
get_app_data and set_app_data
1029
    def test_app_data(self):
132.1.46 by Jean-Paul Calderone
more docstrings
1030
        """
1031
        Any object can be set as app data by passing it to
1032
        L{Connection.set_app_data} and later retrieved with
1033
        L{Connection.get_app_data}.
1034
        """
132.1.9 by Jean-Paul Calderone
get_app_data and set_app_data
1035
        conn = Connection(Context(TLSv1_METHOD), None)
1036
        app_data = object()
1037
        conn.set_app_data(app_data)
1038
        self.assertIdentical(conn.get_app_data(), app_data)
1039
1040
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
1041
    def test_makefile(self):
132.1.46 by Jean-Paul Calderone
more docstrings
1042
        """
1043
        L{Connection.makefile} is not implemented and calling that method raises
1044
        L{NotImplementedError}.
1045
        """
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
1046
        conn = Connection(Context(TLSv1_METHOD), None)
1047
        self.assertRaises(NotImplementedError, conn.makefile)
1048
1049
119 by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely.
1050
132.1.4 by Jean-Paul Calderone
simple tests for get_cipher_list
1051
class ConnectionGetCipherListTests(TestCase):
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
1052
    """
1053
    Tests for L{Connection.get_cipher_list}.
1054
    """
132.1.48 by Jean-Paul Calderone
Rename test method
1055
    def test_wrong_args(self):
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
1056
        """
1057
        L{Connection.get_cipher_list} raises L{TypeError} if called with any
1058
        arguments.
1059
        """
132.1.4 by Jean-Paul Calderone
simple tests for get_cipher_list
1060
        connection = Connection(Context(TLSv1_METHOD), None)
1061
        self.assertRaises(TypeError, connection.get_cipher_list, None)
1062
1063
1064
    def test_result(self):
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
1065
        """
1066
        L{Connection.get_cipher_list} returns a C{list} of C{str} giving the
1067
        names of the ciphers which might be used.
1068
        """
132.1.4 by Jean-Paul Calderone
simple tests for get_cipher_list
1069
        connection = Connection(Context(TLSv1_METHOD), None)
1070
        ciphers = connection.get_cipher_list()
1071
        self.assertTrue(isinstance(ciphers, list))
1072
        for cipher in ciphers:
1073
            self.assertTrue(isinstance(cipher, str))
1074
1075
1076
132.1.6 by Jean-Paul Calderone
Try testing renegotiation some; not much luck
1077
class ConnectionSendallTests(TestCase, _LoopbackMixin):
1078
    """
1079
    Tests for L{Connection.sendall}.
1080
    """
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
1081
    def test_wrong_args(self):
132.1.6 by Jean-Paul Calderone
Try testing renegotiation some; not much luck
1082
        """
1083
        When called with arguments other than a single string,
1084
        L{Connection.sendall} raises L{TypeError}.
1085
        """
1086
        connection = Connection(Context(TLSv1_METHOD), None)
1087
        self.assertRaises(TypeError, connection.sendall)
1088
        self.assertRaises(TypeError, connection.sendall, object())
1089
        self.assertRaises(TypeError, connection.sendall, "foo", "bar")
1090
1091
132.1.1 by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall
1092
    def test_short(self):
1093
        """
1094
        L{Connection.sendall} transmits all of the bytes in the string passed to
1095
        it.
1096
        """
1097
        server, client = self._loopback()
132.2.40 by Jean-Paul Calderone
Convert some bytes to text and vice versa
1098
        server.sendall(b('x'))
1099
        self.assertEquals(client.recv(1), b('x'))
132.1.1 by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall
1100
1101
1102
    def test_long(self):
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
1103
        """
1104
        L{Connection.sendall} transmits all of the bytes in the string passed to
1105
        it even if this requires multiple calls of an underlying write function.
1106
        """
132.1.1 by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall
1107
        server, client = self._loopback()
134.1.3 by Jean-Paul Calderone
Tweak the write sizer
1108
        # Should be enough, underlying SSL_write should only do 16k at a time.
1109
        # On Windows, after 32k of bytes the write will block (forever - because
1110
        # no one is yet reading).
132.2.46 by Jean-Paul Calderone
merge trunk
1111
        message = b('x') * (1024 * 32 - 1) + b('y')
132.1.1 by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall
1112
        server.sendall(message)
1113
        accum = []
1114
        received = 0
1115
        while received < len(message):
132.2.41 by Jean-Paul Calderone
Convert some more bytes to text and vice versa
1116
            data = client.recv(1024)
1117
            accum.append(data)
1118
            received += len(data)
132.2.47 by Jean-Paul Calderone
Avoid bytes literal
1119
        self.assertEquals(message, b('').join(accum))
132.1.1 by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall
1120
1121
132.1.3 by Jean-Paul Calderone
Test a low-level failure in SSL_write
1122
    def test_closed(self):
1123
        """
1124
        If the underlying socket is closed, L{Connection.sendall} propagates the
1125
        write error from the low level write call.
1126
        """
1127
        server, client = self._loopback()
132.3.2 by Jean-Paul Calderone
Try triggering the error by shutting down the sending underlying TCP socket
1128
        server.sock_shutdown(2)
132.1.3 by Jean-Paul Calderone
Test a low-level failure in SSL_write
1129
        self.assertRaises(SysCallError, server.sendall, "hello, world")
1130
132.1.1 by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall
1131
132.1.5 by Jean-Paul Calderone
Tests for Connection.pending
1132
132.1.6 by Jean-Paul Calderone
Try testing renegotiation some; not much luck
1133
class ConnectionRenegotiateTests(TestCase, _LoopbackMixin):
1134
    """
1135
    Tests for SSL renegotiation APIs.
1136
    """
1137
    def test_renegotiate_wrong_args(self):
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
1138
        """
1139
        L{Connection.renegotiate} raises L{TypeError} if called with any
1140
        arguments.
1141
        """
132.1.6 by Jean-Paul Calderone
Try testing renegotiation some; not much luck
1142
        connection = Connection(Context(TLSv1_METHOD), None)
1143
        self.assertRaises(TypeError, connection.renegotiate, None)
1144
1145
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
1146
    def test_total_renegotiations_wrong_args(self):
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
1147
        """
1148
        L{Connection.total_renegotiations} raises L{TypeError} if called with
1149
        any arguments.
1150
        """
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
1151
        connection = Connection(Context(TLSv1_METHOD), None)
1152
        self.assertRaises(TypeError, connection.total_renegotiations, None)
1153
1154
1155
    def test_total_renegotiations(self):
132.1.50 by Jean-Paul Calderone
Some more test method docstrings
1156
        """
1157
        L{Connection.total_renegotiations} returns C{0} before any
1158
        renegotiations have happened.
1159
        """
132.1.13 by Jean-Paul Calderone
a slew of misc mostly wrong-args tests
1160
        connection = Connection(Context(TLSv1_METHOD), None)
1161
        self.assertEquals(connection.total_renegotiations(), 0)
1162
1163
132.1.6 by Jean-Paul Calderone
Try testing renegotiation some; not much luck
1164
#     def test_renegotiate(self):
1165
#         """
1166
#         """
1167
#         server, client = self._loopback()
1168
1169
#         server.send("hello world")
1170
#         self.assertEquals(client.recv(len("hello world")), "hello world")
1171
1172
#         self.assertEquals(server.total_renegotiations(), 0)
1173
#         self.assertTrue(server.renegotiate())
1174
1175
#         server.setblocking(False)
1176
#         client.setblocking(False)
1177
#         while server.renegotiate_pending():
1178
#             client.do_handshake()
1179
#             server.do_handshake()
1180
1181
#         self.assertEquals(server.total_renegotiations(), 1)
1182
1183
1184
1185
119 by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely.
1186
class ErrorTests(TestCase):
1187
    """
1188
    Unit tests for L{OpenSSL.SSL.Error}.
1189
    """
1190
    def test_type(self):
1191
        """
1192
        L{Error} is an exception type.
1193
        """
1194
        self.assertTrue(issubclass(Error, Exception))
112.5.1 by Rick Dean
More unit tests about types
1195
        self.assertEqual(Error.__name__, 'Error')
1196
1197
1198
83 by Jean-Paul Calderone
Expose some new DTLS-related constants in OpenSSL.SSL - OP_NO_QUERY_MTU, OP_COOKIE_EXCHANGE, and OP_NO_TICKET.
1199
class ConstantsTests(TestCase):
1200
    """
1201
    Tests for the values of constants exposed in L{OpenSSL.SSL}.
1202
1203
    These are values defined by OpenSSL intended only to be used as flags to
1204
    OpenSSL APIs.  The only assertions it seems can be made about them is
1205
    their values.
1206
    """
87 by Jean-Paul Calderone
work around stupid feature deficiencies in unittest.py
1207
    # unittest.TestCase has no skip mechanism
1208
    if OP_NO_QUERY_MTU is not None:
1209
        def test_op_no_query_mtu(self):
1210
            """
1211
            The value of L{OpenSSL.SSL.OP_NO_QUERY_MTU} is 0x1000, the value of
1212
            I{SSL_OP_NO_QUERY_MTU} defined by I{openssl/ssl.h}.
1213
            """
1214
            self.assertEqual(OP_NO_QUERY_MTU, 0x1000)
1215
    else:
1216
        "OP_NO_QUERY_MTU unavailable - OpenSSL version may be too old"
1217
1218
1219
    if OP_COOKIE_EXCHANGE is not None:
1220
        def test_op_cookie_exchange(self):
1221
            """
1222
            The value of L{OpenSSL.SSL.OP_COOKIE_EXCHANGE} is 0x2000, the value
1223
            of I{SSL_OP_COOKIE_EXCHANGE} defined by I{openssl/ssl.h}.
1224
            """
1225
            self.assertEqual(OP_COOKIE_EXCHANGE, 0x2000)
1226
    else:
1227
        "OP_COOKIE_EXCHANGE unavailable - OpenSSL version may be too old"
1228
1229
1230
    if OP_NO_TICKET is not None:
1231
        def test_op_no_ticket(self):
1232
            """
1233
            The value of L{OpenSSL.SSL.OP_NO_TICKET} is 0x4000, the value of
1234
            I{SSL_OP_NO_TICKET} defined by I{openssl/ssl.h}.
1235
            """
1236
            self.assertEqual(OP_NO_TICKET, 0x4000)
1237
    else:
1238
        "OP_NO_TICKET unavailable - OpenSSL version may be too old"
98.1.1 by Rick Dean
FILETYPE_TEXT for dumping certificates, private keys, and certificate signing requests.
1239
1240
101.1.1 by Jean-Paul Calderone
Merge rick's bio_read_write branch
1241
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
1242
class MemoryBIOTests(TestCase, _LoopbackMixin):
101.1.3 by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests
1243
    """
1244
    Tests for L{OpenSSL.SSL.Connection} using a memory BIO.
1245
    """
112.1.5 by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my).
1246
    def _server(self, sock):
1247
        """
1248
        Create a new server-side SSL L{Connection} object wrapped around
1249
        C{sock}.
1250
        """
101.1.4 by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs
1251
        # Create the server side Connection.  This is mostly setup boilerplate
1252
        # - use TLSv1, use a particular certificate, etc.
1253
        server_ctx = Context(TLSv1_METHOD)
1254
        server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
1255
        server_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
1256
        server_store = server_ctx.get_cert_store()
1257
        server_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
1258
        server_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
1259
        server_ctx.check_privatekey()
1260
        server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
1261
        # Here the Connection is actually created.  If None is passed as the 2nd
1262
        # parameter, it indicates a memory BIO should be created.
1263
        server_conn = Connection(server_ctx, sock)
101.1.4 by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs
1264
        server_conn.set_accept_state()
1265
        return server_conn
1266
1267
112.1.5 by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my).
1268
    def _client(self, sock):
1269
        """
1270
        Create a new client-side SSL L{Connection} object wrapped around
1271
        C{sock}.
1272
        """
1273
        # Now create the client side Connection.  Similar boilerplate to the
1274
        # above.
101.1.4 by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs
1275
        client_ctx = Context(TLSv1_METHOD)
1276
        client_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
1277
        client_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
1278
        client_store = client_ctx.get_cert_store()
1279
        client_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, client_key_pem))
1280
        client_ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem))
1281
        client_ctx.check_privatekey()
1282
        client_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
1283
        client_conn = Connection(client_ctx, sock)
101.1.4 by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs
1284
        client_conn.set_connect_state()
1285
        return client_conn
1286
1287
112.1.5 by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my).
1288
    def test_memoryConnect(self):
101.1.3 by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests
1289
        """
1290
        Two L{Connection}s which use memory BIOs can be manually connected by
1291
        reading from the output of each and writing those bytes to the input of
1292
        the other and in this way establish a connection and exchange
1293
        application-level bytes with each other.
1294
        """
112.1.5 by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my).
1295
        server_conn = self._server(None)
1296
        client_conn = self._client(None)
98.2.1 by Rick Dean
BIO read/write to allow for socketless SSL Connections
1297
101.1.3 by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests
1298
        # There should be no key or nonces yet.
1299
        self.assertIdentical(server_conn.master_key(), None)
1300
        self.assertIdentical(server_conn.client_random(), None)
1301
        self.assertIdentical(server_conn.server_random(), None)
1302
1303
        # First, the handshake needs to happen.  We'll deliver bytes back and
1304
        # forth between the client and server until neither of them feels like
1305
        # speaking any more.
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
1306
        self.assertIdentical(
1307
            self._interactInMemory(client_conn, server_conn), None)
101.1.3 by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests
1308
1309
        # Now that the handshake is done, there should be a key and nonces.
1310
        self.assertNotIdentical(server_conn.master_key(), None)
1311
        self.assertNotIdentical(server_conn.client_random(), None)
1312
        self.assertNotIdentical(server_conn.server_random(), None)
112 by Jean-Paul Calderone
Fix assertions about client and server nonces
1313
        self.assertEquals(server_conn.client_random(), client_conn.client_random())
1314
        self.assertEquals(server_conn.server_random(), client_conn.server_random())
1315
        self.assertNotEquals(server_conn.client_random(), server_conn.server_random())
1316
        self.assertNotEquals(client_conn.client_random(), client_conn.server_random())
101.1.3 by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests
1317
1318
        # Here are the bytes we'll try to send.
132.2.39 by Jean-Paul Calderone
Convert a couple more things which should be byte strings into byte strings
1319
        important_message = b('One if by land, two if by sea.')
98.2.1 by Rick Dean
BIO read/write to allow for socketless SSL Connections
1320
101.1.3 by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests
1321
        server_conn.write(important_message)
1322
        self.assertEquals(
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
1323
            self._interactInMemory(client_conn, server_conn),
101.1.3 by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests
1324
            (client_conn, important_message))
1325
1326
        client_conn.write(important_message[::-1])
1327
        self.assertEquals(
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
1328
            self._interactInMemory(client_conn, server_conn),
101.1.3 by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests
1329
            (server_conn, important_message[::-1]))
98.2.1 by Rick Dean
BIO read/write to allow for socketless SSL Connections
1330
1331
112.1.5 by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my).
1332
    def test_socketConnect(self):
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
1333
        """
112.1.5 by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my).
1334
        Just like L{test_memoryConnect} but with an actual socket.
1335
1336
        This is primarily to rule out the memory BIO code as the source of
1337
        any problems encountered while passing data over a L{Connection} (if
1338
        this test fails, there must be a problem outside the memory BIO
1339
        code, as no memory BIO is involved here).  Even though this isn't a
1340
        memory BIO test, it's convenient to have it here.
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
1341
        """
132.3.4 by Jean-Paul Calderone
Switch to the loopback setup helper in test_socketConnect, incidentally converting the connections to blocking, which avoids the OS X recv error.
1342
        server_conn, client_conn = self._loopback()
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
1343
132.2.39 by Jean-Paul Calderone
Convert a couple more things which should be byte strings into byte strings
1344
        important_message = b("Help me Obi Wan Kenobi, you're my only hope.")
112.1.1 by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data.
1345
        client_conn.send(important_message)
1346
        msg = server_conn.recv(1024)
1347
        self.assertEqual(msg, important_message)
1348
1349
        # Again in the other direction, just for fun.
1350
        important_message = important_message[::-1]
1351
        server_conn.send(important_message)
1352
        msg = client_conn.recv(1024)
1353
        self.assertEqual(msg, important_message)
1354
1355
101.1.2 by Jean-Paul Calderone
Correct a few minor bugs
1356
    def test_socketOverridesMemory(self):
98.2.1 by Rick Dean
BIO read/write to allow for socketless SSL Connections
1357
        """
1358
        Test that L{OpenSSL.SSL.bio_read} and L{OpenSSL.SSL.bio_write} don't
1359
        work on L{OpenSSL.SSL.Connection}() that use sockets.
1360
        """
1361
        context = Context(SSLv3_METHOD)
1362
        client = socket()
1363
        clientSSL = Connection(context, client)
1364
        self.assertRaises( TypeError, clientSSL.bio_read, 100)
1365
        self.assertRaises( TypeError, clientSSL.bio_write, "foo")
101.1.9 by Jean-Paul Calderone
Make bio_shutdown raise an exception when called on a socket-based Connection
1366
        self.assertRaises( TypeError, clientSSL.bio_shutdown )
101.1.4 by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs
1367
1368
1369
    def test_outgoingOverflow(self):
1370
        """
1371
        If more bytes than can be written to the memory BIO are passed to
1372
        L{Connection.send} at once, the number of bytes which were written is
1373
        returned and that many bytes from the beginning of the input can be
1374
        read from the other end of the connection.
1375
        """
112.1.5 by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my).
1376
        server = self._server(None)
1377
        client = self._client(None)
101.1.4 by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs
1378
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
1379
        self._interactInMemory(client, server)
101.1.4 by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs
1380
1381
        size = 2 ** 15
1382
        sent = client.send("x" * size)
1383
        # Sanity check.  We're trying to test what happens when the entire
1384
        # input can't be sent.  If the entire input was sent, this test is
1385
        # meaningless.
1386
        self.assertTrue(sent < size)
1387
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
1388
        receiver, received = self._interactInMemory(client, server)
101.1.4 by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs
1389
        self.assertIdentical(receiver, server)
1390
1391
        # We can rely on all of these bytes being received at once because
1392
        # _loopback passes 2 ** 16 to recv - more than 2 ** 15.
1393
        self.assertEquals(len(received), sent)
101.1.5 by Jean-Paul Calderone
add Connection.bio_shutdown
1394
1395
1396
    def test_shutdown(self):
1397
        """
1398
        L{Connection.bio_shutdown} signals the end of the data stream from
1399
        which the L{Connection} reads.
1400
        """
112.1.5 by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my).
1401
        server = self._server(None)
101.1.5 by Jean-Paul Calderone
add Connection.bio_shutdown
1402
        server.bio_shutdown()
1403
        e = self.assertRaises(Error, server.recv, 1024)
1404
        # We don't want WantReadError or ZeroReturnError or anything - it's a
1405
        # handshake failure.
1406
        self.assertEquals(e.__class__, Error)
109 by Jean-Paul Calderone
Switch from Twisted's TestCase to the stdlib TestCase so there are fewer ways the test suite can be invoked
1407
1408
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1409
    def _check_client_ca_list(self, func):
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1410
        """
1411
        Verify the return value of the C{get_client_ca_list} method for server and client connections.
1412
1413
        @param func: A function which will be called with the server context
1414
            before the client and server are connected to each other.  This
1415
            function should specify a list of CAs for the server to send to the
1416
            client and return that same list.  The list will be used to verify
1417
            that C{get_client_ca_list} returns the proper value at various
1418
            times.
1419
        """
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1420
        server = self._server(None)
1421
        client = self._client(None)
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1422
        self.assertEqual(client.get_client_ca_list(), [])
1423
        self.assertEqual(server.get_client_ca_list(), [])
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1424
        ctx = server.get_context()
1425
        expected = func(ctx)
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1426
        self.assertEqual(client.get_client_ca_list(), [])
1427
        self.assertEqual(server.get_client_ca_list(), expected)
132.1.34 by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working
1428
        self._interactInMemory(client, server)
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1429
        self.assertEqual(client.get_client_ca_list(), expected)
1430
        self.assertEqual(server.get_client_ca_list(), expected)
1431
1432
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1433
    def test_set_client_ca_list_errors(self):
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1434
        """
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1435
        L{Context.set_client_ca_list} raises a L{TypeError} if called with a
1436
        non-list or a list that contains objects other than X509Names.
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1437
        """
1438
        ctx = Context(TLSv1_METHOD)
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1439
        self.assertRaises(TypeError, ctx.set_client_ca_list, "spam")
1440
        self.assertRaises(TypeError, ctx.set_client_ca_list, ["spam"])
1441
        self.assertIdentical(ctx.set_client_ca_list([]), None)
1442
1443
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1444
    def test_set_empty_ca_list(self):
1445
        """
1446
        If passed an empty list, L{Context.set_client_ca_list} configures the
1447
        context to send no CA names to the client and, on both the server and
1448
        client sides, L{Connection.get_client_ca_list} returns an empty list
1449
        after the connection is set up.
1450
        """
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1451
        def no_ca(ctx):
1452
            ctx.set_client_ca_list([])
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1453
            return []
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1454
        self._check_client_ca_list(no_ca)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1455
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1456
1457
    def test_set_one_ca_list(self):
1458
        """
1459
        If passed a list containing a single X509Name,
1460
        L{Context.set_client_ca_list} configures the context to send that CA
1461
        name to the client and, on both the server and client sides,
1462
        L{Connection.get_client_ca_list} returns a list containing that
1463
        X509Name after the connection is set up.
1464
        """
1465
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1466
        cadesc = cacert.get_subject()
1467
        def single_ca(ctx):
1468
            ctx.set_client_ca_list([cadesc])
1469
            return [cadesc]
1470
        self._check_client_ca_list(single_ca)
1471
1472
1473
    def test_set_multiple_ca_list(self):
1474
        """
1475
        If passed a list containing multiple X509Name objects,
1476
        L{Context.set_client_ca_list} configures the context to send those CA
1477
        names to the client and, on both the server and client sides,
1478
        L{Connection.get_client_ca_list} returns a list containing those
1479
        X509Names after the connection is set up.
1480
        """
1481
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1482
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)
1483
1484
        sedesc = secert.get_subject()
1485
        cldesc = clcert.get_subject()
1486
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1487
        def multiple_ca(ctx):
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1488
            L = [sedesc, cldesc]
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1489
            ctx.set_client_ca_list(L)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1490
            return L
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1491
        self._check_client_ca_list(multiple_ca)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1492
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1493
1494
    def test_reset_ca_list(self):
1495
        """
1496
        If called multiple times, only the X509Names passed to the final call
1497
        of L{Context.set_client_ca_list} are used to configure the CA names
1498
        sent to the client.
1499
        """
1500
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1501
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1502
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)
1503
1504
        cadesc = cacert.get_subject()
1505
        sedesc = secert.get_subject()
1506
        cldesc = clcert.get_subject()
1507
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1508
        def changed_ca(ctx):
1509
            ctx.set_client_ca_list([sedesc, cldesc])
1510
            ctx.set_client_ca_list([cadesc])
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1511
            return [cadesc]
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1512
        self._check_client_ca_list(changed_ca)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1513
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1514
1515
    def test_mutated_ca_list(self):
1516
        """
1517
        If the list passed to L{Context.set_client_ca_list} is mutated
1518
        afterwards, this does not affect the list of CA names sent to the
1519
        client.
1520
        """
1521
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1522
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1523
1524
        cadesc = cacert.get_subject()
1525
        sedesc = secert.get_subject()
1526
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1527
        def mutated_ca(ctx):
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1528
            L = [cadesc]
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1529
            ctx.set_client_ca_list([cadesc])
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1530
            L.append(sedesc)
1531
            return [cadesc]
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1532
        self._check_client_ca_list(mutated_ca)
1533
1534
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1535
    def test_add_client_ca_errors(self):
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1536
        """
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1537
        L{Context.add_client_ca} raises L{TypeError} if called with a non-X509
1538
        object or with a number of arguments other than one.
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1539
        """
1540
        ctx = Context(TLSv1_METHOD)
1541
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1542
        self.assertRaises(TypeError, ctx.add_client_ca)
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1543
        self.assertRaises(TypeError, ctx.add_client_ca, "spam")
122.3.12 by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones
1544
        self.assertRaises(TypeError, ctx.add_client_ca, cacert, cacert)
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1545
1546
122.3.13 by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones
1547
    def test_one_add_client_ca(self):
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1548
        """
122.3.13 by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones
1549
        A certificate's subject can be added as a CA to be sent to the client
1550
        with L{Context.add_client_ca}.
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1551
        """
1552
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1553
        cadesc = cacert.get_subject()
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1554
        def single_ca(ctx):
1555
            ctx.add_client_ca(cacert)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1556
            return [cadesc]
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1557
        self._check_client_ca_list(single_ca)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1558
122.3.13 by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones
1559
1560
    def test_multiple_add_client_ca(self):
1561
        """
1562
        Multiple CA names can be sent to the client by calling
1563
        L{Context.add_client_ca} with multiple X509 objects.
1564
        """
1565
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1566
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1567
1568
        cadesc = cacert.get_subject()
1569
        sedesc = secert.get_subject()
1570
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1571
        def multiple_ca(ctx):
1572
            ctx.add_client_ca(cacert)
1573
            ctx.add_client_ca(secert)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1574
            return [cadesc, sedesc]
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1575
        self._check_client_ca_list(multiple_ca)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1576
122.3.13 by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones
1577
1578
    def test_set_and_add_client_ca(self):
1579
        """
1580
        A call to L{Context.set_client_ca_list} followed by a call to
1581
        L{Context.add_client_ca} results in using the CA names from the first
1582
        call and the CA name from the second call.
1583
        """
1584
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1585
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1586
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)
1587
1588
        cadesc = cacert.get_subject()
1589
        sedesc = secert.get_subject()
1590
        cldesc = clcert.get_subject()
1591
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1592
        def mixed_set_add_ca(ctx):
1593
            ctx.set_client_ca_list([cadesc, sedesc])
1594
            ctx.add_client_ca(clcert)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1595
            return [cadesc, sedesc, cldesc]
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1596
        self._check_client_ca_list(mixed_set_add_ca)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1597
122.3.13 by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones
1598
1599
    def test_set_after_add_client_ca(self):
1600
        """
1601
        A call to L{Context.set_client_ca_list} after a call to
1602
        L{Context.add_client_ca} replaces the CA name specified by the former
1603
        call with the names specified by the latter cal.
1604
        """
1605
        cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1606
        secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1607
        clcert = load_certificate(FILETYPE_PEM, server_cert_pem)
1608
1609
        cadesc = cacert.get_subject()
1610
        sedesc = secert.get_subject()
1611
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1612
        def set_replaces_add_ca(ctx):
1613
            ctx.add_client_ca(clcert)
1614
            ctx.set_client_ca_list([cadesc])
1615
            ctx.add_client_ca(secert)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1616
            return [cadesc, sedesc]
122.3.11 by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL.
1617
        self._check_client_ca_list(set_replaces_add_ca)
122.3.5 by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list.
1618
109 by Jean-Paul Calderone
Switch from Twisted's TestCase to the stdlib TestCase so there are fewer ways the test suite can be invoked
1619
122.3.1 by Ziga Seilnacht
Types should have a correct __module__ attribute.
1620
109 by Jean-Paul Calderone
Switch from Twisted's TestCase to the stdlib TestCase so there are fewer ways the test suite can be invoked
1621
if __name__ == '__main__':
1622
    main()