132.1.49
by Jean-Paul Calderone
Fix pyflakes |
1 |
# Copyright (C) Jean-Paul Calderone 2008-2010, All rights reserved
|
42
by Jean-Paul Calderone
Put my copyright on everything I've changed so far, put a more reasonable version number on version.py |
2 |
|
41
by Jean-Paul Calderone
A couple trivial Context tests |
3 |
"""
|
4 |
Unit tests for L{OpenSSL.SSL}.
|
|
5 |
"""
|
|
6 |
||
135.1.1
by Jean-Paul Calderone
Allow EWOULDBLOCK from connect_ex as well |
7 |
from errno import ECONNREFUSED, EINPROGRESS, EWOULDBLOCK |
94
by Jean-Paul Calderone
Fix clearly retarded bugs in previous revision |
8 |
from sys import platform |
132.1.7
by Jean-Paul Calderone
test for successful and unsuccessful connect |
9 |
from socket import error, socket |
119.1.1
by Jean-Paul Calderone
Avoid using symlink, since we cannot use it on windows |
10 |
from os import makedirs |
70.2.3
by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths |
11 |
from os.path import join |
109
by Jean-Paul Calderone
Switch from Twisted's TestCase to the stdlib TestCase so there are fewer ways the test suite can be invoked |
12 |
from unittest import main |
96
by Jean-Paul Calderone
Let's try using Twisted's TestCase and see how that goes. |
13 |
|
132.1.49
by Jean-Paul Calderone
Fix pyflakes |
14 |
from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, FILETYPE_ASN1 |
15 |
from OpenSSL.crypto import PKey, X509, X509Extension |
|
16 |
from OpenSSL.crypto import dump_privatekey, load_privatekey |
|
17 |
from OpenSSL.crypto import dump_certificate, load_certificate |
|
18 |
||
132.1.14
by Jean-Paul Calderone
somewhat more coverage in test_shutdown |
19 |
from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN |
41
by Jean-Paul Calderone
A couple trivial Context tests |
20 |
from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD |
119
by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely. |
21 |
from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE |
98.2.1
by Rick Dean
BIO read/write to allow for socketless SSL Connections |
22 |
from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE |
132.1.49
by Jean-Paul Calderone
Fix pyflakes |
23 |
from OpenSSL.SSL import Error, SysCallError, WantReadError, ZeroReturnError |
24 |
from OpenSSL.SSL import Context, ContextType, Connection, ConnectionType |
|
25 |
||
132.2.38
by Jean-Paul Calderone
Fix the import_crypto_module helper and factor some str/unicode helpers into the test util module |
26 |
from OpenSSL.test.util import TestCase, bytes, b |
110
by Jean-Paul Calderone
Make it easier to be Python 2.3 compatible by making that the default |
27 |
from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM |
132.1.49
by Jean-Paul Calderone
Fix pyflakes |
28 |
from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem |
29 |
from OpenSSL.test.test_crypto import server_cert_pem, server_key_pem, root_cert_pem |
|
30 |
||
85
by Jean-Paul Calderone
make the tests recognize the optionalness of the DTLS constants |
31 |
try: |
32 |
from OpenSSL.SSL import OP_NO_QUERY_MTU |
|
33 |
except ImportError: |
|
34 |
OP_NO_QUERY_MTU = None |
|
35 |
try: |
|
36 |
from OpenSSL.SSL import OP_COOKIE_EXCHANGE |
|
37 |
except ImportError: |
|
38 |
OP_COOKIE_EXCHANGE = None |
|
39 |
try: |
|
40 |
from OpenSSL.SSL import OP_NO_TICKET |
|
41 |
except ImportError: |
|
42 |
OP_NO_TICKET = None |
|
63
by Jean-Paul Calderone
Python 2.3 compatibility in test_ssl |
43 |
|
44 |
||
132.1.52
by Jean-Paul Calderone
Sort of add a load_tmp_dh test; also a Context.set_cipher_list test |
45 |
# openssl dhparam 128 -out dh-128.pem (note that 128 is a small number of bits
|
46 |
# to use)
|
|
47 |
dhparam = """\ |
|
48 |
-----BEGIN DH PARAMETERS-----
|
|
49 |
MBYCEQCobsg29c9WZP/54oAPcwiDAgEC
|
|
50 |
-----END DH PARAMETERS-----
|
|
51 |
"""
|
|
52 |
||
53 |
||
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
54 |
def verify_cb(conn, cert, errnum, depth, ok): |
55 |
return ok |
|
56 |
||
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
57 |
def socket_pair(): |
112.1.4
by Jean-Paul Calderone
Adjust socket_pair docstring style; also make it simpler to avoid possible undesirable socket behaviors |
58 |
"""
|
119
by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely. |
59 |
Establish and return a pair of network sockets connected to each other.
|
112.1.4
by Jean-Paul Calderone
Adjust socket_pair docstring style; also make it simpler to avoid possible undesirable socket behaviors |
60 |
"""
|
61 |
# Connect a pair of sockets
|
|
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
62 |
port = socket() |
63 |
port.bind(('', 0)) |
|
64 |
port.listen(1) |
|
65 |
client = socket() |
|
66 |
client.setblocking(False) |
|
119.1.2
by Jean-Paul Calderone
Windows is a poor sport; getsockname is not guaranteed to return the correct result until someone has connected to you; joy |
67 |
client.connect_ex(("127.0.0.1", port.getsockname()[1])) |
116.1.1
by Jean-Paul Calderone
Try doing socket_pair with blocking sockets |
68 |
client.setblocking(True) |
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
69 |
server = port.accept()[0] |
70 |
||
112.1.4
by Jean-Paul Calderone
Adjust socket_pair docstring style; also make it simpler to avoid possible undesirable socket behaviors |
71 |
# Let's pass some unencrypted data to make sure our socket connection is
|
72 |
# fine. Just one byte, so we don't have to worry about buffers getting
|
|
73 |
# filled up or fragmentation.
|
|
132.2.38
by Jean-Paul Calderone
Fix the import_crypto_module helper and factor some str/unicode helpers into the test util module |
74 |
server.send(b("x")) |
75 |
assert client.recv(1024) == b("x") |
|
76 |
client.send(b("y")) |
|
77 |
assert server.recv(1024) == b("y") |
|
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
78 |
|
132.1.1
by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall |
79 |
# Most of our callers want non-blocking sockets, make it easy for them.
|
116.1.1
by Jean-Paul Calderone
Try doing socket_pair with blocking sockets |
80 |
server.setblocking(False) |
81 |
client.setblocking(False) |
|
82 |
||
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
83 |
return (server, client) |
84 |
||
85 |
||
116.1.1
by Jean-Paul Calderone
Try doing socket_pair with blocking sockets |
86 |
|
132.3.5
by Jean-Paul Calderone
Convert the verify location tests to do the handshake properly |
87 |
def handshake(client, server): |
88 |
conns = [client, server] |
|
89 |
while conns: |
|
90 |
for conn in conns: |
|
91 |
try: |
|
92 |
conn.do_handshake() |
|
93 |
except WantReadError: |
|
94 |
pass
|
|
95 |
else: |
|
96 |
conns.remove(conn) |
|
97 |
||
98 |
||
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
99 |
class _LoopbackMixin: |
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
100 |
"""
|
101 |
Helper mixin which defines methods for creating a connected socket pair and
|
|
102 |
for forcing two connected SSL sockets to talk to each other via memory BIOs.
|
|
103 |
"""
|
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
104 |
def _loopback(self): |
105 |
(server, client) = socket_pair() |
|
106 |
||
107 |
ctx = Context(TLSv1_METHOD) |
|
108 |
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) |
|
109 |
ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) |
|
110 |
server = Connection(ctx, server) |
|
111 |
server.set_accept_state() |
|
112 |
client = Connection(Context(TLSv1_METHOD), client) |
|
113 |
client.set_connect_state() |
|
114 |
||
132.3.5
by Jean-Paul Calderone
Convert the verify location tests to do the handshake properly |
115 |
handshake(client, server) |
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
116 |
|
117 |
server.setblocking(True) |
|
118 |
client.setblocking(True) |
|
119 |
return server, client |
|
120 |
||
121 |
||
122 |
def _interactInMemory(self, client_conn, server_conn): |
|
123 |
"""
|
|
124 |
Try to read application bytes from each of the two L{Connection}
|
|
125 |
objects. Copy bytes back and forth between their send/receive buffers
|
|
126 |
for as long as there is anything to copy. When there is nothing more
|
|
127 |
to copy, return C{None}. If one of them actually manages to deliver
|
|
128 |
some application bytes, return a two-tuple of the connection from which
|
|
129 |
the bytes were read and the bytes themselves.
|
|
130 |
"""
|
|
131 |
wrote = True |
|
132 |
while wrote: |
|
133 |
# Loop until neither side has anything to say
|
|
134 |
wrote = False |
|
135 |
||
136 |
# Copy stuff from each side's send buffer to the other side's
|
|
137 |
# receive buffer.
|
|
138 |
for (read, write) in [(client_conn, server_conn), |
|
139 |
(server_conn, client_conn)]: |
|
140 |
||
141 |
# Give the side a chance to generate some more bytes, or
|
|
142 |
# succeed.
|
|
143 |
try: |
|
144 |
bytes = read.recv(2 ** 16) |
|
145 |
except WantReadError: |
|
146 |
# It didn't succeed, so we'll hope it generated some
|
|
147 |
# output.
|
|
148 |
pass
|
|
149 |
else: |
|
150 |
# It did succeed, so we'll stop now and let the caller deal
|
|
151 |
# with it.
|
|
152 |
return (read, bytes) |
|
153 |
||
154 |
while True: |
|
155 |
# Keep copying as long as there's more stuff there.
|
|
156 |
try: |
|
157 |
dirty = read.bio_read(4096) |
|
158 |
except WantReadError: |
|
159 |
# Okay, nothing more waiting to be sent. Stop
|
|
160 |
# processing this send buffer.
|
|
161 |
break
|
|
162 |
else: |
|
163 |
# Keep track of the fact that someone generated some
|
|
164 |
# output.
|
|
165 |
wrote = True |
|
166 |
write.bio_write(dirty) |
|
167 |
||
168 |
||
169 |
||
170 |
class ContextTests(TestCase, _LoopbackMixin): |
|
41
by Jean-Paul Calderone
A couple trivial Context tests |
171 |
"""
|
172 |
Unit tests for L{OpenSSL.SSL.Context}.
|
|
173 |
"""
|
|
174 |
def test_method(self): |
|
175 |
"""
|
|
176 |
L{Context} can be instantiated with one of L{SSLv2_METHOD},
|
|
177 |
L{SSLv3_METHOD}, L{SSLv23_METHOD}, or L{TLSv1_METHOD}.
|
|
178 |
"""
|
|
179 |
for meth in [SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD]: |
|
180 |
Context(meth) |
|
181 |
self.assertRaises(TypeError, Context, "") |
|
182 |
self.assertRaises(ValueError, Context, 10) |
|
183 |
||
184 |
||
112.5.1
by Rick Dean
More unit tests about types |
185 |
def test_type(self): |
186 |
"""
|
|
119
by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely. |
187 |
L{Context} and L{ContextType} refer to the same type object and can be
|
188 |
used to create instances of that type.
|
|
112.5.1
by Rick Dean
More unit tests about types |
189 |
"""
|
119
by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely. |
190 |
self.assertIdentical(Context, ContextType) |
191 |
self.assertConsistentType(Context, 'Context', TLSv1_METHOD) |
|
112.5.1
by Rick Dean
More unit tests about types |
192 |
|
193 |
||
41
by Jean-Paul Calderone
A couple trivial Context tests |
194 |
def test_use_privatekey(self): |
195 |
"""
|
|
196 |
L{Context.use_privatekey} takes an L{OpenSSL.crypto.PKey} instance.
|
|
197 |
"""
|
|
198 |
key = PKey() |
|
199 |
key.generate_key(TYPE_RSA, 128) |
|
200 |
ctx = Context(TLSv1_METHOD) |
|
201 |
ctx.use_privatekey(key) |
|
202 |
self.assertRaises(TypeError, ctx.use_privatekey, "") |
|
60
by Jean-Paul Calderone
Fix a threading bug in passphrase callback support for context objects. |
203 |
|
204 |
||
132.1.26
by Jean-Paul Calderone
Context app data wrong arg tests |
205 |
def test_set_app_data_wrong_args(self): |
206 |
"""
|
|
207 |
L{Context.set_app_data} raises L{TypeError} if called with other than
|
|
208 |
one argument.
|
|
209 |
"""
|
|
210 |
context = Context(TLSv1_METHOD) |
|
211 |
self.assertRaises(TypeError, context.set_app_data) |
|
212 |
self.assertRaises(TypeError, context.set_app_data, None, None) |
|
213 |
||
214 |
||
215 |
def test_get_app_data_wrong_args(self): |
|
216 |
"""
|
|
217 |
L{Context.get_app_data} raises L{TypeError} if called with any
|
|
218 |
arguments.
|
|
219 |
"""
|
|
220 |
context = Context(TLSv1_METHOD) |
|
221 |
self.assertRaises(TypeError, context.get_app_data, None) |
|
222 |
||
223 |
||
224 |
def test_app_data(self): |
|
225 |
"""
|
|
226 |
L{Context.set_app_data} stores an object for later retrieval using
|
|
227 |
L{Context.get_app_data}.
|
|
228 |
"""
|
|
229 |
app_data = object() |
|
230 |
context = Context(TLSv1_METHOD) |
|
231 |
context.set_app_data(app_data) |
|
232 |
self.assertIdentical(context.get_app_data(), app_data) |
|
233 |
||
234 |
||
132.1.27
by Jean-Paul Calderone
Context.set_options wrong args test |
235 |
def test_set_options_wrong_args(self): |
236 |
"""
|
|
237 |
L{Context.set_options} raises L{TypeError} if called with the wrong
|
|
238 |
number of arguments or a non-C{int} argument.
|
|
239 |
"""
|
|
240 |
context = Context(TLSv1_METHOD) |
|
241 |
self.assertRaises(TypeError, context.set_options) |
|
242 |
self.assertRaises(TypeError, context.set_options, None) |
|
243 |
self.assertRaises(TypeError, context.set_options, 1, None) |
|
244 |
||
245 |
||
132.1.28
by Jean-Paul Calderone
Context.set_timeout and get_timeout tests |
246 |
def test_set_timeout_wrong_args(self): |
247 |
"""
|
|
248 |
L{Context.set_timeout} raises L{TypeError} if called with the wrong
|
|
249 |
number of arguments or a non-C{int} argument.
|
|
250 |
"""
|
|
251 |
context = Context(TLSv1_METHOD) |
|
252 |
self.assertRaises(TypeError, context.set_timeout) |
|
253 |
self.assertRaises(TypeError, context.set_timeout, None) |
|
254 |
self.assertRaises(TypeError, context.set_timeout, 1, None) |
|
255 |
||
256 |
||
257 |
def test_get_timeout_wrong_args(self): |
|
258 |
"""
|
|
259 |
L{Context.get_timeout} raises L{TypeError} if called with any arguments.
|
|
260 |
"""
|
|
261 |
context = Context(TLSv1_METHOD) |
|
262 |
self.assertRaises(TypeError, context.get_timeout, None) |
|
263 |
||
264 |
||
265 |
def test_timeout(self): |
|
266 |
"""
|
|
267 |
L{Context.set_timeout} sets the session timeout for all connections
|
|
268 |
created using the context object. L{Context.get_timeout} retrieves this
|
|
269 |
value.
|
|
270 |
"""
|
|
271 |
context = Context(TLSv1_METHOD) |
|
272 |
context.set_timeout(1234) |
|
273 |
self.assertEquals(context.get_timeout(), 1234) |
|
274 |
||
275 |
||
132.1.30
by Jean-Paul Calderone
Tests for set_verify_depth and get_verify_depth |
276 |
def test_set_verify_depth_wrong_args(self): |
277 |
"""
|
|
278 |
L{Context.set_verify_depth} raises L{TypeError} if called with the wrong
|
|
279 |
number of arguments or a non-C{int} argument.
|
|
280 |
"""
|
|
281 |
context = Context(TLSv1_METHOD) |
|
282 |
self.assertRaises(TypeError, context.set_verify_depth) |
|
283 |
self.assertRaises(TypeError, context.set_verify_depth, None) |
|
284 |
self.assertRaises(TypeError, context.set_verify_depth, 1, None) |
|
285 |
||
286 |
||
287 |
def test_get_verify_depth_wrong_args(self): |
|
288 |
"""
|
|
289 |
L{Context.get_verify_depth} raises L{TypeError} if called with any arguments.
|
|
290 |
"""
|
|
291 |
context = Context(TLSv1_METHOD) |
|
292 |
self.assertRaises(TypeError, context.get_verify_depth, None) |
|
293 |
||
294 |
||
295 |
def test_verify_depth(self): |
|
296 |
"""
|
|
297 |
L{Context.set_verify_depth} sets the number of certificates in a chain
|
|
298 |
to follow before giving up. The value can be retrieved with
|
|
299 |
L{Context.get_verify_depth}.
|
|
300 |
"""
|
|
301 |
context = Context(TLSv1_METHOD) |
|
302 |
context.set_verify_depth(11) |
|
303 |
self.assertEquals(context.get_verify_depth(), 11) |
|
304 |
||
305 |
||
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
306 |
def _write_encrypted_pem(self, passphrase): |
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
307 |
"""
|
308 |
Write a new private key out to a new file, encrypted using the given
|
|
309 |
passphrase. Return the path to the new file.
|
|
310 |
"""
|
|
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
311 |
key = PKey() |
312 |
key.generate_key(TYPE_RSA, 128) |
|
313 |
pemFile = self.mktemp() |
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
314 |
fObj = open(pemFile, 'w') |
315 |
pem = dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase) |
|
316 |
fObj.write(pem.decode('ascii')) |
|
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
317 |
fObj.close() |
318 |
return pemFile |
|
319 |
||
320 |
||
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
321 |
def test_set_passwd_cb_wrong_args(self): |
322 |
"""
|
|
323 |
L{Context.set_passwd_cb} raises L{TypeError} if called with the
|
|
324 |
wrong arguments or with a non-callable first argument.
|
|
325 |
"""
|
|
326 |
context = Context(TLSv1_METHOD) |
|
327 |
self.assertRaises(TypeError, context.set_passwd_cb) |
|
328 |
self.assertRaises(TypeError, context.set_passwd_cb, None) |
|
329 |
self.assertRaises(TypeError, context.set_passwd_cb, lambda: None, None, None) |
|
330 |
||
331 |
||
60
by Jean-Paul Calderone
Fix a threading bug in passphrase callback support for context objects. |
332 |
def test_set_passwd_cb(self): |
333 |
"""
|
|
334 |
L{Context.set_passwd_cb} accepts a callable which will be invoked when
|
|
335 |
a private key is loaded from an encrypted PEM.
|
|
336 |
"""
|
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
337 |
passphrase = b("foobar") |
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
338 |
pemFile = self._write_encrypted_pem(passphrase) |
60
by Jean-Paul Calderone
Fix a threading bug in passphrase callback support for context objects. |
339 |
calledWith = [] |
340 |
def passphraseCallback(maxlen, verify, extra): |
|
341 |
calledWith.append((maxlen, verify, extra)) |
|
342 |
return passphrase |
|
343 |
context = Context(TLSv1_METHOD) |
|
344 |
context.set_passwd_cb(passphraseCallback) |
|
345 |
context.use_privatekey_file(pemFile) |
|
346 |
self.assertTrue(len(calledWith), 1) |
|
347 |
self.assertTrue(isinstance(calledWith[0][0], int)) |
|
348 |
self.assertTrue(isinstance(calledWith[0][1], int)) |
|
349 |
self.assertEqual(calledWith[0][2], None) |
|
61
by Jean-Paul Calderone
Fix a threading bug in the info callback support for context objects. |
350 |
|
351 |
||
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
352 |
def test_passwd_callback_exception(self): |
353 |
"""
|
|
354 |
L{Context.use_privatekey_file} propagates any exception raised by the
|
|
355 |
passphrase callback.
|
|
356 |
"""
|
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
357 |
pemFile = self._write_encrypted_pem(b("monkeys are nice")) |
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
358 |
def passphraseCallback(maxlen, verify, extra): |
359 |
raise RuntimeError("Sorry, I am a fail.") |
|
360 |
||
361 |
context = Context(TLSv1_METHOD) |
|
362 |
context.set_passwd_cb(passphraseCallback) |
|
363 |
self.assertRaises(RuntimeError, context.use_privatekey_file, pemFile) |
|
364 |
||
365 |
||
366 |
def test_passwd_callback_false(self): |
|
367 |
"""
|
|
368 |
L{Context.use_privatekey_file} raises L{OpenSSL.SSL.Error} if the
|
|
369 |
passphrase callback returns a false value.
|
|
370 |
"""
|
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
371 |
pemFile = self._write_encrypted_pem(b("monkeys are nice")) |
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
372 |
def passphraseCallback(maxlen, verify, extra): |
373 |
return None |
|
374 |
||
375 |
context = Context(TLSv1_METHOD) |
|
376 |
context.set_passwd_cb(passphraseCallback) |
|
377 |
self.assertRaises(Error, context.use_privatekey_file, pemFile) |
|
378 |
||
379 |
||
380 |
def test_passwd_callback_non_string(self): |
|
381 |
"""
|
|
382 |
L{Context.use_privatekey_file} raises L{OpenSSL.SSL.Error} if the
|
|
383 |
passphrase callback returns a true non-string value.
|
|
384 |
"""
|
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
385 |
pemFile = self._write_encrypted_pem(b("monkeys are nice")) |
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
386 |
def passphraseCallback(maxlen, verify, extra): |
387 |
return 10 |
|
388 |
||
389 |
context = Context(TLSv1_METHOD) |
|
390 |
context.set_passwd_cb(passphraseCallback) |
|
391 |
self.assertRaises(Error, context.use_privatekey_file, pemFile) |
|
392 |
||
393 |
||
394 |
def test_passwd_callback_too_long(self): |
|
395 |
"""
|
|
396 |
If the passphrase returned by the passphrase callback returns a string
|
|
397 |
longer than the indicated maximum length, it is truncated.
|
|
398 |
"""
|
|
399 |
# A priori knowledge!
|
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
400 |
passphrase = b("x") * 1024 |
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
401 |
pemFile = self._write_encrypted_pem(passphrase) |
402 |
def passphraseCallback(maxlen, verify, extra): |
|
403 |
assert maxlen == 1024 |
|
132.2.41
by Jean-Paul Calderone
Convert some more bytes to text and vice versa |
404 |
return passphrase + b("y") |
132.1.25
by Jean-Paul Calderone
Tests for the error handling in the global passphrase callback |
405 |
|
406 |
context = Context(TLSv1_METHOD) |
|
407 |
context.set_passwd_cb(passphraseCallback) |
|
408 |
# This shall succeed because the truncated result is the correct
|
|
409 |
# passphrase.
|
|
410 |
context.use_privatekey_file(pemFile) |
|
411 |
||
412 |
||
61
by Jean-Paul Calderone
Fix a threading bug in the info callback support for context objects. |
413 |
def test_set_info_callback(self): |
414 |
"""
|
|
415 |
L{Context.set_info_callback} accepts a callable which will be invoked
|
|
416 |
when certain information about an SSL connection is available.
|
|
417 |
"""
|
|
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
418 |
(server, client) = socket_pair() |
61
by Jean-Paul Calderone
Fix a threading bug in the info callback support for context objects. |
419 |
|
420 |
clientSSL = Connection(Context(TLSv1_METHOD), client) |
|
421 |
clientSSL.set_connect_state() |
|
422 |
||
423 |
called = [] |
|
424 |
def info(conn, where, ret): |
|
425 |
called.append((conn, where, ret)) |
|
426 |
context = Context(TLSv1_METHOD) |
|
427 |
context.set_info_callback(info) |
|
428 |
context.use_certificate( |
|
429 |
load_certificate(FILETYPE_PEM, cleartextCertificatePEM)) |
|
430 |
context.use_privatekey( |
|
431 |
load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)) |
|
432 |
||
433 |
serverSSL = Connection(context, server) |
|
434 |
serverSSL.set_accept_state() |
|
435 |
||
436 |
while not called: |
|
437 |
for ssl in clientSSL, serverSSL: |
|
438 |
try: |
|
439 |
ssl.do_handshake() |
|
440 |
except WantReadError: |
|
441 |
pass
|
|
442 |
||
443 |
# Kind of lame. Just make sure it got called somehow.
|
|
444 |
self.assertTrue(called) |
|
70.2.1
by Jean-Paul Calderone
test for the pre-existing load_verify_locations function |
445 |
|
446 |
||
70.2.3
by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths |
447 |
def _load_verify_locations_test(self, *args): |
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
448 |
"""
|
449 |
Create a client context which will verify the peer certificate and call
|
|
450 |
its C{load_verify_locations} method with C{*args}. Then connect it to a
|
|
451 |
server and ensure that the handshake succeeds.
|
|
452 |
"""
|
|
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
453 |
(server, client) = socket_pair() |
70.2.1
by Jean-Paul Calderone
test for the pre-existing load_verify_locations function |
454 |
|
455 |
clientContext = Context(TLSv1_METHOD) |
|
70.2.3
by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths |
456 |
clientContext.load_verify_locations(*args) |
70.2.1
by Jean-Paul Calderone
test for the pre-existing load_verify_locations function |
457 |
# Require that the server certificate verify properly or the
|
458 |
# connection will fail.
|
|
459 |
clientContext.set_verify( |
|
460 |
VERIFY_PEER, |
|
461 |
lambda conn, cert, errno, depth, preverify_ok: preverify_ok) |
|
462 |
||
463 |
clientSSL = Connection(clientContext, client) |
|
464 |
clientSSL.set_connect_state() |
|
465 |
||
466 |
serverContext = Context(TLSv1_METHOD) |
|
467 |
serverContext.use_certificate( |
|
468 |
load_certificate(FILETYPE_PEM, cleartextCertificatePEM)) |
|
469 |
serverContext.use_privatekey( |
|
470 |
load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)) |
|
471 |
||
472 |
serverSSL = Connection(serverContext, server) |
|
473 |
serverSSL.set_accept_state() |
|
474 |
||
132.3.5
by Jean-Paul Calderone
Convert the verify location tests to do the handshake properly |
475 |
# Without load_verify_locations above, the handshake
|
476 |
# will fail:
|
|
477 |
# Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
|
|
478 |
# 'certificate verify failed')]
|
|
479 |
handshake(clientSSL, serverSSL) |
|
70.2.1
by Jean-Paul Calderone
test for the pre-existing load_verify_locations function |
480 |
|
481 |
cert = clientSSL.get_peer_certificate() |
|
98
by Jean-Paul Calderone
Fix the tests by updating the certificate they use so that it is not expired |
482 |
self.assertEqual(cert.get_subject().CN, 'Testing Root CA') |
70.2.2
by Jean-Paul Calderone
a test for the obvious error case of load_verify_locations |
483 |
|
122.3.17
by Jean-Paul Calderone
Minimal tests for Context.add_extra_chain_cert |
484 |
|
70.2.3
by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths |
485 |
def test_load_verify_file(self): |
486 |
"""
|
|
487 |
L{Context.load_verify_locations} accepts a file name and uses the
|
|
488 |
certificates within for verification purposes.
|
|
489 |
"""
|
|
490 |
cafile = self.mktemp() |
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
491 |
fObj = open(cafile, 'w') |
132.2.41
by Jean-Paul Calderone
Convert some more bytes to text and vice versa |
492 |
fObj.write(cleartextCertificatePEM.decode('ascii')) |
70.2.3
by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths |
493 |
fObj.close() |
494 |
||
495 |
self._load_verify_locations_test(cafile) |
|
496 |
||
70.2.2
by Jean-Paul Calderone
a test for the obvious error case of load_verify_locations |
497 |
|
498 |
def test_load_verify_invalid_file(self): |
|
499 |
"""
|
|
500 |
L{Context.load_verify_locations} raises L{Error} when passed a
|
|
501 |
non-existent cafile.
|
|
502 |
"""
|
|
503 |
clientContext = Context(TLSv1_METHOD) |
|
504 |
self.assertRaises( |
|
505 |
Error, clientContext.load_verify_locations, self.mktemp()) |
|
70.2.3
by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths |
506 |
|
507 |
||
508 |
def test_load_verify_directory(self): |
|
509 |
"""
|
|
510 |
L{Context.load_verify_locations} accepts a directory name and uses
|
|
511 |
the certificates within for verification purposes.
|
|
512 |
"""
|
|
513 |
capath = self.mktemp() |
|
514 |
makedirs(capath) |
|
515 |
# Hash value computed manually with c_rehash to avoid depending on
|
|
516 |
# c_rehash in the test suite.
|
|
119.1.1
by Jean-Paul Calderone
Avoid using symlink, since we cannot use it on windows |
517 |
cafile = join(capath, 'c7adac82.0') |
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
518 |
fObj = open(cafile, 'w') |
132.2.41
by Jean-Paul Calderone
Convert some more bytes to text and vice versa |
519 |
fObj.write(cleartextCertificatePEM.decode('ascii')) |
119.1.1
by Jean-Paul Calderone
Avoid using symlink, since we cannot use it on windows |
520 |
fObj.close() |
70.2.3
by Jean-Paul Calderone
support capath parameter to load_verify_locations; add wrapper for set_default_verify_paths |
521 |
|
522 |
self._load_verify_locations_test(None, capath) |
|
523 |
||
524 |
||
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
525 |
def test_load_verify_locations_wrong_args(self): |
526 |
"""
|
|
527 |
L{Context.load_verify_locations} raises L{TypeError} if called with
|
|
528 |
the wrong number of arguments or with non-C{str} arguments.
|
|
529 |
"""
|
|
530 |
context = Context(TLSv1_METHOD) |
|
531 |
self.assertRaises(TypeError, context.load_verify_locations) |
|
532 |
self.assertRaises(TypeError, context.load_verify_locations, object()) |
|
533 |
self.assertRaises(TypeError, context.load_verify_locations, object(), object()) |
|
534 |
self.assertRaises(TypeError, context.load_verify_locations, None, None, None) |
|
535 |
||
536 |
||
132.1.1
by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall |
537 |
if platform == "win32": |
538 |
"set_default_verify_paths appears not to work on Windows. "
|
|
119.1.4
by Jean-Paul Calderone
skip on darwin and win32 |
539 |
"See LP#404343 and LP#404344."
|
540 |
else: |
|
541 |
def test_set_default_verify_paths(self): |
|
542 |
"""
|
|
543 |
L{Context.set_default_verify_paths} causes the platform-specific CA
|
|
544 |
certificate locations to be used for verification purposes.
|
|
545 |
"""
|
|
546 |
# Testing this requires a server with a certificate signed by one of
|
|
547 |
# the CAs in the platform CA location. Getting one of those costs
|
|
548 |
# money. Fortunately (or unfortunately, depending on your
|
|
549 |
# perspective), it's easy to think of a public server on the
|
|
550 |
# internet which has such a certificate. Connecting to the network
|
|
551 |
# in a unit test is bad, but it's the only way I can think of to
|
|
552 |
# really test this. -exarkun
|
|
553 |
||
554 |
# Arg, verisign.com doesn't speak TLSv1
|
|
555 |
context = Context(SSLv3_METHOD) |
|
556 |
context.set_default_verify_paths() |
|
557 |
context.set_verify( |
|
122.3.1
by Ziga Seilnacht
Types should have a correct __module__ attribute. |
558 |
VERIFY_PEER, |
119.1.4
by Jean-Paul Calderone
skip on darwin and win32 |
559 |
lambda conn, cert, errno, depth, preverify_ok: preverify_ok) |
560 |
||
561 |
client = socket() |
|
562 |
client.connect(('verisign.com', 443)) |
|
563 |
clientSSL = Connection(context, client) |
|
564 |
clientSSL.set_connect_state() |
|
565 |
clientSSL.do_handshake() |
|
566 |
clientSSL.send('GET / HTTP/1.0\r\n\r\n') |
|
567 |
self.assertTrue(clientSSL.recv(1024)) |
|
70.2.6
by Jean-Paul Calderone
reject arguments to set_default_verify_paths |
568 |
|
569 |
||
570 |
def test_set_default_verify_paths_signature(self): |
|
571 |
"""
|
|
572 |
L{Context.set_default_verify_paths} takes no arguments and raises
|
|
573 |
L{TypeError} if given any.
|
|
574 |
"""
|
|
575 |
context = Context(TLSv1_METHOD) |
|
576 |
self.assertRaises(TypeError, context.set_default_verify_paths, None) |
|
577 |
self.assertRaises(TypeError, context.set_default_verify_paths, 1) |
|
578 |
self.assertRaises(TypeError, context.set_default_verify_paths, "") |
|
83
by Jean-Paul Calderone
Expose some new DTLS-related constants in OpenSSL.SSL - OP_NO_QUERY_MTU, OP_COOKIE_EXCHANGE, and OP_NO_TICKET. |
579 |
|
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
580 |
|
122.3.17
by Jean-Paul Calderone
Minimal tests for Context.add_extra_chain_cert |
581 |
def test_add_extra_chain_cert_invalid_cert(self): |
582 |
"""
|
|
583 |
L{Context.add_extra_chain_cert} raises L{TypeError} if called with
|
|
584 |
other than one argument or if called with an object which is not an
|
|
585 |
instance of L{X509}.
|
|
586 |
"""
|
|
587 |
context = Context(TLSv1_METHOD) |
|
588 |
self.assertRaises(TypeError, context.add_extra_chain_cert) |
|
589 |
self.assertRaises(TypeError, context.add_extra_chain_cert, object()) |
|
590 |
self.assertRaises(TypeError, context.add_extra_chain_cert, object(), object()) |
|
591 |
||
592 |
||
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
593 |
def _create_certificate_chain(self): |
122.3.17
by Jean-Paul Calderone
Minimal tests for Context.add_extra_chain_cert |
594 |
"""
|
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
595 |
Construct and return a chain of certificates.
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
596 |
|
597 |
1. A new self-signed certificate authority certificate (cacert)
|
|
598 |
2. A new intermediate certificate signed by cacert (icert)
|
|
599 |
3. A new server certificate signed by icert (scert)
|
|
122.3.17
by Jean-Paul Calderone
Minimal tests for Context.add_extra_chain_cert |
600 |
"""
|
132.2.45
by Jean-Paul Calderone
merge trunk |
601 |
caext = X509Extension(b('basicConstraints'), False, b('CA:true')) |
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
602 |
|
603 |
# Step 1
|
|
604 |
cakey = PKey() |
|
605 |
cakey.generate_key(TYPE_RSA, 512) |
|
606 |
cacert = X509() |
|
132.1.44
by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...) |
607 |
cacert.get_subject().commonName = "Authority Certificate" |
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
608 |
cacert.set_issuer(cacert.get_subject()) |
609 |
cacert.set_pubkey(cakey) |
|
132.2.48
by Jean-Paul Calderone
Make timestamps bytes |
610 |
cacert.set_notBefore(b("20000101000000Z")) |
611 |
cacert.set_notAfter(b("20200101000000Z")) |
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
612 |
cacert.add_extensions([caext]) |
132.1.44
by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...) |
613 |
cacert.set_serial_number(0) |
614 |
cacert.sign(cakey, "sha1") |
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
615 |
|
616 |
# Step 2
|
|
617 |
ikey = PKey() |
|
618 |
ikey.generate_key(TYPE_RSA, 512) |
|
619 |
icert = X509() |
|
620 |
icert.get_subject().commonName = "Intermediate Certificate" |
|
621 |
icert.set_issuer(cacert.get_subject()) |
|
622 |
icert.set_pubkey(ikey) |
|
132.2.48
by Jean-Paul Calderone
Make timestamps bytes |
623 |
icert.set_notBefore(b("20000101000000Z")) |
624 |
icert.set_notAfter(b("20200101000000Z")) |
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
625 |
icert.add_extensions([caext]) |
132.1.44
by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...) |
626 |
icert.set_serial_number(0) |
627 |
icert.sign(cakey, "sha1") |
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
628 |
|
629 |
# Step 3
|
|
630 |
skey = PKey() |
|
631 |
skey.generate_key(TYPE_RSA, 512) |
|
632 |
scert = X509() |
|
633 |
scert.get_subject().commonName = "Server Certificate" |
|
634 |
scert.set_issuer(icert.get_subject()) |
|
635 |
scert.set_pubkey(skey) |
|
132.2.48
by Jean-Paul Calderone
Make timestamps bytes |
636 |
scert.set_notBefore(b("20000101000000Z")) |
637 |
scert.set_notAfter(b("20200101000000Z")) |
|
132.2.49
by Jean-Paul Calderone
Some fields of X509Extension are bytes, not text |
638 |
scert.add_extensions([ |
639 |
X509Extension(b('basicConstraints'), True, b('CA:false'))]) |
|
132.1.44
by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...) |
640 |
scert.set_serial_number(0) |
641 |
scert.sign(ikey, "sha1") |
|
642 |
||
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
643 |
return [(cakey, cacert), (ikey, icert), (skey, scert)] |
644 |
||
645 |
||
646 |
def _handshake_test(self, serverContext, clientContext): |
|
647 |
"""
|
|
648 |
Verify that a client and server created with the given contexts can
|
|
649 |
successfully handshake and communicate.
|
|
650 |
"""
|
|
651 |
serverSocket, clientSocket = socket_pair() |
|
652 |
||
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
653 |
server = Connection(serverContext, serverSocket) |
132.1.12
by Jean-Paul Calderone
Test for Connection.shutdown |
654 |
server.set_accept_state() |
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
655 |
|
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
656 |
client = Connection(clientContext, clientSocket) |
657 |
client.set_connect_state() |
|
658 |
||
659 |
# Make them talk to each other.
|
|
660 |
# self._interactInMemory(client, server)
|
|
661 |
for i in range(3): |
|
662 |
for s in [client, server]: |
|
663 |
try: |
|
664 |
s.do_handshake() |
|
665 |
except WantReadError: |
|
666 |
pass
|
|
667 |
||
668 |
||
669 |
def test_add_extra_chain_cert(self): |
|
670 |
"""
|
|
671 |
L{Context.add_extra_chain_cert} accepts an L{X509} instance to add to
|
|
672 |
the certificate chain.
|
|
673 |
||
674 |
See L{_create_certificate_chain} for the details of the certificate
|
|
675 |
chain tested.
|
|
676 |
||
677 |
The chain is tested by starting a server with scert and connecting
|
|
678 |
to it with a client which trusts cacert and requires verification to
|
|
679 |
succeed.
|
|
680 |
"""
|
|
681 |
chain = self._create_certificate_chain() |
|
682 |
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain |
|
683 |
||
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
684 |
# Dump the CA certificate to a file because that's the only way to load
|
685 |
# it as a trusted CA in the client context.
|
|
686 |
for cert, name in [(cacert, 'ca.pem'), (icert, 'i.pem'), (scert, 's.pem')]: |
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
687 |
fObj = open(name, 'w') |
132.2.41
by Jean-Paul Calderone
Convert some more bytes to text and vice versa |
688 |
fObj.write(dump_certificate(FILETYPE_PEM, cert).decode('ascii')) |
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
689 |
fObj.close() |
690 |
||
691 |
for key, name in [(cakey, 'ca.key'), (ikey, 'i.key'), (skey, 's.key')]: |
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
692 |
fObj = open(name, 'w') |
132.2.41
by Jean-Paul Calderone
Convert some more bytes to text and vice versa |
693 |
fObj.write(dump_privatekey(FILETYPE_PEM, key).decode('ascii')) |
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
694 |
fObj.close() |
695 |
||
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
696 |
# Create the server context
|
697 |
serverContext = Context(TLSv1_METHOD) |
|
698 |
serverContext.use_privatekey(skey) |
|
699 |
serverContext.use_certificate(scert) |
|
132.1.44
by Jean-Paul Calderone
Fix the scert.sign(skey, ...) bug; should have been scert.sign(ikey, ...) |
700 |
# The client already has cacert, we only need to give them icert.
|
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
701 |
serverContext.add_extra_chain_cert(icert) |
702 |
||
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
703 |
# Create the client
|
704 |
clientContext = Context(TLSv1_METHOD) |
|
705 |
clientContext.set_verify( |
|
706 |
VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb) |
|
707 |
clientContext.load_verify_locations('ca.pem') |
|
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
708 |
|
709 |
# Try it out.
|
|
710 |
self._handshake_test(serverContext, clientContext) |
|
711 |
||
712 |
||
713 |
def test_use_certificate_chain_file(self): |
|
714 |
"""
|
|
715 |
L{Context.use_certificate_chain_file} reads a certificate chain from
|
|
716 |
the specified file.
|
|
717 |
||
718 |
The chain is tested by starting a server with scert and connecting
|
|
719 |
to it with a client which trusts cacert and requires verification to
|
|
720 |
succeed.
|
|
721 |
"""
|
|
722 |
chain = self._create_certificate_chain() |
|
723 |
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain |
|
724 |
||
725 |
# Write out the chain file.
|
|
726 |
chainFile = self.mktemp() |
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
727 |
fObj = open(chainFile, 'w') |
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
728 |
# Most specific to least general.
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
729 |
fObj.write(dump_certificate(FILETYPE_PEM, scert).decode('ascii')) |
730 |
fObj.write(dump_certificate(FILETYPE_PEM, icert).decode('ascii')) |
|
731 |
fObj.write(dump_certificate(FILETYPE_PEM, cacert).decode('ascii')) |
|
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
732 |
fObj.close() |
733 |
||
734 |
serverContext = Context(TLSv1_METHOD) |
|
735 |
serverContext.use_certificate_chain_file(chainFile) |
|
736 |
serverContext.use_privatekey(skey) |
|
737 |
||
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
738 |
fObj = open('ca.pem', 'w') |
739 |
fObj.write(dump_certificate(FILETYPE_PEM, cacert).decode('ascii')) |
|
132.1.35
by Jean-Paul Calderone
A test for use_certificate_chain_file which fails just like add_extra_chain_cert |
740 |
fObj.close() |
741 |
||
742 |
clientContext = Context(TLSv1_METHOD) |
|
743 |
clientContext.set_verify( |
|
744 |
VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb) |
|
745 |
clientContext.load_verify_locations('ca.pem') |
|
746 |
||
747 |
self._handshake_test(serverContext, clientContext) |
|
748 |
||
132.1.42
by Jean-Paul Calderone
notes for later |
749 |
# XXX load_client_ca
|
750 |
# XXX set_session_id
|
|
132.1.51
by Jean-Paul Calderone
Add tests for get_verify_mode |
751 |
|
752 |
def test_get_verify_mode_wrong_args(self): |
|
753 |
"""
|
|
754 |
L{Context.get_verify_mode} raises L{TypeError} if called with any
|
|
755 |
arguments.
|
|
756 |
"""
|
|
757 |
context = Context(TLSv1_METHOD) |
|
758 |
self.assertRaises(TypeError, context.get_verify_mode, None) |
|
759 |
||
760 |
||
761 |
def test_get_verify_mode(self): |
|
762 |
"""
|
|
763 |
L{Context.get_verify_mode} returns the verify mode flags previously
|
|
764 |
passed to L{Context.set_verify}.
|
|
765 |
"""
|
|
766 |
context = Context(TLSv1_METHOD) |
|
767 |
self.assertEquals(context.get_verify_mode(), 0) |
|
768 |
context.set_verify( |
|
769 |
VERIFY_PEER | VERIFY_CLIENT_ONCE, lambda *args: None) |
|
770 |
self.assertEquals( |
|
771 |
context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE) |
|
772 |
||
132.1.52
by Jean-Paul Calderone
Sort of add a load_tmp_dh test; also a Context.set_cipher_list test |
773 |
|
774 |
def test_load_tmp_dh_wrong_args(self): |
|
775 |
"""
|
|
776 |
L{Context.load_tmp_dh} raises L{TypeError} if called with the wrong
|
|
777 |
number of arguments or with a non-C{str} argument.
|
|
778 |
"""
|
|
779 |
context = Context(TLSv1_METHOD) |
|
780 |
self.assertRaises(TypeError, context.load_tmp_dh) |
|
781 |
self.assertRaises(TypeError, context.load_tmp_dh, "foo", None) |
|
782 |
self.assertRaises(TypeError, context.load_tmp_dh, object()) |
|
783 |
||
784 |
||
785 |
def test_load_tmp_dh_missing_file(self): |
|
786 |
"""
|
|
787 |
L{Context.load_tmp_dh} raises L{OpenSSL.SSL.Error} if the specified file
|
|
788 |
does not exist.
|
|
789 |
"""
|
|
790 |
context = Context(TLSv1_METHOD) |
|
791 |
self.assertRaises(Error, context.load_tmp_dh, "hello") |
|
792 |
||
793 |
||
794 |
def test_load_tmp_dh(self): |
|
795 |
"""
|
|
796 |
L{Context.load_tmp_dh} loads Diffie-Hellman parameters from the
|
|
797 |
specified file.
|
|
798 |
"""
|
|
799 |
context = Context(TLSv1_METHOD) |
|
800 |
dhfilename = self.mktemp() |
|
801 |
dhfile = open(dhfilename, "w") |
|
802 |
dhfile.write(dhparam) |
|
803 |
dhfile.close() |
|
804 |
context.load_tmp_dh(dhfilename) |
|
805 |
# XXX What should I assert here? -exarkun
|
|
806 |
||
807 |
||
808 |
def test_set_cipher_list(self): |
|
809 |
"""
|
|
810 |
L{Context.set_cipher_list} accepts a C{str} naming the ciphers which
|
|
811 |
connections created with the context object will be able to choose from.
|
|
812 |
"""
|
|
813 |
context = Context(TLSv1_METHOD) |
|
814 |
context.set_cipher_list("hello world:EXP-RC4-MD5") |
|
815 |
conn = Connection(context, None) |
|
816 |
self.assertEquals(conn.get_cipher_list(), ["EXP-RC4-MD5"]) |
|
132.1.12
by Jean-Paul Calderone
Test for Connection.shutdown |
817 |
|
818 |
||
819 |
||
820 |
class ConnectionTests(TestCase, _LoopbackMixin): |
|
112.5.1
by Rick Dean
More unit tests about types |
821 |
"""
|
822 |
Unit tests for L{OpenSSL.SSL.Connection}.
|
|
823 |
"""
|
|
132.1.53
by Jean-Paul Calderone
Notes on further tests to write |
824 |
# XXX want_write
|
825 |
# XXX want_read
|
|
826 |
# XXX get_peer_certificate -> None
|
|
827 |
# XXX sock_shutdown
|
|
828 |
# XXX master_key -> TypeError
|
|
829 |
# XXX server_random -> TypeError
|
|
830 |
# XXX state_string
|
|
831 |
# XXX connect -> TypeError
|
|
832 |
# XXX connect_ex -> TypeError
|
|
833 |
# XXX set_connect_state -> TypeError
|
|
834 |
# XXX set_accept_state -> TypeError
|
|
835 |
# XXX renegotiate_pending
|
|
836 |
# XXX do_handshake -> TypeError
|
|
837 |
# XXX bio_read -> TypeError
|
|
838 |
# XXX recv -> TypeError
|
|
839 |
# XXX send -> TypeError
|
|
840 |
# XXX bio_write -> TypeError
|
|
841 |
||
112.5.1
by Rick Dean
More unit tests about types |
842 |
def test_type(self): |
843 |
"""
|
|
119
by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely. |
844 |
L{Connection} and L{ConnectionType} refer to the same type object and
|
845 |
can be used to create instances of that type.
|
|
112.5.1
by Rick Dean
More unit tests about types |
846 |
"""
|
119
by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely. |
847 |
self.assertIdentical(Connection, ConnectionType) |
112.5.1
by Rick Dean
More unit tests about types |
848 |
ctx = Context(TLSv1_METHOD) |
119
by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely. |
849 |
self.assertConsistentType(Connection, 'Connection', ctx, None) |
850 |
||
851 |
||
126
by Jean-Paul Calderone
Add a couple tests for Connection.get_context |
852 |
def test_get_context(self): |
853 |
"""
|
|
854 |
L{Connection.get_context} returns the L{Context} instance used to
|
|
855 |
construct the L{Connection} instance.
|
|
856 |
"""
|
|
857 |
context = Context(TLSv1_METHOD) |
|
858 |
connection = Connection(context, None) |
|
859 |
self.assertIdentical(connection.get_context(), context) |
|
860 |
||
861 |
||
862 |
def test_get_context_wrong_args(self): |
|
863 |
"""
|
|
864 |
L{Connection.get_context} raises L{TypeError} if called with any
|
|
865 |
arguments.
|
|
866 |
"""
|
|
867 |
connection = Connection(Context(TLSv1_METHOD), None) |
|
868 |
self.assertRaises(TypeError, connection.get_context, None) |
|
869 |
||
870 |
||
132.1.5
by Jean-Paul Calderone
Tests for Connection.pending |
871 |
def test_pending(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
872 |
"""
|
873 |
L{Connection.pending} returns the number of bytes available for
|
|
874 |
immediate read.
|
|
875 |
"""
|
|
132.1.5
by Jean-Paul Calderone
Tests for Connection.pending |
876 |
connection = Connection(Context(TLSv1_METHOD), None) |
877 |
self.assertEquals(connection.pending(), 0) |
|
878 |
||
879 |
||
880 |
def test_pending_wrong_args(self): |
|
132.1.46
by Jean-Paul Calderone
more docstrings |
881 |
"""
|
882 |
L{Connection.pending} raises L{TypeError} if called with any arguments.
|
|
883 |
"""
|
|
132.1.5
by Jean-Paul Calderone
Tests for Connection.pending |
884 |
connection = Connection(Context(TLSv1_METHOD), None) |
885 |
self.assertRaises(TypeError, connection.pending, None) |
|
886 |
||
887 |
||
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
888 |
def test_connect_wrong_args(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
889 |
"""
|
890 |
L{Connection.connect} raises L{TypeError} if called with a non-address
|
|
891 |
argument or with the wrong number of arguments.
|
|
892 |
"""
|
|
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
893 |
connection = Connection(Context(TLSv1_METHOD), socket()) |
894 |
self.assertRaises(TypeError, connection.connect, None) |
|
132.1.46
by Jean-Paul Calderone
more docstrings |
895 |
self.assertRaises(TypeError, connection.connect) |
896 |
self.assertRaises(TypeError, connection.connect, ("127.0.0.1", 1), None) |
|
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
897 |
|
898 |
||
132.1.7
by Jean-Paul Calderone
test for successful and unsuccessful connect |
899 |
def test_connect_refused(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
900 |
"""
|
901 |
L{Connection.connect} raises L{socket.error} if the underlying socket
|
|
902 |
connect method raises it.
|
|
903 |
"""
|
|
132.1.7
by Jean-Paul Calderone
test for successful and unsuccessful connect |
904 |
client = socket() |
905 |
context = Context(TLSv1_METHOD) |
|
906 |
clientSSL = Connection(context, client) |
|
907 |
exc = self.assertRaises(error, clientSSL.connect, ("127.0.0.1", 1)) |
|
132.1.10
by Jean-Paul Calderone
Compat with pre-2.6 |
908 |
self.assertEquals(exc.args[0], ECONNREFUSED) |
132.1.7
by Jean-Paul Calderone
test for successful and unsuccessful connect |
909 |
|
910 |
||
911 |
def test_connect(self): |
|
132.1.46
by Jean-Paul Calderone
more docstrings |
912 |
"""
|
913 |
L{Connection.connect} establishes a connection to the specified address.
|
|
914 |
"""
|
|
132.1.8
by Jean-Paul Calderone
test for Connection.accept |
915 |
port = socket() |
916 |
port.bind(('', 0)) |
|
917 |
port.listen(3) |
|
918 |
||
919 |
clientSSL = Connection(Context(TLSv1_METHOD), socket()) |
|
135.1.3
by Jean-Paul Calderone
Hard-code loopback address because of Windows |
920 |
clientSSL.connect(('127.0.0.1', port.getsockname()[1])) |
921 |
# XXX An assertion? Or something?
|
|
132.1.8
by Jean-Paul Calderone
test for Connection.accept |
922 |
|
923 |
||
132.3.1
by Jean-Paul Calderone
Skip test_connect_ex on OS X |
924 |
if platform == "darwin": |
925 |
"connect_ex sometimes causes a kernel panic on OS X 10.6.4"
|
|
926 |
else: |
|
927 |
def test_connect_ex(self): |
|
928 |
"""
|
|
929 |
If there is a connection error, L{Connection.connect_ex} returns the
|
|
930 |
errno instead of raising an exception.
|
|
931 |
"""
|
|
932 |
port = socket() |
|
933 |
port.bind(('', 0)) |
|
934 |
port.listen(3) |
|
132.1.11
by Jean-Paul Calderone
test for connect_ex |
935 |
|
132.3.1
by Jean-Paul Calderone
Skip test_connect_ex on OS X |
936 |
clientSSL = Connection(Context(TLSv1_METHOD), socket()) |
937 |
clientSSL.setblocking(False) |
|
938 |
result = clientSSL.connect_ex(port.getsockname()) |
|
939 |
expected = (EINPROGRESS, EWOULDBLOCK) |
|
940 |
self.assertTrue( |
|
941 |
result in expected, "%r not in %r" % (result, expected)) |
|
132.1.11
by Jean-Paul Calderone
test for connect_ex |
942 |
|
943 |
||
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
944 |
def test_accept_wrong_args(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
945 |
"""
|
946 |
L{Connection.accept} raises L{TypeError} if called with any arguments.
|
|
947 |
"""
|
|
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
948 |
connection = Connection(Context(TLSv1_METHOD), socket()) |
949 |
self.assertRaises(TypeError, connection.accept, None) |
|
950 |
||
951 |
||
132.1.8
by Jean-Paul Calderone
test for Connection.accept |
952 |
def test_accept(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
953 |
"""
|
954 |
L{Connection.accept} accepts a pending connection attempt and returns a
|
|
955 |
tuple of a new L{Connection} (the accepted client) and the address the
|
|
956 |
connection originated from.
|
|
957 |
"""
|
|
132.1.7
by Jean-Paul Calderone
test for successful and unsuccessful connect |
958 |
ctx = Context(TLSv1_METHOD) |
959 |
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) |
|
960 |
ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) |
|
132.1.8
by Jean-Paul Calderone
test for Connection.accept |
961 |
port = socket() |
962 |
portSSL = Connection(ctx, port) |
|
963 |
portSSL.bind(('', 0)) |
|
964 |
portSSL.listen(3) |
|
965 |
||
966 |
clientSSL = Connection(Context(TLSv1_METHOD), socket()) |
|
135.1.3
by Jean-Paul Calderone
Hard-code loopback address because of Windows |
967 |
|
968 |
# Calling portSSL.getsockname() here to get the server IP address sounds
|
|
969 |
# great, but frequently fails on Windows.
|
|
970 |
clientSSL.connect(('127.0.0.1', portSSL.getsockname()[1])) |
|
132.1.8
by Jean-Paul Calderone
test for Connection.accept |
971 |
|
972 |
serverSSL, address = portSSL.accept() |
|
973 |
||
974 |
self.assertTrue(isinstance(serverSSL, Connection)) |
|
975 |
self.assertIdentical(serverSSL.get_context(), ctx) |
|
976 |
self.assertEquals(address, clientSSL.getsockname()) |
|
132.1.7
by Jean-Paul Calderone
test for successful and unsuccessful connect |
977 |
|
978 |
||
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
979 |
def test_shutdown_wrong_args(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
980 |
"""
|
981 |
L{Connection.shutdown} raises L{TypeError} if called with the wrong
|
|
982 |
number of arguments or with arguments other than integers.
|
|
983 |
"""
|
|
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
984 |
connection = Connection(Context(TLSv1_METHOD), None) |
985 |
self.assertRaises(TypeError, connection.shutdown, None) |
|
132.1.16
by Jean-Paul Calderone
more wrong args tests |
986 |
self.assertRaises(TypeError, connection.get_shutdown, None) |
987 |
self.assertRaises(TypeError, connection.set_shutdown) |
|
988 |
self.assertRaises(TypeError, connection.set_shutdown, None) |
|
989 |
self.assertRaises(TypeError, connection.set_shutdown, 0, 1) |
|
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
990 |
|
991 |
||
132.1.12
by Jean-Paul Calderone
Test for Connection.shutdown |
992 |
def test_shutdown(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
993 |
"""
|
994 |
L{Connection.shutdown} performs an SSL-level connection shutdown.
|
|
995 |
"""
|
|
132.1.12
by Jean-Paul Calderone
Test for Connection.shutdown |
996 |
server, client = self._loopback() |
132.1.14
by Jean-Paul Calderone
somewhat more coverage in test_shutdown |
997 |
self.assertFalse(server.shutdown()) |
998 |
self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN) |
|
132.1.12
by Jean-Paul Calderone
Test for Connection.shutdown |
999 |
self.assertRaises(ZeroReturnError, client.recv, 1024) |
132.1.14
by Jean-Paul Calderone
somewhat more coverage in test_shutdown |
1000 |
self.assertEquals(client.get_shutdown(), RECEIVED_SHUTDOWN) |
1001 |
client.shutdown() |
|
1002 |
self.assertEquals(client.get_shutdown(), SENT_SHUTDOWN|RECEIVED_SHUTDOWN) |
|
1003 |
self.assertRaises(ZeroReturnError, server.recv, 1024) |
|
1004 |
self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN|RECEIVED_SHUTDOWN) |
|
132.1.12
by Jean-Paul Calderone
Test for Connection.shutdown |
1005 |
|
1006 |
||
132.1.15
by Jean-Paul Calderone
test for Connection.set_shutdown |
1007 |
def test_set_shutdown(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
1008 |
"""
|
1009 |
L{Connection.set_shutdown} sets the state of the SSL connection shutdown
|
|
1010 |
process.
|
|
1011 |
"""
|
|
132.1.15
by Jean-Paul Calderone
test for Connection.set_shutdown |
1012 |
connection = Connection(Context(TLSv1_METHOD), socket()) |
1013 |
connection.set_shutdown(RECEIVED_SHUTDOWN) |
|
1014 |
self.assertEquals(connection.get_shutdown(), RECEIVED_SHUTDOWN) |
|
1015 |
||
1016 |
||
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
1017 |
def test_app_data_wrong_args(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
1018 |
"""
|
1019 |
L{Connection.set_app_data} raises L{TypeError} if called with other than
|
|
1020 |
one argument. L{Connection.get_app_data} raises L{TypeError} if called
|
|
1021 |
with any arguments.
|
|
1022 |
"""
|
|
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
1023 |
conn = Connection(Context(TLSv1_METHOD), None) |
1024 |
self.assertRaises(TypeError, conn.get_app_data, None) |
|
1025 |
self.assertRaises(TypeError, conn.set_app_data) |
|
1026 |
self.assertRaises(TypeError, conn.set_app_data, None, None) |
|
1027 |
||
1028 |
||
132.1.9
by Jean-Paul Calderone
get_app_data and set_app_data |
1029 |
def test_app_data(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
1030 |
"""
|
1031 |
Any object can be set as app data by passing it to
|
|
1032 |
L{Connection.set_app_data} and later retrieved with
|
|
1033 |
L{Connection.get_app_data}.
|
|
1034 |
"""
|
|
132.1.9
by Jean-Paul Calderone
get_app_data and set_app_data |
1035 |
conn = Connection(Context(TLSv1_METHOD), None) |
1036 |
app_data = object() |
|
1037 |
conn.set_app_data(app_data) |
|
1038 |
self.assertIdentical(conn.get_app_data(), app_data) |
|
1039 |
||
1040 |
||
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
1041 |
def test_makefile(self): |
132.1.46
by Jean-Paul Calderone
more docstrings |
1042 |
"""
|
1043 |
L{Connection.makefile} is not implemented and calling that method raises
|
|
1044 |
L{NotImplementedError}.
|
|
1045 |
"""
|
|
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
1046 |
conn = Connection(Context(TLSv1_METHOD), None) |
1047 |
self.assertRaises(NotImplementedError, conn.makefile) |
|
1048 |
||
1049 |
||
119
by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely. |
1050 |
|
132.1.4
by Jean-Paul Calderone
simple tests for get_cipher_list |
1051 |
class ConnectionGetCipherListTests(TestCase): |
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
1052 |
"""
|
1053 |
Tests for L{Connection.get_cipher_list}.
|
|
1054 |
"""
|
|
132.1.48
by Jean-Paul Calderone
Rename test method |
1055 |
def test_wrong_args(self): |
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
1056 |
"""
|
1057 |
L{Connection.get_cipher_list} raises L{TypeError} if called with any
|
|
1058 |
arguments.
|
|
1059 |
"""
|
|
132.1.4
by Jean-Paul Calderone
simple tests for get_cipher_list |
1060 |
connection = Connection(Context(TLSv1_METHOD), None) |
1061 |
self.assertRaises(TypeError, connection.get_cipher_list, None) |
|
1062 |
||
1063 |
||
1064 |
def test_result(self): |
|
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
1065 |
"""
|
1066 |
L{Connection.get_cipher_list} returns a C{list} of C{str} giving the
|
|
1067 |
names of the ciphers which might be used.
|
|
1068 |
"""
|
|
132.1.4
by Jean-Paul Calderone
simple tests for get_cipher_list |
1069 |
connection = Connection(Context(TLSv1_METHOD), None) |
1070 |
ciphers = connection.get_cipher_list() |
|
1071 |
self.assertTrue(isinstance(ciphers, list)) |
|
1072 |
for cipher in ciphers: |
|
1073 |
self.assertTrue(isinstance(cipher, str)) |
|
1074 |
||
1075 |
||
1076 |
||
132.1.6
by Jean-Paul Calderone
Try testing renegotiation some; not much luck |
1077 |
class ConnectionSendallTests(TestCase, _LoopbackMixin): |
1078 |
"""
|
|
1079 |
Tests for L{Connection.sendall}.
|
|
1080 |
"""
|
|
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
1081 |
def test_wrong_args(self): |
132.1.6
by Jean-Paul Calderone
Try testing renegotiation some; not much luck |
1082 |
"""
|
1083 |
When called with arguments other than a single string,
|
|
1084 |
L{Connection.sendall} raises L{TypeError}.
|
|
1085 |
"""
|
|
1086 |
connection = Connection(Context(TLSv1_METHOD), None) |
|
1087 |
self.assertRaises(TypeError, connection.sendall) |
|
1088 |
self.assertRaises(TypeError, connection.sendall, object()) |
|
1089 |
self.assertRaises(TypeError, connection.sendall, "foo", "bar") |
|
1090 |
||
1091 |
||
132.1.1
by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall |
1092 |
def test_short(self): |
1093 |
"""
|
|
1094 |
L{Connection.sendall} transmits all of the bytes in the string passed to
|
|
1095 |
it.
|
|
1096 |
"""
|
|
1097 |
server, client = self._loopback() |
|
132.2.40
by Jean-Paul Calderone
Convert some bytes to text and vice versa |
1098 |
server.sendall(b('x')) |
1099 |
self.assertEquals(client.recv(1), b('x')) |
|
132.1.1
by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall |
1100 |
|
1101 |
||
1102 |
def test_long(self): |
|
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
1103 |
"""
|
1104 |
L{Connection.sendall} transmits all of the bytes in the string passed to
|
|
1105 |
it even if this requires multiple calls of an underlying write function.
|
|
1106 |
"""
|
|
132.1.1
by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall |
1107 |
server, client = self._loopback() |
134.1.3
by Jean-Paul Calderone
Tweak the write sizer |
1108 |
# Should be enough, underlying SSL_write should only do 16k at a time.
|
1109 |
# On Windows, after 32k of bytes the write will block (forever - because
|
|
1110 |
# no one is yet reading).
|
|
132.2.46
by Jean-Paul Calderone
merge trunk |
1111 |
message = b('x') * (1024 * 32 - 1) + b('y') |
132.1.1
by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall |
1112 |
server.sendall(message) |
1113 |
accum = [] |
|
1114 |
received = 0 |
|
1115 |
while received < len(message): |
|
132.2.41
by Jean-Paul Calderone
Convert some more bytes to text and vice versa |
1116 |
data = client.recv(1024) |
1117 |
accum.append(data) |
|
1118 |
received += len(data) |
|
132.2.47
by Jean-Paul Calderone
Avoid bytes literal |
1119 |
self.assertEquals(message, b('').join(accum)) |
132.1.1
by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall |
1120 |
|
1121 |
||
132.1.3
by Jean-Paul Calderone
Test a low-level failure in SSL_write |
1122 |
def test_closed(self): |
1123 |
"""
|
|
1124 |
If the underlying socket is closed, L{Connection.sendall} propagates the
|
|
1125 |
write error from the low level write call.
|
|
1126 |
"""
|
|
1127 |
server, client = self._loopback() |
|
132.3.2
by Jean-Paul Calderone
Try triggering the error by shutting down the sending underlying TCP socket |
1128 |
server.sock_shutdown(2) |
132.1.3
by Jean-Paul Calderone
Test a low-level failure in SSL_write |
1129 |
self.assertRaises(SysCallError, server.sendall, "hello, world") |
1130 |
||
132.1.1
by Jean-Paul Calderone
Some tests which are supposed to exercise the paths through Connection.sendall |
1131 |
|
132.1.5
by Jean-Paul Calderone
Tests for Connection.pending |
1132 |
|
132.1.6
by Jean-Paul Calderone
Try testing renegotiation some; not much luck |
1133 |
class ConnectionRenegotiateTests(TestCase, _LoopbackMixin): |
1134 |
"""
|
|
1135 |
Tests for SSL renegotiation APIs.
|
|
1136 |
"""
|
|
1137 |
def test_renegotiate_wrong_args(self): |
|
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
1138 |
"""
|
1139 |
L{Connection.renegotiate} raises L{TypeError} if called with any
|
|
1140 |
arguments.
|
|
1141 |
"""
|
|
132.1.6
by Jean-Paul Calderone
Try testing renegotiation some; not much luck |
1142 |
connection = Connection(Context(TLSv1_METHOD), None) |
1143 |
self.assertRaises(TypeError, connection.renegotiate, None) |
|
1144 |
||
1145 |
||
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
1146 |
def test_total_renegotiations_wrong_args(self): |
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
1147 |
"""
|
1148 |
L{Connection.total_renegotiations} raises L{TypeError} if called with
|
|
1149 |
any arguments.
|
|
1150 |
"""
|
|
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
1151 |
connection = Connection(Context(TLSv1_METHOD), None) |
1152 |
self.assertRaises(TypeError, connection.total_renegotiations, None) |
|
1153 |
||
1154 |
||
1155 |
def test_total_renegotiations(self): |
|
132.1.50
by Jean-Paul Calderone
Some more test method docstrings |
1156 |
"""
|
1157 |
L{Connection.total_renegotiations} returns C{0} before any
|
|
1158 |
renegotiations have happened.
|
|
1159 |
"""
|
|
132.1.13
by Jean-Paul Calderone
a slew of misc mostly wrong-args tests |
1160 |
connection = Connection(Context(TLSv1_METHOD), None) |
1161 |
self.assertEquals(connection.total_renegotiations(), 0) |
|
1162 |
||
1163 |
||
132.1.6
by Jean-Paul Calderone
Try testing renegotiation some; not much luck |
1164 |
# def test_renegotiate(self):
|
1165 |
# """
|
|
1166 |
# """
|
|
1167 |
# server, client = self._loopback()
|
|
1168 |
||
1169 |
# server.send("hello world")
|
|
1170 |
# self.assertEquals(client.recv(len("hello world")), "hello world")
|
|
1171 |
||
1172 |
# self.assertEquals(server.total_renegotiations(), 0)
|
|
1173 |
# self.assertTrue(server.renegotiate())
|
|
1174 |
||
1175 |
# server.setblocking(False)
|
|
1176 |
# client.setblocking(False)
|
|
1177 |
# while server.renegotiate_pending():
|
|
1178 |
# client.do_handshake()
|
|
1179 |
# server.do_handshake()
|
|
1180 |
||
1181 |
# self.assertEquals(server.total_renegotiations(), 1)
|
|
1182 |
||
1183 |
||
1184 |
||
1185 |
||
119
by Jean-Paul Calderone
Add a bunch of tests which assert that extension types are defined nicely. |
1186 |
class ErrorTests(TestCase): |
1187 |
"""
|
|
1188 |
Unit tests for L{OpenSSL.SSL.Error}.
|
|
1189 |
"""
|
|
1190 |
def test_type(self): |
|
1191 |
"""
|
|
1192 |
L{Error} is an exception type.
|
|
1193 |
"""
|
|
1194 |
self.assertTrue(issubclass(Error, Exception)) |
|
112.5.1
by Rick Dean
More unit tests about types |
1195 |
self.assertEqual(Error.__name__, 'Error') |
1196 |
||
1197 |
||
1198 |
||
83
by Jean-Paul Calderone
Expose some new DTLS-related constants in OpenSSL.SSL - OP_NO_QUERY_MTU, OP_COOKIE_EXCHANGE, and OP_NO_TICKET. |
1199 |
class ConstantsTests(TestCase): |
1200 |
"""
|
|
1201 |
Tests for the values of constants exposed in L{OpenSSL.SSL}.
|
|
1202 |
||
1203 |
These are values defined by OpenSSL intended only to be used as flags to
|
|
1204 |
OpenSSL APIs. The only assertions it seems can be made about them is
|
|
1205 |
their values.
|
|
1206 |
"""
|
|
87
by Jean-Paul Calderone
work around stupid feature deficiencies in unittest.py |
1207 |
# unittest.TestCase has no skip mechanism
|
1208 |
if OP_NO_QUERY_MTU is not None: |
|
1209 |
def test_op_no_query_mtu(self): |
|
1210 |
"""
|
|
1211 |
The value of L{OpenSSL.SSL.OP_NO_QUERY_MTU} is 0x1000, the value of
|
|
1212 |
I{SSL_OP_NO_QUERY_MTU} defined by I{openssl/ssl.h}.
|
|
1213 |
"""
|
|
1214 |
self.assertEqual(OP_NO_QUERY_MTU, 0x1000) |
|
1215 |
else: |
|
1216 |
"OP_NO_QUERY_MTU unavailable - OpenSSL version may be too old"
|
|
1217 |
||
1218 |
||
1219 |
if OP_COOKIE_EXCHANGE is not None: |
|
1220 |
def test_op_cookie_exchange(self): |
|
1221 |
"""
|
|
1222 |
The value of L{OpenSSL.SSL.OP_COOKIE_EXCHANGE} is 0x2000, the value
|
|
1223 |
of I{SSL_OP_COOKIE_EXCHANGE} defined by I{openssl/ssl.h}.
|
|
1224 |
"""
|
|
1225 |
self.assertEqual(OP_COOKIE_EXCHANGE, 0x2000) |
|
1226 |
else: |
|
1227 |
"OP_COOKIE_EXCHANGE unavailable - OpenSSL version may be too old"
|
|
1228 |
||
1229 |
||
1230 |
if OP_NO_TICKET is not None: |
|
1231 |
def test_op_no_ticket(self): |
|
1232 |
"""
|
|
1233 |
The value of L{OpenSSL.SSL.OP_NO_TICKET} is 0x4000, the value of
|
|
1234 |
I{SSL_OP_NO_TICKET} defined by I{openssl/ssl.h}.
|
|
1235 |
"""
|
|
1236 |
self.assertEqual(OP_NO_TICKET, 0x4000) |
|
1237 |
else: |
|
1238 |
"OP_NO_TICKET unavailable - OpenSSL version may be too old"
|
|
98.1.1
by Rick Dean
FILETYPE_TEXT for dumping certificates, private keys, and certificate signing requests. |
1239 |
|
1240 |
||
101.1.1
by Jean-Paul Calderone
Merge rick's bio_read_write branch |
1241 |
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
1242 |
class MemoryBIOTests(TestCase, _LoopbackMixin): |
101.1.3
by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests |
1243 |
"""
|
1244 |
Tests for L{OpenSSL.SSL.Connection} using a memory BIO.
|
|
1245 |
"""
|
|
112.1.5
by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my). |
1246 |
def _server(self, sock): |
1247 |
"""
|
|
1248 |
Create a new server-side SSL L{Connection} object wrapped around
|
|
1249 |
C{sock}.
|
|
1250 |
"""
|
|
101.1.4
by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs |
1251 |
# Create the server side Connection. This is mostly setup boilerplate
|
1252 |
# - use TLSv1, use a particular certificate, etc.
|
|
1253 |
server_ctx = Context(TLSv1_METHOD) |
|
1254 |
server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE ) |
|
1255 |
server_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb) |
|
1256 |
server_store = server_ctx.get_cert_store() |
|
1257 |
server_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) |
|
1258 |
server_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) |
|
1259 |
server_ctx.check_privatekey() |
|
1260 |
server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem)) |
|
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
1261 |
# Here the Connection is actually created. If None is passed as the 2nd
|
1262 |
# parameter, it indicates a memory BIO should be created.
|
|
1263 |
server_conn = Connection(server_ctx, sock) |
|
101.1.4
by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs |
1264 |
server_conn.set_accept_state() |
1265 |
return server_conn |
|
1266 |
||
1267 |
||
112.1.5
by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my). |
1268 |
def _client(self, sock): |
1269 |
"""
|
|
1270 |
Create a new client-side SSL L{Connection} object wrapped around
|
|
1271 |
C{sock}.
|
|
1272 |
"""
|
|
1273 |
# Now create the client side Connection. Similar boilerplate to the
|
|
1274 |
# above.
|
|
101.1.4
by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs |
1275 |
client_ctx = Context(TLSv1_METHOD) |
1276 |
client_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE ) |
|
1277 |
client_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb) |
|
1278 |
client_store = client_ctx.get_cert_store() |
|
1279 |
client_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, client_key_pem)) |
|
1280 |
client_ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem)) |
|
1281 |
client_ctx.check_privatekey() |
|
1282 |
client_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem)) |
|
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
1283 |
client_conn = Connection(client_ctx, sock) |
101.1.4
by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs |
1284 |
client_conn.set_connect_state() |
1285 |
return client_conn |
|
1286 |
||
1287 |
||
112.1.5
by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my). |
1288 |
def test_memoryConnect(self): |
101.1.3
by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests |
1289 |
"""
|
1290 |
Two L{Connection}s which use memory BIOs can be manually connected by
|
|
1291 |
reading from the output of each and writing those bytes to the input of
|
|
1292 |
the other and in this way establish a connection and exchange
|
|
1293 |
application-level bytes with each other.
|
|
1294 |
"""
|
|
112.1.5
by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my). |
1295 |
server_conn = self._server(None) |
1296 |
client_conn = self._client(None) |
|
98.2.1
by Rick Dean
BIO read/write to allow for socketless SSL Connections |
1297 |
|
101.1.3
by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests |
1298 |
# There should be no key or nonces yet.
|
1299 |
self.assertIdentical(server_conn.master_key(), None) |
|
1300 |
self.assertIdentical(server_conn.client_random(), None) |
|
1301 |
self.assertIdentical(server_conn.server_random(), None) |
|
1302 |
||
1303 |
# First, the handshake needs to happen. We'll deliver bytes back and
|
|
1304 |
# forth between the client and server until neither of them feels like
|
|
1305 |
# speaking any more.
|
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
1306 |
self.assertIdentical( |
1307 |
self._interactInMemory(client_conn, server_conn), None) |
|
101.1.3
by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests |
1308 |
|
1309 |
# Now that the handshake is done, there should be a key and nonces.
|
|
1310 |
self.assertNotIdentical(server_conn.master_key(), None) |
|
1311 |
self.assertNotIdentical(server_conn.client_random(), None) |
|
1312 |
self.assertNotIdentical(server_conn.server_random(), None) |
|
112
by Jean-Paul Calderone
Fix assertions about client and server nonces |
1313 |
self.assertEquals(server_conn.client_random(), client_conn.client_random()) |
1314 |
self.assertEquals(server_conn.server_random(), client_conn.server_random()) |
|
1315 |
self.assertNotEquals(server_conn.client_random(), server_conn.server_random()) |
|
1316 |
self.assertNotEquals(client_conn.client_random(), client_conn.server_random()) |
|
101.1.3
by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests |
1317 |
|
1318 |
# Here are the bytes we'll try to send.
|
|
132.2.39
by Jean-Paul Calderone
Convert a couple more things which should be byte strings into byte strings |
1319 |
important_message = b('One if by land, two if by sea.') |
98.2.1
by Rick Dean
BIO read/write to allow for socketless SSL Connections |
1320 |
|
101.1.3
by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests |
1321 |
server_conn.write(important_message) |
1322 |
self.assertEquals( |
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
1323 |
self._interactInMemory(client_conn, server_conn), |
101.1.3
by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests |
1324 |
(client_conn, important_message)) |
1325 |
||
1326 |
client_conn.write(important_message[::-1]) |
|
1327 |
self.assertEquals( |
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
1328 |
self._interactInMemory(client_conn, server_conn), |
101.1.3
by Jean-Paul Calderone
A few cleanups and simplifications to the mem bio tests |
1329 |
(server_conn, important_message[::-1])) |
98.2.1
by Rick Dean
BIO read/write to allow for socketless SSL Connections |
1330 |
|
1331 |
||
112.1.5
by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my). |
1332 |
def test_socketConnect(self): |
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
1333 |
"""
|
112.1.5
by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my). |
1334 |
Just like L{test_memoryConnect} but with an actual socket.
|
1335 |
||
1336 |
This is primarily to rule out the memory BIO code as the source of
|
|
1337 |
any problems encountered while passing data over a L{Connection} (if
|
|
1338 |
this test fails, there must be a problem outside the memory BIO
|
|
1339 |
code, as no memory BIO is involved here). Even though this isn't a
|
|
1340 |
memory BIO test, it's convenient to have it here.
|
|
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
1341 |
"""
|
132.3.4
by Jean-Paul Calderone
Switch to the loopback setup helper in test_socketConnect, incidentally converting the connections to blocking, which avoids the OS X recv error. |
1342 |
server_conn, client_conn = self._loopback() |
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
1343 |
|
132.2.39
by Jean-Paul Calderone
Convert a couple more things which should be byte strings into byte strings |
1344 |
important_message = b("Help me Obi Wan Kenobi, you're my only hope.") |
112.1.1
by Rick Dean
Add a new test case of connecting to ourself over an actual socket to pass data. |
1345 |
client_conn.send(important_message) |
1346 |
msg = server_conn.recv(1024) |
|
1347 |
self.assertEqual(msg, important_message) |
|
1348 |
||
1349 |
# Again in the other direction, just for fun.
|
|
1350 |
important_message = important_message[::-1] |
|
1351 |
server_conn.send(important_message) |
|
1352 |
msg = client_conn.recv(1024) |
|
1353 |
self.assertEqual(msg, important_message) |
|
1354 |
||
1355 |
||
101.1.2
by Jean-Paul Calderone
Correct a few minor bugs |
1356 |
def test_socketOverridesMemory(self): |
98.2.1
by Rick Dean
BIO read/write to allow for socketless SSL Connections |
1357 |
"""
|
1358 |
Test that L{OpenSSL.SSL.bio_read} and L{OpenSSL.SSL.bio_write} don't
|
|
1359 |
work on L{OpenSSL.SSL.Connection}() that use sockets.
|
|
1360 |
"""
|
|
1361 |
context = Context(SSLv3_METHOD) |
|
1362 |
client = socket() |
|
1363 |
clientSSL = Connection(context, client) |
|
1364 |
self.assertRaises( TypeError, clientSSL.bio_read, 100) |
|
1365 |
self.assertRaises( TypeError, clientSSL.bio_write, "foo") |
|
101.1.9
by Jean-Paul Calderone
Make bio_shutdown raise an exception when called on a socket-based Connection |
1366 |
self.assertRaises( TypeError, clientSSL.bio_shutdown ) |
101.1.4
by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs |
1367 |
|
1368 |
||
1369 |
def test_outgoingOverflow(self): |
|
1370 |
"""
|
|
1371 |
If more bytes than can be written to the memory BIO are passed to
|
|
1372 |
L{Connection.send} at once, the number of bytes which were written is
|
|
1373 |
returned and that many bytes from the beginning of the input can be
|
|
1374 |
read from the other end of the connection.
|
|
1375 |
"""
|
|
112.1.5
by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my). |
1376 |
server = self._server(None) |
1377 |
client = self._client(None) |
|
101.1.4
by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs |
1378 |
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
1379 |
self._interactInMemory(client, server) |
101.1.4
by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs |
1380 |
|
1381 |
size = 2 ** 15 |
|
1382 |
sent = client.send("x" * size) |
|
1383 |
# Sanity check. We're trying to test what happens when the entire
|
|
1384 |
# input can't be sent. If the entire input was sent, this test is
|
|
1385 |
# meaningless.
|
|
1386 |
self.assertTrue(sent < size) |
|
1387 |
||
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
1388 |
receiver, received = self._interactInMemory(client, server) |
101.1.4
by Jean-Paul Calderone
Refactor the existing memory bio tests a bit; add a new test which exercises the short-write case; change the error handling code in bio_read and bio_write to *not* use SSL_get_error, as these functions do nothing with SSL_* APIs |
1389 |
self.assertIdentical(receiver, server) |
1390 |
||
1391 |
# We can rely on all of these bytes being received at once because
|
|
1392 |
# _loopback passes 2 ** 16 to recv - more than 2 ** 15.
|
|
1393 |
self.assertEquals(len(received), sent) |
|
101.1.5
by Jean-Paul Calderone
add Connection.bio_shutdown |
1394 |
|
1395 |
||
1396 |
def test_shutdown(self): |
|
1397 |
"""
|
|
1398 |
L{Connection.bio_shutdown} signals the end of the data stream from
|
|
1399 |
which the L{Connection} reads.
|
|
1400 |
"""
|
|
112.1.5
by Jean-Paul Calderone
Give _server and _client docstrings and make their sock parameter required; pass None to these methods from the existing memory bio tests; update test method names to match the prevailing test method naming convention (which is inconsistent with the rest of the naming convention in pyOpenSSL, oh my). |
1401 |
server = self._server(None) |
101.1.5
by Jean-Paul Calderone
add Connection.bio_shutdown |
1402 |
server.bio_shutdown() |
1403 |
e = self.assertRaises(Error, server.recv, 1024) |
|
1404 |
# We don't want WantReadError or ZeroReturnError or anything - it's a
|
|
1405 |
# handshake failure.
|
|
1406 |
self.assertEquals(e.__class__, Error) |
|
109
by Jean-Paul Calderone
Switch from Twisted's TestCase to the stdlib TestCase so there are fewer ways the test suite can be invoked |
1407 |
|
1408 |
||
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1409 |
def _check_client_ca_list(self, func): |
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1410 |
"""
|
1411 |
Verify the return value of the C{get_client_ca_list} method for server and client connections.
|
|
1412 |
||
1413 |
@param func: A function which will be called with the server context
|
|
1414 |
before the client and server are connected to each other. This
|
|
1415 |
function should specify a list of CAs for the server to send to the
|
|
1416 |
client and return that same list. The list will be used to verify
|
|
1417 |
that C{get_client_ca_list} returns the proper value at various
|
|
1418 |
times.
|
|
1419 |
"""
|
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1420 |
server = self._server(None) |
1421 |
client = self._client(None) |
|
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1422 |
self.assertEqual(client.get_client_ca_list(), []) |
1423 |
self.assertEqual(server.get_client_ca_list(), []) |
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1424 |
ctx = server.get_context() |
1425 |
expected = func(ctx) |
|
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1426 |
self.assertEqual(client.get_client_ca_list(), []) |
1427 |
self.assertEqual(server.get_client_ca_list(), expected) |
|
132.1.34
by Jean-Paul Calderone
Try for a real test of Context.add_extra_chain_cert; not working |
1428 |
self._interactInMemory(client, server) |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1429 |
self.assertEqual(client.get_client_ca_list(), expected) |
1430 |
self.assertEqual(server.get_client_ca_list(), expected) |
|
1431 |
||
1432 |
||
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1433 |
def test_set_client_ca_list_errors(self): |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1434 |
"""
|
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1435 |
L{Context.set_client_ca_list} raises a L{TypeError} if called with a
|
1436 |
non-list or a list that contains objects other than X509Names.
|
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1437 |
"""
|
1438 |
ctx = Context(TLSv1_METHOD) |
|
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1439 |
self.assertRaises(TypeError, ctx.set_client_ca_list, "spam") |
1440 |
self.assertRaises(TypeError, ctx.set_client_ca_list, ["spam"]) |
|
1441 |
self.assertIdentical(ctx.set_client_ca_list([]), None) |
|
1442 |
||
1443 |
||
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1444 |
def test_set_empty_ca_list(self): |
1445 |
"""
|
|
1446 |
If passed an empty list, L{Context.set_client_ca_list} configures the
|
|
1447 |
context to send no CA names to the client and, on both the server and
|
|
1448 |
client sides, L{Connection.get_client_ca_list} returns an empty list
|
|
1449 |
after the connection is set up.
|
|
1450 |
"""
|
|
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1451 |
def no_ca(ctx): |
1452 |
ctx.set_client_ca_list([]) |
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1453 |
return [] |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1454 |
self._check_client_ca_list(no_ca) |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1455 |
|
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1456 |
|
1457 |
def test_set_one_ca_list(self): |
|
1458 |
"""
|
|
1459 |
If passed a list containing a single X509Name,
|
|
1460 |
L{Context.set_client_ca_list} configures the context to send that CA
|
|
1461 |
name to the client and, on both the server and client sides,
|
|
1462 |
L{Connection.get_client_ca_list} returns a list containing that
|
|
1463 |
X509Name after the connection is set up.
|
|
1464 |
"""
|
|
1465 |
cacert = load_certificate(FILETYPE_PEM, root_cert_pem) |
|
1466 |
cadesc = cacert.get_subject() |
|
1467 |
def single_ca(ctx): |
|
1468 |
ctx.set_client_ca_list([cadesc]) |
|
1469 |
return [cadesc] |
|
1470 |
self._check_client_ca_list(single_ca) |
|
1471 |
||
1472 |
||
1473 |
def test_set_multiple_ca_list(self): |
|
1474 |
"""
|
|
1475 |
If passed a list containing multiple X509Name objects,
|
|
1476 |
L{Context.set_client_ca_list} configures the context to send those CA
|
|
1477 |
names to the client and, on both the server and client sides,
|
|
1478 |
L{Connection.get_client_ca_list} returns a list containing those
|
|
1479 |
X509Names after the connection is set up.
|
|
1480 |
"""
|
|
1481 |
secert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1482 |
clcert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1483 |
||
1484 |
sedesc = secert.get_subject() |
|
1485 |
cldesc = clcert.get_subject() |
|
1486 |
||
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1487 |
def multiple_ca(ctx): |
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1488 |
L = [sedesc, cldesc] |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1489 |
ctx.set_client_ca_list(L) |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1490 |
return L |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1491 |
self._check_client_ca_list(multiple_ca) |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1492 |
|
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1493 |
|
1494 |
def test_reset_ca_list(self): |
|
1495 |
"""
|
|
1496 |
If called multiple times, only the X509Names passed to the final call
|
|
1497 |
of L{Context.set_client_ca_list} are used to configure the CA names
|
|
1498 |
sent to the client.
|
|
1499 |
"""
|
|
1500 |
cacert = load_certificate(FILETYPE_PEM, root_cert_pem) |
|
1501 |
secert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1502 |
clcert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1503 |
||
1504 |
cadesc = cacert.get_subject() |
|
1505 |
sedesc = secert.get_subject() |
|
1506 |
cldesc = clcert.get_subject() |
|
1507 |
||
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1508 |
def changed_ca(ctx): |
1509 |
ctx.set_client_ca_list([sedesc, cldesc]) |
|
1510 |
ctx.set_client_ca_list([cadesc]) |
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1511 |
return [cadesc] |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1512 |
self._check_client_ca_list(changed_ca) |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1513 |
|
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1514 |
|
1515 |
def test_mutated_ca_list(self): |
|
1516 |
"""
|
|
1517 |
If the list passed to L{Context.set_client_ca_list} is mutated
|
|
1518 |
afterwards, this does not affect the list of CA names sent to the
|
|
1519 |
client.
|
|
1520 |
"""
|
|
1521 |
cacert = load_certificate(FILETYPE_PEM, root_cert_pem) |
|
1522 |
secert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1523 |
||
1524 |
cadesc = cacert.get_subject() |
|
1525 |
sedesc = secert.get_subject() |
|
1526 |
||
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1527 |
def mutated_ca(ctx): |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1528 |
L = [cadesc] |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1529 |
ctx.set_client_ca_list([cadesc]) |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1530 |
L.append(sedesc) |
1531 |
return [cadesc] |
|
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1532 |
self._check_client_ca_list(mutated_ca) |
1533 |
||
1534 |
||
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1535 |
def test_add_client_ca_errors(self): |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1536 |
"""
|
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1537 |
L{Context.add_client_ca} raises L{TypeError} if called with a non-X509
|
1538 |
object or with a number of arguments other than one.
|
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1539 |
"""
|
1540 |
ctx = Context(TLSv1_METHOD) |
|
1541 |
cacert = load_certificate(FILETYPE_PEM, root_cert_pem) |
|
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1542 |
self.assertRaises(TypeError, ctx.add_client_ca) |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1543 |
self.assertRaises(TypeError, ctx.add_client_ca, "spam") |
122.3.12
by Jean-Paul Calderone
Break up big set_client_ca_list test into a bunch of smaller ones |
1544 |
self.assertRaises(TypeError, ctx.add_client_ca, cacert, cacert) |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1545 |
|
1546 |
||
122.3.13
by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones |
1547 |
def test_one_add_client_ca(self): |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1548 |
"""
|
122.3.13
by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones |
1549 |
A certificate's subject can be added as a CA to be sent to the client
|
1550 |
with L{Context.add_client_ca}.
|
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1551 |
"""
|
1552 |
cacert = load_certificate(FILETYPE_PEM, root_cert_pem) |
|
1553 |
cadesc = cacert.get_subject() |
|
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1554 |
def single_ca(ctx): |
1555 |
ctx.add_client_ca(cacert) |
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1556 |
return [cadesc] |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1557 |
self._check_client_ca_list(single_ca) |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1558 |
|
122.3.13
by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones |
1559 |
|
1560 |
def test_multiple_add_client_ca(self): |
|
1561 |
"""
|
|
1562 |
Multiple CA names can be sent to the client by calling
|
|
1563 |
L{Context.add_client_ca} with multiple X509 objects.
|
|
1564 |
"""
|
|
1565 |
cacert = load_certificate(FILETYPE_PEM, root_cert_pem) |
|
1566 |
secert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1567 |
||
1568 |
cadesc = cacert.get_subject() |
|
1569 |
sedesc = secert.get_subject() |
|
1570 |
||
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1571 |
def multiple_ca(ctx): |
1572 |
ctx.add_client_ca(cacert) |
|
1573 |
ctx.add_client_ca(secert) |
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1574 |
return [cadesc, sedesc] |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1575 |
self._check_client_ca_list(multiple_ca) |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1576 |
|
122.3.13
by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones |
1577 |
|
1578 |
def test_set_and_add_client_ca(self): |
|
1579 |
"""
|
|
1580 |
A call to L{Context.set_client_ca_list} followed by a call to
|
|
1581 |
L{Context.add_client_ca} results in using the CA names from the first
|
|
1582 |
call and the CA name from the second call.
|
|
1583 |
"""
|
|
1584 |
cacert = load_certificate(FILETYPE_PEM, root_cert_pem) |
|
1585 |
secert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1586 |
clcert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1587 |
||
1588 |
cadesc = cacert.get_subject() |
|
1589 |
sedesc = secert.get_subject() |
|
1590 |
cldesc = clcert.get_subject() |
|
1591 |
||
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1592 |
def mixed_set_add_ca(ctx): |
1593 |
ctx.set_client_ca_list([cadesc, sedesc]) |
|
1594 |
ctx.add_client_ca(clcert) |
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1595 |
return [cadesc, sedesc, cldesc] |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1596 |
self._check_client_ca_list(mixed_set_add_ca) |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1597 |
|
122.3.13
by Jean-Paul Calderone
Break up big add_client_ca test into a bunch of smaller ones |
1598 |
|
1599 |
def test_set_after_add_client_ca(self): |
|
1600 |
"""
|
|
1601 |
A call to L{Context.set_client_ca_list} after a call to
|
|
1602 |
L{Context.add_client_ca} replaces the CA name specified by the former
|
|
1603 |
call with the names specified by the latter cal.
|
|
1604 |
"""
|
|
1605 |
cacert = load_certificate(FILETYPE_PEM, root_cert_pem) |
|
1606 |
secert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1607 |
clcert = load_certificate(FILETYPE_PEM, server_cert_pem) |
|
1608 |
||
1609 |
cadesc = cacert.get_subject() |
|
1610 |
sedesc = secert.get_subject() |
|
1611 |
||
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1612 |
def set_replaces_add_ca(ctx): |
1613 |
ctx.add_client_ca(clcert) |
|
1614 |
ctx.set_client_ca_list([cadesc]) |
|
1615 |
ctx.add_client_ca(secert) |
|
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1616 |
return [cadesc, sedesc] |
122.3.11
by Ziga Seilnacht
Lowercase *client_CA* methods for consistency with the rest of PyOpenSSL. |
1617 |
self._check_client_ca_list(set_replaces_add_ca) |
122.3.5
by Ziga Seilnacht
Allow setting and inspecting the preferred client certificate signer list. |
1618 |
|
109
by Jean-Paul Calderone
Switch from Twisted's TestCase to the stdlib TestCase so there are fewer ways the test suite can be invoked |
1619 |
|
122.3.1
by Ziga Seilnacht
Types should have a correct __module__ attribute. |
1620 |
|
109
by Jean-Paul Calderone
Switch from Twisted's TestCase to the stdlib TestCase so there are fewer ways the test suite can be invoked |
1621 |
if __name__ == '__main__': |
1622 |
main() |