1
# Copyright (C) Jean-Paul Calderone 2008, All rights reserved
1
# Copyright (C) Jean-Paul Calderone 2008-2010, All rights reserved
4
4
Unit tests for L{OpenSSL.SSL}.
7
from errno import ECONNREFUSED, EINPROGRESS
7
8
from sys import platform
8
from socket import socket
9
from socket import error, socket
9
10
from os import makedirs
10
11
from os.path import join
11
12
from unittest import main
13
from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, PKey, dump_privatekey, load_certificate, load_privatekey
14
from OpenSSL.SSL import WantReadError, Context, ContextType, Connection, ConnectionType, Error
14
from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, FILETYPE_ASN1
15
from OpenSSL.crypto import PKey, X509, X509Extension
16
from OpenSSL.crypto import dump_privatekey, load_privatekey
17
from OpenSSL.crypto import dump_certificate, load_certificate
19
from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
15
20
from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
16
21
from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE
17
22
from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE
23
from OpenSSL.SSL import Error, SysCallError, WantReadError, ZeroReturnError
24
from OpenSSL.SSL import Context, ContextType, Connection, ConnectionType
18
26
from OpenSSL.test.util import TestCase
19
27
from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM
20
from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem, server_cert_pem, server_key_pem, root_cert_pem
28
from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem
29
from OpenSSL.test.test_crypto import server_cert_pem, server_key_pem, root_cert_pem
22
32
from OpenSSL.SSL import OP_NO_QUERY_MTU
23
33
except ImportError:
65
class ContextTests(TestCase):
90
Helper mixin which defines methods for creating a connected socket pair and
91
for forcing two connected SSL sockets to talk to each other via memory BIOs.
94
(server, client) = socket_pair()
96
ctx = Context(TLSv1_METHOD)
97
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
98
ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
99
server = Connection(ctx, server)
100
server.set_accept_state()
101
client = Connection(Context(TLSv1_METHOD), client)
102
client.set_connect_state()
105
for conn in [client, server]:
108
except WantReadError:
111
server.setblocking(True)
112
client.setblocking(True)
113
return server, client
116
def _interactInMemory(self, client_conn, server_conn):
118
Try to read application bytes from each of the two L{Connection}
119
objects. Copy bytes back and forth between their send/receive buffers
120
for as long as there is anything to copy. When there is nothing more
121
to copy, return C{None}. If one of them actually manages to deliver
122
some application bytes, return a two-tuple of the connection from which
123
the bytes were read and the bytes themselves.
127
# Loop until neither side has anything to say
130
# Copy stuff from each side's send buffer to the other side's
132
for (read, write) in [(client_conn, server_conn),
133
(server_conn, client_conn)]:
135
# Give the side a chance to generate some more bytes, or
138
bytes = read.recv(2 ** 16)
139
except WantReadError:
140
# It didn't succeed, so we'll hope it generated some
144
# It did succeed, so we'll stop now and let the caller deal
149
# Keep copying as long as there's more stuff there.
151
dirty = read.bio_read(4096)
152
except WantReadError:
153
# Okay, nothing more waiting to be sent. Stop
154
# processing this send buffer.
157
# Keep track of the fact that someone generated some
160
write.bio_write(dirty)
164
class ContextTests(TestCase, _LoopbackMixin):
67
166
Unit tests for L{OpenSSL.SSL.Context}.
97
196
self.assertRaises(TypeError, ctx.use_privatekey, "")
199
def test_set_app_data_wrong_args(self):
201
L{Context.set_app_data} raises L{TypeError} if called with other than
204
context = Context(TLSv1_METHOD)
205
self.assertRaises(TypeError, context.set_app_data)
206
self.assertRaises(TypeError, context.set_app_data, None, None)
209
def test_get_app_data_wrong_args(self):
211
L{Context.get_app_data} raises L{TypeError} if called with any
214
context = Context(TLSv1_METHOD)
215
self.assertRaises(TypeError, context.get_app_data, None)
218
def test_app_data(self):
220
L{Context.set_app_data} stores an object for later retrieval using
221
L{Context.get_app_data}.
224
context = Context(TLSv1_METHOD)
225
context.set_app_data(app_data)
226
self.assertIdentical(context.get_app_data(), app_data)
229
def test_set_options_wrong_args(self):
231
L{Context.set_options} raises L{TypeError} if called with the wrong
232
number of arguments or a non-C{int} argument.
234
context = Context(TLSv1_METHOD)
235
self.assertRaises(TypeError, context.set_options)
236
self.assertRaises(TypeError, context.set_options, None)
237
self.assertRaises(TypeError, context.set_options, 1, None)
240
def test_set_timeout_wrong_args(self):
242
L{Context.set_timeout} raises L{TypeError} if called with the wrong
243
number of arguments or a non-C{int} argument.
245
context = Context(TLSv1_METHOD)
246
self.assertRaises(TypeError, context.set_timeout)
247
self.assertRaises(TypeError, context.set_timeout, None)
248
self.assertRaises(TypeError, context.set_timeout, 1, None)
251
def test_get_timeout_wrong_args(self):
253
L{Context.get_timeout} raises L{TypeError} if called with any arguments.
255
context = Context(TLSv1_METHOD)
256
self.assertRaises(TypeError, context.get_timeout, None)
259
def test_timeout(self):
261
L{Context.set_timeout} sets the session timeout for all connections
262
created using the context object. L{Context.get_timeout} retrieves this
265
context = Context(TLSv1_METHOD)
266
context.set_timeout(1234)
267
self.assertEquals(context.get_timeout(), 1234)
270
def test_set_verify_depth_wrong_args(self):
272
L{Context.set_verify_depth} raises L{TypeError} if called with the wrong
273
number of arguments or a non-C{int} argument.
275
context = Context(TLSv1_METHOD)
276
self.assertRaises(TypeError, context.set_verify_depth)
277
self.assertRaises(TypeError, context.set_verify_depth, None)
278
self.assertRaises(TypeError, context.set_verify_depth, 1, None)
281
def test_get_verify_depth_wrong_args(self):
283
L{Context.get_verify_depth} raises L{TypeError} if called with any arguments.
285
context = Context(TLSv1_METHOD)
286
self.assertRaises(TypeError, context.get_verify_depth, None)
289
def test_verify_depth(self):
291
L{Context.set_verify_depth} sets the number of certificates in a chain
292
to follow before giving up. The value can be retrieved with
293
L{Context.get_verify_depth}.
295
context = Context(TLSv1_METHOD)
296
context.set_verify_depth(11)
297
self.assertEquals(context.get_verify_depth(), 11)
300
def _write_encrypted_pem(self, passphrase):
302
Write a new private key out to a new file, encrypted using the given
303
passphrase. Return the path to the new file.
306
key.generate_key(TYPE_RSA, 128)
307
pemFile = self.mktemp()
308
fObj = file(pemFile, 'w')
309
fObj.write(dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase))
314
def test_set_passwd_cb_wrong_args(self):
316
L{Context.set_passwd_cb} raises L{TypeError} if called with the
317
wrong arguments or with a non-callable first argument.
319
context = Context(TLSv1_METHOD)
320
self.assertRaises(TypeError, context.set_passwd_cb)
321
self.assertRaises(TypeError, context.set_passwd_cb, None)
322
self.assertRaises(TypeError, context.set_passwd_cb, lambda: None, None, None)
100
325
def test_set_passwd_cb(self):
102
327
L{Context.set_passwd_cb} accepts a callable which will be invoked when
103
328
a private key is loaded from an encrypted PEM.
106
key.generate_key(TYPE_RSA, 128)
107
pemFile = self.mktemp()
108
fObj = file(pemFile, 'w')
109
330
passphrase = "foobar"
110
fObj.write(dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase))
331
pemFile = self._write_encrypted_pem(passphrase)
114
333
def passphraseCallback(maxlen, verify, extra):
115
334
calledWith.append((maxlen, verify, extra))
123
342
self.assertEqual(calledWith[0][2], None)
345
def test_passwd_callback_exception(self):
347
L{Context.use_privatekey_file} propagates any exception raised by the
350
pemFile = self._write_encrypted_pem("monkeys are nice")
351
def passphraseCallback(maxlen, verify, extra):
352
raise RuntimeError("Sorry, I am a fail.")
354
context = Context(TLSv1_METHOD)
355
context.set_passwd_cb(passphraseCallback)
356
self.assertRaises(RuntimeError, context.use_privatekey_file, pemFile)
359
def test_passwd_callback_false(self):
361
L{Context.use_privatekey_file} raises L{OpenSSL.SSL.Error} if the
362
passphrase callback returns a false value.
364
pemFile = self._write_encrypted_pem("monkeys are nice")
365
def passphraseCallback(maxlen, verify, extra):
368
context = Context(TLSv1_METHOD)
369
context.set_passwd_cb(passphraseCallback)
370
self.assertRaises(Error, context.use_privatekey_file, pemFile)
373
def test_passwd_callback_non_string(self):
375
L{Context.use_privatekey_file} raises L{OpenSSL.SSL.Error} if the
376
passphrase callback returns a true non-string value.
378
pemFile = self._write_encrypted_pem("monkeys are nice")
379
def passphraseCallback(maxlen, verify, extra):
382
context = Context(TLSv1_METHOD)
383
context.set_passwd_cb(passphraseCallback)
384
self.assertRaises(Error, context.use_privatekey_file, pemFile)
387
def test_passwd_callback_too_long(self):
389
If the passphrase returned by the passphrase callback returns a string
390
longer than the indicated maximum length, it is truncated.
392
# A priori knowledge!
393
passphrase = "x" * 1024
394
pemFile = self._write_encrypted_pem(passphrase)
395
def passphraseCallback(maxlen, verify, extra):
396
assert maxlen == 1024
397
return passphrase + "y"
399
context = Context(TLSv1_METHOD)
400
context.set_passwd_cb(passphraseCallback)
401
# This shall succeed because the truncated result is the correct
403
context.use_privatekey_file(pemFile)
126
406
def test_set_info_callback(self):
128
408
L{Context.set_info_callback} accepts a callable which will be invoked
290
588
self.assertRaises(TypeError, context.add_extra_chain_cert, object(), object())
591
def _create_certificate_chain(self):
593
Construct and return a chain of certificates.
595
1. A new self-signed certificate authority certificate (cacert)
596
2. A new intermediate certificate signed by cacert (icert)
597
3. A new server certificate signed by icert (scert)
599
caext = X509Extension('basicConstraints', False, 'CA:true')
603
cakey.generate_key(TYPE_RSA, 512)
605
cacert.get_subject().commonName = "Authority Certificate"
606
cacert.set_issuer(cacert.get_subject())
607
cacert.set_pubkey(cakey)
608
cacert.set_notBefore("20000101000000Z")
609
cacert.set_notAfter("20200101000000Z")
610
cacert.add_extensions([caext])
611
cacert.set_serial_number(0)
612
cacert.sign(cakey, "sha1")
616
ikey.generate_key(TYPE_RSA, 512)
618
icert.get_subject().commonName = "Intermediate Certificate"
619
icert.set_issuer(cacert.get_subject())
620
icert.set_pubkey(ikey)
621
icert.set_notBefore("20000101000000Z")
622
icert.set_notAfter("20200101000000Z")
623
icert.add_extensions([caext])
624
icert.set_serial_number(0)
625
icert.sign(cakey, "sha1")
629
skey.generate_key(TYPE_RSA, 512)
631
scert.get_subject().commonName = "Server Certificate"
632
scert.set_issuer(icert.get_subject())
633
scert.set_pubkey(skey)
634
scert.set_notBefore("20000101000000Z")
635
scert.set_notAfter("20200101000000Z")
636
scert.add_extensions([X509Extension('basicConstraints', True, 'CA:false')])
637
scert.set_serial_number(0)
638
scert.sign(ikey, "sha1")
640
return [(cakey, cacert), (ikey, icert), (skey, scert)]
643
def _handshake_test(self, serverContext, clientContext):
645
Verify that a client and server created with the given contexts can
646
successfully handshake and communicate.
648
serverSocket, clientSocket = socket_pair()
650
server = Connection(serverContext, serverSocket)
651
server.set_accept_state()
653
client = Connection(clientContext, clientSocket)
654
client.set_connect_state()
656
# Make them talk to each other.
657
# self._interactInMemory(client, server)
659
for s in [client, server]:
662
except WantReadError:
293
666
def test_add_extra_chain_cert(self):
295
668
L{Context.add_extra_chain_cert} accepts an L{X509} instance to add to
296
669
the certificate chain.
298
context = Context(TLSv1_METHOD)
299
context.add_extra_chain_cert(load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
300
# XXX Oh no, actually asserting something about its behavior would be really hard.
305
class ConnectionTests(TestCase):
671
See L{_create_certificate_chain} for the details of the certificate
674
The chain is tested by starting a server with scert and connecting
675
to it with a client which trusts cacert and requires verification to
678
chain = self._create_certificate_chain()
679
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
681
# Dump the CA certificate to a file because that's the only way to load
682
# it as a trusted CA in the client context.
683
for cert, name in [(cacert, 'ca.pem'), (icert, 'i.pem'), (scert, 's.pem')]:
684
fObj = file(name, 'w')
685
fObj.write(dump_certificate(FILETYPE_PEM, cert))
687
fObj = file(name.replace('pem', 'asn1'), 'w')
688
fObj.write(dump_certificate(FILETYPE_ASN1, cert))
691
for key, name in [(cakey, 'ca.key'), (ikey, 'i.key'), (skey, 's.key')]:
692
fObj = file(name, 'w')
693
fObj.write(dump_privatekey(FILETYPE_PEM, key))
696
# Create the server context
697
serverContext = Context(TLSv1_METHOD)
698
serverContext.use_privatekey(skey)
699
serverContext.use_certificate(scert)
700
# The client already has cacert, we only need to give them icert.
701
serverContext.add_extra_chain_cert(icert)
704
clientContext = Context(TLSv1_METHOD)
705
clientContext.set_verify(
706
VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
707
clientContext.load_verify_locations('ca.pem')
710
self._handshake_test(serverContext, clientContext)
713
def test_use_certificate_chain_file(self):
715
L{Context.use_certificate_chain_file} reads a certificate chain from
718
The chain is tested by starting a server with scert and connecting
719
to it with a client which trusts cacert and requires verification to
722
chain = self._create_certificate_chain()
723
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
725
# Write out the chain file.
726
chainFile = self.mktemp()
727
fObj = file(chainFile, 'w')
728
# Most specific to least general.
729
fObj.write(dump_certificate(FILETYPE_PEM, scert))
730
fObj.write(dump_certificate(FILETYPE_PEM, icert))
731
fObj.write(dump_certificate(FILETYPE_PEM, cacert))
734
serverContext = Context(TLSv1_METHOD)
735
serverContext.use_certificate_chain_file(chainFile)
736
serverContext.use_privatekey(skey)
738
fObj = file('ca.pem', 'w')
739
fObj.write(dump_certificate(FILETYPE_PEM, cacert))
742
clientContext = Context(TLSv1_METHOD)
743
clientContext.set_verify(
744
VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
745
clientContext.load_verify_locations('ca.pem')
747
self._handshake_test(serverContext, clientContext)
752
def test_get_verify_mode_wrong_args(self):
754
L{Context.get_verify_mode} raises L{TypeError} if called with any
757
context = Context(TLSv1_METHOD)
758
self.assertRaises(TypeError, context.get_verify_mode, None)
761
def test_get_verify_mode(self):
763
L{Context.get_verify_mode} returns the verify mode flags previously
764
passed to L{Context.set_verify}.
766
context = Context(TLSv1_METHOD)
767
self.assertEquals(context.get_verify_mode(), 0)
769
VERIFY_PEER | VERIFY_CLIENT_ONCE, lambda *args: None)
771
context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE)
774
def test_load_tmp_dh_wrong_args(self):
776
L{Context.load_tmp_dh} raises L{TypeError} if called with the wrong
777
number of arguments or with a non-C{str} argument.
779
context = Context(TLSv1_METHOD)
780
self.assertRaises(TypeError, context.load_tmp_dh)
781
self.assertRaises(TypeError, context.load_tmp_dh, "foo", None)
782
self.assertRaises(TypeError, context.load_tmp_dh, object())
785
def test_load_tmp_dh_missing_file(self):
787
L{Context.load_tmp_dh} raises L{OpenSSL.SSL.Error} if the specified file
790
context = Context(TLSv1_METHOD)
791
self.assertRaises(Error, context.load_tmp_dh, "hello")
794
def test_load_tmp_dh(self):
796
L{Context.load_tmp_dh} loads Diffie-Hellman parameters from the
799
context = Context(TLSv1_METHOD)
800
dhfilename = self.mktemp()
801
dhfile = open(dhfilename, "w")
802
dhfile.write(dhparam)
804
context.load_tmp_dh(dhfilename)
805
# XXX What should I assert here? -exarkun
808
def test_set_cipher_list(self):
810
L{Context.set_cipher_list} accepts a C{str} naming the ciphers which
811
connections created with the context object will be able to choose from.
813
context = Context(TLSv1_METHOD)
814
context.set_cipher_list("hello world:EXP-RC4-MD5")
815
conn = Connection(context, None)
816
self.assertEquals(conn.get_cipher_list(), ["EXP-RC4-MD5"])
820
class ConnectionTests(TestCase, _LoopbackMixin):
307
822
Unit tests for L{OpenSSL.SSL.Connection}.
826
# XXX get_peer_certificate -> None
828
# XXX master_key -> TypeError
829
# XXX server_random -> TypeError
831
# XXX connect -> TypeError
832
# XXX connect_ex -> TypeError
833
# XXX set_connect_state -> TypeError
834
# XXX set_accept_state -> TypeError
835
# XXX renegotiate_pending
836
# XXX do_handshake -> TypeError
837
# XXX bio_read -> TypeError
838
# XXX recv -> TypeError
839
# XXX send -> TypeError
840
# XXX bio_write -> TypeError
309
842
def test_type(self):
311
844
L{Connection} and L{ConnectionType} refer to the same type object and
335
868
self.assertRaises(TypeError, connection.get_context, None)
871
def test_pending(self):
873
L{Connection.pending} returns the number of bytes available for
876
connection = Connection(Context(TLSv1_METHOD), None)
877
self.assertEquals(connection.pending(), 0)
880
def test_pending_wrong_args(self):
882
L{Connection.pending} raises L{TypeError} if called with any arguments.
884
connection = Connection(Context(TLSv1_METHOD), None)
885
self.assertRaises(TypeError, connection.pending, None)
888
def test_connect_wrong_args(self):
890
L{Connection.connect} raises L{TypeError} if called with a non-address
891
argument or with the wrong number of arguments.
893
connection = Connection(Context(TLSv1_METHOD), socket())
894
self.assertRaises(TypeError, connection.connect, None)
895
self.assertRaises(TypeError, connection.connect)
896
self.assertRaises(TypeError, connection.connect, ("127.0.0.1", 1), None)
899
def test_connect_refused(self):
901
L{Connection.connect} raises L{socket.error} if the underlying socket
902
connect method raises it.
905
context = Context(TLSv1_METHOD)
906
clientSSL = Connection(context, client)
907
exc = self.assertRaises(error, clientSSL.connect, ("127.0.0.1", 1))
908
self.assertEquals(exc.args[0], ECONNREFUSED)
911
def test_connect(self):
913
L{Connection.connect} establishes a connection to the specified address.
919
clientSSL = Connection(Context(TLSv1_METHOD), socket())
920
clientSSL.connect(port.getsockname())
923
def test_connect_ex(self):
925
If there is a connection error, L{Connection.connect_ex} returns the
926
errno instead of raising an exception.
932
clientSSL = Connection(Context(TLSv1_METHOD), socket())
933
clientSSL.setblocking(False)
935
clientSSL.connect_ex(port.getsockname()), EINPROGRESS)
938
def test_accept_wrong_args(self):
940
L{Connection.accept} raises L{TypeError} if called with any arguments.
942
connection = Connection(Context(TLSv1_METHOD), socket())
943
self.assertRaises(TypeError, connection.accept, None)
946
def test_accept(self):
948
L{Connection.accept} accepts a pending connection attempt and returns a
949
tuple of a new L{Connection} (the accepted client) and the address the
950
connection originated from.
952
ctx = Context(TLSv1_METHOD)
953
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
954
ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
956
portSSL = Connection(ctx, port)
957
portSSL.bind(('', 0))
960
clientSSL = Connection(Context(TLSv1_METHOD), socket())
961
clientSSL.connect(portSSL.getsockname())
963
serverSSL, address = portSSL.accept()
965
self.assertTrue(isinstance(serverSSL, Connection))
966
self.assertIdentical(serverSSL.get_context(), ctx)
967
self.assertEquals(address, clientSSL.getsockname())
970
def test_shutdown_wrong_args(self):
972
L{Connection.shutdown} raises L{TypeError} if called with the wrong
973
number of arguments or with arguments other than integers.
975
connection = Connection(Context(TLSv1_METHOD), None)
976
self.assertRaises(TypeError, connection.shutdown, None)
977
self.assertRaises(TypeError, connection.get_shutdown, None)
978
self.assertRaises(TypeError, connection.set_shutdown)
979
self.assertRaises(TypeError, connection.set_shutdown, None)
980
self.assertRaises(TypeError, connection.set_shutdown, 0, 1)
983
def test_shutdown(self):
985
L{Connection.shutdown} performs an SSL-level connection shutdown.
987
server, client = self._loopback()
988
self.assertFalse(server.shutdown())
989
self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN)
990
self.assertRaises(ZeroReturnError, client.recv, 1024)
991
self.assertEquals(client.get_shutdown(), RECEIVED_SHUTDOWN)
993
self.assertEquals(client.get_shutdown(), SENT_SHUTDOWN|RECEIVED_SHUTDOWN)
994
self.assertRaises(ZeroReturnError, server.recv, 1024)
995
self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN|RECEIVED_SHUTDOWN)
998
def test_set_shutdown(self):
1000
L{Connection.set_shutdown} sets the state of the SSL connection shutdown
1003
connection = Connection(Context(TLSv1_METHOD), socket())
1004
connection.set_shutdown(RECEIVED_SHUTDOWN)
1005
self.assertEquals(connection.get_shutdown(), RECEIVED_SHUTDOWN)
1008
def test_app_data_wrong_args(self):
1010
L{Connection.set_app_data} raises L{TypeError} if called with other than
1011
one argument. L{Connection.get_app_data} raises L{TypeError} if called
1014
conn = Connection(Context(TLSv1_METHOD), None)
1015
self.assertRaises(TypeError, conn.get_app_data, None)
1016
self.assertRaises(TypeError, conn.set_app_data)
1017
self.assertRaises(TypeError, conn.set_app_data, None, None)
1020
def test_app_data(self):
1022
Any object can be set as app data by passing it to
1023
L{Connection.set_app_data} and later retrieved with
1024
L{Connection.get_app_data}.
1026
conn = Connection(Context(TLSv1_METHOD), None)
1028
conn.set_app_data(app_data)
1029
self.assertIdentical(conn.get_app_data(), app_data)
1032
def test_makefile(self):
1034
L{Connection.makefile} is not implemented and calling that method raises
1035
L{NotImplementedError}.
1037
conn = Connection(Context(TLSv1_METHOD), None)
1038
self.assertRaises(NotImplementedError, conn.makefile)
1042
class ConnectionGetCipherListTests(TestCase):
1044
Tests for L{Connection.get_cipher_list}.
1046
def test_wrong_args(self):
1048
L{Connection.get_cipher_list} raises L{TypeError} if called with any
1051
connection = Connection(Context(TLSv1_METHOD), None)
1052
self.assertRaises(TypeError, connection.get_cipher_list, None)
1055
def test_result(self):
1057
L{Connection.get_cipher_list} returns a C{list} of C{str} giving the
1058
names of the ciphers which might be used.
1060
connection = Connection(Context(TLSv1_METHOD), None)
1061
ciphers = connection.get_cipher_list()
1062
self.assertTrue(isinstance(ciphers, list))
1063
for cipher in ciphers:
1064
self.assertTrue(isinstance(cipher, str))
1068
class ConnectionSendallTests(TestCase, _LoopbackMixin):
1070
Tests for L{Connection.sendall}.
1072
def test_wrong_args(self):
1074
When called with arguments other than a single string,
1075
L{Connection.sendall} raises L{TypeError}.
1077
connection = Connection(Context(TLSv1_METHOD), None)
1078
self.assertRaises(TypeError, connection.sendall)
1079
self.assertRaises(TypeError, connection.sendall, object())
1080
self.assertRaises(TypeError, connection.sendall, "foo", "bar")
1083
def test_short(self):
1085
L{Connection.sendall} transmits all of the bytes in the string passed to
1088
server, client = self._loopback()
1090
self.assertEquals(client.recv(1), 'x')
1093
def test_long(self):
1095
L{Connection.sendall} transmits all of the bytes in the string passed to
1096
it even if this requires multiple calls of an underlying write function.
1098
server, client = self._loopback()
1099
message ='x' * 1024 * 128 + 'y'
1100
server.sendall(message)
1103
while received < len(message):
1104
bytes = client.recv(1024)
1106
received += len(bytes)
1107
self.assertEquals(message, ''.join(accum))
1110
def test_closed(self):
1112
If the underlying socket is closed, L{Connection.sendall} propagates the
1113
write error from the low level write call.
1115
server, client = self._loopback()
1117
server.sendall("hello, world")
1118
self.assertRaises(SysCallError, server.sendall, "hello, world")
1122
class ConnectionRenegotiateTests(TestCase, _LoopbackMixin):
1124
Tests for SSL renegotiation APIs.
1126
def test_renegotiate_wrong_args(self):
1128
L{Connection.renegotiate} raises L{TypeError} if called with any
1131
connection = Connection(Context(TLSv1_METHOD), None)
1132
self.assertRaises(TypeError, connection.renegotiate, None)
1135
def test_total_renegotiations_wrong_args(self):
1137
L{Connection.total_renegotiations} raises L{TypeError} if called with
1140
connection = Connection(Context(TLSv1_METHOD), None)
1141
self.assertRaises(TypeError, connection.total_renegotiations, None)
1144
def test_total_renegotiations(self):
1146
L{Connection.total_renegotiations} returns C{0} before any
1147
renegotiations have happened.
1149
connection = Connection(Context(TLSv1_METHOD), None)
1150
self.assertEquals(connection.total_renegotiations(), 0)
1153
# def test_renegotiate(self):
1156
# server, client = self._loopback()
1158
# server.send("hello world")
1159
# self.assertEquals(client.recv(len("hello world")), "hello world")
1161
# self.assertEquals(server.total_renegotiations(), 0)
1162
# self.assertTrue(server.renegotiate())
1164
# server.setblocking(False)
1165
# client.setblocking(False)
1166
# while server.renegotiate_pending():
1167
# client.do_handshake()
1168
# server.do_handshake()
1170
# self.assertEquals(server.total_renegotiations(), 1)
339
1175
class ErrorTests(TestCase):