1
# Test the support for SSL and sockets
5
from test import support
13
import urllib.parse, urllib.request
18
from http.server import HTTPServer, SimpleHTTPRequestHandler
20
# Optionally test SSL support, if we have it in the tested platform
29
SVN_PYTHON_ORG_ROOT_CERT = None
31
def handle_error(prefix):
32
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
34
sys.stdout.write(prefix + exc_format)
37
class BasicTests(unittest.TestCase):
39
def testSSLconnect(self):
40
if not support.is_resource_enabled('network'):
42
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
43
cert_reqs=ssl.CERT_NONE)
44
s.connect(("svn.python.org", 443))
47
raise support.TestFailed("Peer cert %s shouldn't be here!")
50
# this should fail because we have no verification certs
51
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
52
cert_reqs=ssl.CERT_REQUIRED)
54
s.connect(("svn.python.org", 443))
60
def testCrucialConstants(self):
72
sys.stdout.write("\n RAND_status is %d (%s)\n"
73
% (v, (v and "sufficient randomness") or
74
"insufficient randomness"))
80
print("didn't raise TypeError")
81
ssl.RAND_add("this is a random string", 75.0)
83
def testParseCert(self):
84
# note that this uses an 'unofficial' function in _ssl.c,
85
# provided solely for this test, to exercise the certificate
87
p = ssl._ssl._test_decode_cert(CERTFILE, False)
89
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
91
def testDERtoPEM(self):
93
pem = open(SVN_PYTHON_ORG_ROOT_CERT, 'r').read()
94
d1 = ssl.PEM_cert_to_DER_cert(pem)
95
p2 = ssl.DER_cert_to_PEM_cert(d1)
96
d2 = ssl.PEM_cert_to_DER_cert(p2)
98
raise support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
100
class NetworkedTests(unittest.TestCase):
102
def testConnect(self):
103
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
104
cert_reqs=ssl.CERT_NONE)
105
s.connect(("svn.python.org", 443))
108
raise support.TestFailed("Peer cert %s shouldn't be here!")
111
# this should fail because we have no verification certs
112
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
113
cert_reqs=ssl.CERT_REQUIRED)
115
s.connect(("svn.python.org", 443))
121
# this should succeed because we specify the root cert
122
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
123
cert_reqs=ssl.CERT_REQUIRED,
124
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
126
s.connect(("svn.python.org", 443))
127
except ssl.SSLError as x:
128
raise support.TestFailed("Unexpected exception %s" % x)
132
def testNonBlockingHandshake(self):
133
s = socket.socket(socket.AF_INET)
134
s.connect(("svn.python.org", 443))
136
s = ssl.wrap_socket(s,
137
cert_reqs=ssl.CERT_NONE,
138
do_handshake_on_connect=False)
145
except ssl.SSLError as err:
146
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
147
select.select([s], [], [])
148
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
149
select.select([], [s], [])
154
sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
156
def testFetchServerCert(self):
158
pem = ssl.get_server_certificate(("svn.python.org", 443))
160
raise support.TestFailed("No server certificate on svn.python.org:443!")
165
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
166
except ssl.SSLError as x:
169
sys.stdout.write("%s\n" % x)
171
raise support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
173
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
175
raise support.TestFailed("No server certificate on svn.python.org:443!")
177
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
183
_have_threads = False
188
class ThreadedEchoServer(threading.Thread):
190
class ConnectionHandler(threading.Thread):
192
"""A mildly complicated class, because we want it to work both
193
with and without the SSL wrapper around the socket connection, so
194
that we can test the STARTTLS functionality."""
196
def __init__(self, server, connsock, addr):
201
self.sock.setblocking(1)
203
threading.Thread.__init__(self)
206
def wrap_conn (self):
208
self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
209
certfile=self.server.certificate,
210
ssl_version=self.server.protocol,
211
ca_certs=self.server.cacerts,
212
cert_reqs=self.server.certreqs)
214
if self.server.chatty:
215
handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
216
if not self.server.expect_bad_connects:
217
# here, we want to stop the server, because this shouldn't
218
# happen in the context of our test case
220
# normally, we'd just stop here, but for the test
221
# harness, we want to stop the server
227
if self.server.certreqs == ssl.CERT_REQUIRED:
228
cert = self.sslconn.getpeercert()
229
if support.verbose and self.server.chatty:
230
sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
231
cert_binary = self.sslconn.getpeercert(True)
232
if support.verbose and self.server.chatty:
233
sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
234
cipher = self.sslconn.cipher()
235
if support.verbose and self.server.chatty:
236
sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
241
return self.sslconn.read()
243
return self.sock.recv(1024)
245
def write(self, bytes):
247
return self.sslconn.write(bytes)
249
return self.sock.send(bytes)
259
if not self.server.starttls_server:
260
if not self.wrap_conn():
265
amsg = (msg and str(msg, 'ASCII', 'strict')) or ''
267
# eof, so quit this handler
270
elif amsg.strip() == 'over':
271
if support.verbose and self.server.connectionchatty:
272
sys.stdout.write(" server: client closed connection\n")
275
elif (self.server.starttls_server and
276
amsg.strip() == 'STARTTLS'):
277
if support.verbose and self.server.connectionchatty:
278
sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
279
self.write("OK\n".encode("ASCII", "strict"))
280
if not self.wrap_conn():
282
elif (self.server.starttls_server and self.sslconn
283
and amsg.strip() == 'ENDTLS'):
284
if support.verbose and self.server.connectionchatty:
285
sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
286
self.write("OK\n".encode("ASCII", "strict"))
287
self.sock = self.sslconn.unwrap()
289
if support.verbose and self.server.connectionchatty:
290
sys.stdout.write(" server: connection is now unencrypted...\n")
292
if (support.verbose and
293
self.server.connectionchatty):
294
ctype = (self.sslconn and "encrypted") or "unencrypted"
295
sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
296
% (repr(msg), ctype, repr(msg.lower()), ctype))
297
self.write(amsg.lower().encode('ASCII', 'strict'))
299
if self.server.chatty:
300
handle_error("Test server failure:\n")
303
# normally, we'd just stop here, but for the test
304
# harness, we want to stop the server
309
def __init__(self, certificate, ssl_version=None,
310
certreqs=None, cacerts=None, expect_bad_connects=False,
311
chatty=True, connectionchatty=False, starttls_server=False):
312
if ssl_version is None:
313
ssl_version = ssl.PROTOCOL_TLSv1
315
certreqs = ssl.CERT_NONE
316
self.certificate = certificate
317
self.protocol = ssl_version
318
self.certreqs = certreqs
319
self.cacerts = cacerts
320
self.expect_bad_connects = expect_bad_connects
322
self.connectionchatty = connectionchatty
323
self.starttls_server = starttls_server
324
self.sock = socket.socket()
325
self.port = support.bind_port(self.sock)
328
threading.Thread.__init__(self)
331
def start (self, flag=None):
333
threading.Thread.start(self)
336
self.sock.settimeout(0.5)
344
newconn, connaddr = self.sock.accept()
345
if support.verbose and self.chatty:
346
sys.stdout.write(' server: new connection from '
347
+ repr(connaddr) + '\n')
348
handler = self.ConnectionHandler(self, newconn, connaddr)
350
except socket.timeout:
352
except KeyboardInterrupt:
356
handle_error("Test server failure:\n")
362
class OurHTTPSServer(threading.Thread):
364
# This one's based on HTTPServer, which is based on SocketServer
366
class HTTPSServer(HTTPServer):
368
def __init__(self, server_address, RequestHandlerClass, certfile):
370
HTTPServer.__init__(self, server_address, RequestHandlerClass)
371
# we assume the certfile contains both private key and certificate
372
self.certfile = certfile
374
self.active_lock = threading.Lock()
375
self.allow_reuse_address = True
378
return ('<%s %s:%s>' %
379
(self.__class__.__name__,
383
def get_request (self):
384
# override this to wrap socket with SSL
385
sock, addr = self.socket.accept()
386
sslconn = ssl.wrap_socket(sock, server_side=True,
387
certfile=self.certfile)
390
# The methods overridden below this are mainly so that we
391
# can run it in a thread and be able to stop it from another thread.
392
# You probably wouldn't need them in other uses.
394
def server_activate(self):
395
# We want to run this in a thread for testing purposes,
396
# so we override this to set timeout, so that we get
397
# a chance to stop the server
398
self.socket.settimeout(0.5)
399
HTTPServer.server_activate(self)
401
def serve_forever(self):
402
# We want this to run in a thread, so we use a slightly
403
# modified version of "forever".
407
# We need to lock while handling the request.
408
# Another thread can close the socket after self.active
409
# has been checked and before the request is handled.
410
# This causes an exception when using the closed socket.
411
with self.active_lock:
414
self.handle_request()
415
except socket.timeout:
417
except KeyboardInterrupt:
421
sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())))
425
def server_close(self):
426
# Again, we want this to run in a thread, so we need to override
427
# close to clear the "active" flag, so that serve_forever() will
429
with self.active_lock:
430
HTTPServer.server_close(self)
433
class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
435
# need to override translate_path to get a known root,
436
# instead of using os.curdir, since the test could be
439
server_version = "TestHTTPS/1.0"
443
def translate_path(self, path):
444
"""Translate a /-separated PATH to the local filename syntax.
446
Components that mean special things to the local file system
447
(e.g. drive or directory names) are ignored. (XXX They should
448
probably be diagnosed.)
451
# abandon query parameters
452
path = urllib.parse.urlparse(path)[2]
453
path = os.path.normpath(urllib.parse.unquote(path))
454
words = path.split('/')
455
words = filter(None, words)
458
drive, word = os.path.splitdrive(word)
459
head, word = os.path.split(word)
460
if word in self.root: continue
461
path = os.path.join(path, word)
464
def log_message(self, format, *args):
466
# we override this to suppress logging unless "verbose"
469
sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
470
(self.server.server_address,
471
self.server.server_port,
472
self.request.cipher(),
473
self.log_date_time_string(),
477
def __init__(self, certfile):
480
self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
481
self.port = support.find_unused_port()
482
self.server = self.HTTPSServer(
483
(HOST, self.port), self.RootedHTTPRequestHandler, certfile)
484
threading.Thread.__init__(self)
488
return "<%s %s>" % (self.__class__.__name__, self.server)
490
def start (self, flag=None):
492
threading.Thread.start(self)
498
self.server.serve_forever()
503
self.server.server_close()
506
class AsyncoreEchoServer(threading.Thread):
508
# this one's based on asyncore.dispatcher
510
class EchoServer (asyncore.dispatcher):
512
class ConnectionHandler (asyncore.dispatcher_with_send):
514
def __init__(self, conn, certfile):
515
self.socket = ssl.wrap_socket(conn, server_side=True,
517
do_handshake_on_connect=False)
518
asyncore.dispatcher_with_send.__init__(self, self.socket)
519
# now we have to do the handshake
520
# we'll just do it the easy way, and block the connection
521
# till it's finished. If we were doing it right, we'd
522
# do this in multiple calls to handle_read...
523
self.do_handshake(block=True)
526
if isinstance(self.socket, ssl.SSLSocket):
527
while self.socket.pending() > 0:
528
self.handle_read_event()
531
def handle_read(self):
532
data = self.recv(1024)
534
sys.stdout.write(" server: read %s from client\n" % repr(data))
538
self.send(str(data, 'ASCII', 'strict').lower().encode('ASCII', 'strict'))
540
def handle_close(self):
543
sys.stdout.write(" server: closed connection %s\n" % self.socket)
545
def handle_error(self):
548
def __init__(self, port, certfile):
550
self.certfile = certfile
551
asyncore.dispatcher.__init__(self)
552
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
553
self.bind(('', port))
556
def handle_accept(self):
557
sock_obj, addr = self.accept()
559
sys.stdout.write(" server: new connection from %s:%s\n" %addr)
560
self.ConnectionHandler(sock_obj, self.certfile)
562
def handle_error(self):
565
def __init__(self, certfile):
568
self.port = support.find_unused_port()
569
self.server = self.EchoServer(self.port, certfile)
570
threading.Thread.__init__(self)
574
return "<%s %s>" % (self.__class__.__name__, self.server)
576
def start (self, flag=None):
578
threading.Thread.start(self)
594
def badCertTest (certfile):
595
server = ThreadedEchoServer(CERTFILE,
596
certreqs=ssl.CERT_REQUIRED,
597
cacerts=CERTFILE, chatty=False,
598
connectionchatty=False)
599
flag = threading.Event()
601
# wait for it to start
606
s = ssl.wrap_socket(socket.socket(),
608
ssl_version=ssl.PROTOCOL_TLSv1)
609
s.connect((HOST, server.port))
610
except ssl.SSLError as x:
612
sys.stdout.write("\nSSLError is %s\n" % x)
613
except socket.error as x:
615
sys.stdout.write("\nsocket.error is %s\n" % x)
617
raise support.TestFailed(
618
"Use of invalid cert should have failed!")
623
def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
624
client_certfile, client_protocol=None,
626
chatty=False, connectionchatty=False):
628
server = ThreadedEchoServer(certfile,
630
ssl_version=protocol,
633
connectionchatty=False)
634
flag = threading.Event()
636
# wait for it to start
639
if client_protocol is None:
640
client_protocol = protocol
642
s = ssl.wrap_socket(socket.socket(),
644
certfile=client_certfile,
645
ca_certs=cacertsfile,
647
ssl_version=client_protocol)
648
s.connect((HOST, server.port))
649
except ssl.SSLError as x:
650
raise support.TestFailed("Unexpected SSL error: " + str(x))
651
except Exception as x:
652
raise support.TestFailed("Unexpected exception: " + str(x))
657
" client: sending %s...\n" % (repr(indata)))
658
s.write(indata.encode('ASCII', 'strict'))
662
sys.stdout.write(" client: read %s\n" % repr(outdata))
663
outdata = str(outdata, 'ASCII', 'strict')
664
if outdata != indata.lower():
665
raise support.TestFailed(
666
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
667
% (repr(outdata[:min(len(outdata),20)]), len(outdata),
668
repr(indata[:min(len(indata),20)].lower()), len(indata)))
669
s.write("over\n".encode("ASCII", "strict"))
672
sys.stdout.write(" client: closing connection.\n")
678
def tryProtocolCombo (server_protocol,
683
if certsreqs is None:
684
certsreqs = ssl.CERT_NONE
686
if certsreqs == ssl.CERT_NONE:
687
certtype = "CERT_NONE"
688
elif certsreqs == ssl.CERT_OPTIONAL:
689
certtype = "CERT_OPTIONAL"
690
elif certsreqs == ssl.CERT_REQUIRED:
691
certtype = "CERT_REQUIRED"
693
formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
694
sys.stdout.write(formatstr %
695
(ssl.get_protocol_name(client_protocol),
696
ssl.get_protocol_name(server_protocol),
699
serverParamsTest(CERTFILE, server_protocol, certsreqs,
700
CERTFILE, CERTFILE, client_protocol,
701
chatty=False, connectionchatty=False)
702
except support.TestFailed:
706
if not expectedToWork:
707
raise support.TestFailed(
708
"Client protocol %s succeeded with server protocol %s!"
709
% (ssl.get_protocol_name(client_protocol),
710
ssl.get_protocol_name(server_protocol)))
713
class ThreadedTests(unittest.TestCase):
718
sys.stdout.write("\n")
719
serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
720
CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
721
chatty=True, connectionchatty=True)
723
def testReadCert(self):
726
sys.stdout.write("\n")
728
server = ThreadedEchoServer(CERTFILE,
729
certreqs=ssl.CERT_NONE,
730
ssl_version=ssl.PROTOCOL_SSLv23,
733
flag = threading.Event()
735
# wait for it to start
740
s = ssl.wrap_socket(socket.socket(),
743
cert_reqs=ssl.CERT_REQUIRED,
744
ssl_version=ssl.PROTOCOL_SSLv23)
745
s.connect((HOST, server.port))
746
except ssl.SSLError as x:
747
raise support.TestFailed(
748
"Unexpected SSL error: " + str(x))
749
except Exception as x:
750
raise support.TestFailed(
751
"Unexpected exception: " + str(x))
754
raise support.TestFailed(
755
"Can't SSL-handshake with test server")
756
cert = s.getpeercert()
758
raise support.TestFailed(
759
"Can't get peer certificate.")
762
sys.stdout.write(pprint.pformat(cert) + '\n')
763
sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
764
if 'subject' not in cert:
765
raise support.TestFailed(
766
"No subject field in certificate: %s." %
767
pprint.pformat(cert))
768
if ((('organizationName', 'Python Software Foundation'),)
769
not in cert['subject']):
770
raise support.TestFailed(
771
"Missing or invalid 'organizationName' field in certificate subject; "
772
"should be 'Python Software Foundation'.")
778
def testNULLcert(self):
779
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
781
def testMalformedCert(self):
782
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
784
def testWrongCert(self):
785
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
787
def testMalformedKey(self):
788
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
791
def testRudeShutdown(self):
793
listener_ready = threading.Event()
794
listener_gone = threading.Event()
795
port = support.find_unused_port()
797
# `listener` runs in a thread. It opens a socket listening on
798
# PORT, and sits in an accept() until the main thread connects.
799
# Then it rudely closes the socket, and sets Event `listener_gone`
800
# to let the main thread know the socket is gone.
807
s = None # reclaim the socket object, which also closes it
811
listener_ready.wait()
813
s.connect((HOST, port))
816
ssl_sock = ssl.wrap_socket(s)
820
raise support.TestFailed(
821
'connecting to closed SSL socket should have failed')
823
t = threading.Thread(target=listener)
828
def testProtocolSSL2(self):
830
sys.stdout.write("\n")
831
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
832
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
833
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
834
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
835
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
836
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
838
def testProtocolSSL23(self):
840
sys.stdout.write("\n")
842
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
843
except support.TestFailed as x:
844
# this fails on some older versions of OpenSSL (0.9.7l, for instance)
847
" SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
849
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
850
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
851
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
853
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
854
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
855
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
857
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
858
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
859
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
861
def testProtocolSSL3(self):
863
sys.stdout.write("\n")
864
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
865
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
866
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
867
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
868
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
869
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
871
def testProtocolTLS1(self):
873
sys.stdout.write("\n")
874
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
875
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
876
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
877
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
878
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
879
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
881
def testSTARTTLS (self):
883
msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
885
server = ThreadedEchoServer(CERTFILE,
886
ssl_version=ssl.PROTOCOL_TLSv1,
887
starttls_server=True,
889
connectionchatty=True)
890
flag = threading.Event()
892
# wait for it to start
900
s.connect((HOST, server.port))
901
except Exception as x:
902
raise support.TestFailed("Unexpected exception: " + str(x))
905
sys.stdout.write("\n")
907
msg = indata.encode('ASCII', 'replace')
910
" client: sending %s...\n" % repr(msg))
913
outdata = conn.read()
916
outdata = s.recv(1024)
917
if (indata == "STARTTLS" and
918
str(outdata, 'ASCII', 'replace').strip().lower().startswith("ok")):
920
msg = str(outdata, 'ASCII', 'replace')
922
" client: read %s from server, starting TLS...\n"
924
conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
926
elif (indata == "ENDTLS" and
927
str(outdata, 'ASCII', 'replace').strip().lower().startswith("ok")):
929
msg = str(outdata, 'ASCII', 'replace')
931
" client: read %s from server, ending TLS...\n"
937
msg = str(outdata, 'ASCII', 'replace')
939
" client: read %s from server\n" % repr(msg))
941
sys.stdout.write(" client: closing connection.\n")
943
conn.write("over\n".encode("ASCII", "strict"))
945
s.send("over\n".encode("ASCII", "strict"))
954
def testSocketServer(self):
956
server = OurHTTPSServer(CERTFILE)
957
flag = threading.Event()
959
# wait for it to start
964
sys.stdout.write('\n')
965
d1 = open(CERTFILE, 'rb').read()
967
# now fetch the same data from the HTTPS server
968
url = 'https://%s:%d/%s' % (
969
HOST, server.port, os.path.split(CERTFILE)[1])
970
f = urllib.request.urlopen(url)
971
dlen = f.info().get("content-length")
972
if dlen and (int(dlen) > 0):
973
d2 = f.read(int(dlen))
976
" client: read %d bytes from remote server '%s'\n"
980
msg = ''.join(traceback.format_exception(*sys.exc_info()))
982
sys.stdout.write('\n' + msg)
983
raise support.TestFailed(msg)
986
print("d1 is", len(d1), repr(d1))
987
print("d2 is", len(d2), repr(d2))
988
raise support.TestFailed(
989
"Couldn't fetch data from HTTPS server")
992
sys.stdout.write('stopping server\n')
995
sys.stdout.write('joining thread\n')
998
def testAsyncoreServer(self):
1001
sys.stdout.write("\n")
1004
server = AsyncoreEchoServer(CERTFILE)
1005
flag = threading.Event()
1007
# wait for it to start
1011
s = ssl.wrap_socket(socket.socket())
1012
s.connect((HOST, server.port))
1013
except ssl.SSLError as x:
1014
raise support.TestFailed("Unexpected SSL error: " + str(x))
1015
except Exception as x:
1016
raise support.TestFailed("Unexpected exception: " + str(x))
1020
" client: sending %s...\n" % (repr(indata)))
1021
s.sendall(indata.encode('ASCII', 'strict'))
1024
sys.stdout.write(" client: read %s\n" % repr(outdata))
1025
outdata = str(outdata, 'ASCII', 'strict')
1026
if outdata != indata.lower():
1027
raise support.TestFailed(
1028
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
1029
% (repr(outdata[:min(len(outdata),20)]), len(outdata),
1030
repr(indata[:min(len(indata),20)].lower()), len(indata)))
1031
s.write("over\n".encode("ASCII", "strict"))
1033
sys.stdout.write(" client: closing connection.\n")
1039
def testAllRecvAndSendMethods(self):
1042
sys.stdout.write("\n")
1044
server = ThreadedEchoServer(CERTFILE,
1045
certreqs=ssl.CERT_NONE,
1046
ssl_version=ssl.PROTOCOL_TLSv1,
1049
connectionchatty=False)
1050
flag = threading.Event()
1052
# wait for it to start
1056
s = ssl.wrap_socket(socket.socket(),
1060
cert_reqs=ssl.CERT_NONE,
1061
ssl_version=ssl.PROTOCOL_TLSv1)
1062
s.connect((HOST, server.port))
1063
except ssl.SSLError as x:
1064
raise support.TestFailed("Unexpected SSL error: " + str(x))
1065
except Exception as x:
1066
raise support.TestFailed("Unexpected exception: " + str(x))
1068
# helper methods for standardising recv* method signatures
1070
b = bytearray(b"\0"*100)
1071
count = s.recv_into(b)
1074
def _recvfrom_into():
1075
b = bytearray(b"\0"*100)
1076
count, addr = s.recvfrom_into(b)
1079
# (name, method, whether to expect success, *args)
1081
('send', s.send, True, []),
1082
('sendto', s.sendto, False, ["some.address"]),
1083
('sendall', s.sendall, True, []),
1086
('recv', s.recv, True, []),
1087
('recvfrom', s.recvfrom, False, ["some.address"]),
1088
('recv_into', _recv_into, True, []),
1089
('recvfrom_into', _recvfrom_into, False, []),
1091
data_prefix = "PREFIX_"
1093
for meth_name, send_meth, expect_success, args in send_methods:
1094
indata = data_prefix + meth_name
1096
send_meth(indata.encode('ASCII', 'strict'), *args)
1098
outdata = str(outdata, 'ASCII', 'strict')
1099
if outdata != indata.lower():
1100
raise support.TestFailed(
1101
"While sending with <<{name:s}>> bad data "
1102
"<<{outdata:s}>> ({nout:d}) received; "
1103
"expected <<{indata:s}>> ({nin:d})\n".format(
1104
name=meth_name, outdata=repr(outdata[:20]),
1106
indata=repr(indata[:20]), nin=len(indata)
1109
except ValueError as e:
1111
raise support.TestFailed(
1112
"Failed to send with method <<{name:s}>>; "
1113
"expected to succeed.\n".format(name=meth_name)
1115
if not str(e).startswith(meth_name):
1116
raise support.TestFailed(
1117
"Method <<{name:s}>> failed with unexpected "
1118
"exception message: {exp:s}\n".format(
1119
name=meth_name, exp=e
1123
for meth_name, recv_meth, expect_success, args in recv_methods:
1124
indata = data_prefix + meth_name
1126
s.send(indata.encode('ASCII', 'strict'))
1127
outdata = recv_meth(*args)
1128
outdata = str(outdata, 'ASCII', 'strict')
1129
if outdata != indata.lower():
1130
raise support.TestFailed(
1131
"While receiving with <<{name:s}>> bad data "
1132
"<<{outdata:s}>> ({nout:d}) received; "
1133
"expected <<{indata:s}>> ({nin:d})\n".format(
1134
name=meth_name, outdata=repr(outdata[:20]),
1136
indata=repr(indata[:20]), nin=len(indata)
1139
except ValueError as e:
1141
raise support.TestFailed(
1142
"Failed to receive with method <<{name:s}>>; "
1143
"expected to succeed.\n".format(name=meth_name)
1145
if not str(e).startswith(meth_name):
1146
raise support.TestFailed(
1147
"Method <<{name:s}>> failed with unexpected "
1148
"exception message: {exp:s}\n".format(
1149
name=meth_name, exp=e
1155
s.write("over\n".encode("ASCII", "strict"))
1162
def test_main(verbose=False):
1164
raise support.TestSkipped("No SSL support")
1166
global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
1167
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
1169
SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
1170
os.path.dirname(__file__) or os.curdir,
1171
"https_svn_python_org_root.pem")
1173
if (not os.path.exists(CERTFILE) or
1174
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
1175
raise support.TestFailed("Can't read certificate files!")
1177
tests = [BasicTests]
1179
if support.is_resource_enabled('network'):
1180
tests.append(NetworkedTests)
1183
thread_info = support.threading_setup()
1184
if thread_info and support.is_resource_enabled('network'):
1185
tests.append(ThreadedTests)
1187
support.run_unittest(*tests)
1190
support.threading_cleanup(*thread_info)
1192
if __name__ == "__main__":