~ubuntu-branches/ubuntu/maverick/python3.1/maverick

« back to all changes in this revision

Viewing changes to Lib/test/test_ssl.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-03-23 00:01:27 UTC
  • Revision ID: james.westby@ubuntu.com-20090323000127-5fstfxju4ufrhthq
Tags: upstream-3.1~a1+20090322
Import upstream version 3.1~a1+20090322

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Test the support for SSL and sockets
 
2
 
 
3
import sys
 
4
import unittest
 
5
from test import support
 
6
import socket
 
7
import select
 
8
import errno
 
9
import subprocess
 
10
import time
 
11
import os
 
12
import pprint
 
13
import urllib.parse, urllib.request
 
14
import shutil
 
15
import traceback
 
16
import asyncore
 
17
 
 
18
from http.server import HTTPServer, SimpleHTTPRequestHandler
 
19
 
 
20
# SSL support is optional on the tested platform: remember whether the
# ssl module could be imported so the suite can be skipped when it is not.
try:
    import ssl
except ImportError:
    skip_expected = True
else:
    skip_expected = False

# Host name taken from the shared test-support module.
HOST = support.HOST
# Certificate file paths; both start out unset at import time.
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
 
30
 
 
31
def handle_error(prefix):
    """Report the exception currently being handled on stdout.

    The formatted traceback lines are joined with single spaces and
    written after *prefix*, but only when ``support.verbose`` is true.
    """
    traceback_lines = traceback.format_exception(*sys.exc_info())
    if not support.verbose:
        return
    sys.stdout.write(prefix + ' '.join(traceback_lines))
 
35
 
 
36
 
 
37
class BasicTests(unittest.TestCase):
    # Basic checks of the ssl module that do not need a local test server.

    def testSSLconnect(self):
        # Connect to svn.python.org with and without certificate
        # verification; silently skipped unless the 'network' resource
        # is enabled.
        if not support.is_resource_enabled('network'):
            return
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            c = s.getpeercert()
            if c:
                # Interpolate the unexpected certificate into the message;
                # the placeholder used to be left unformatted.
                raise support.TestFailed(
                    "Peer cert %s shouldn't be here!" % pprint.pformat(c))
        finally:
            # Close the socket even when the check above fails.
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError:
            pass
        finally:
            s.close()

    def testCrucialConstants(self):
        # Merely touching these names raises AttributeError if one of
        # them is missing from the ssl module.
        ssl.PROTOCOL_SSLv2
        ssl.PROTOCOL_SSLv23
        ssl.PROTOCOL_SSLv3
        ssl.PROTOCOL_TLSv1
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED

    def testRAND(self):
        # Exercise the RAND_* helpers exposed by the ssl module.
        v = ssl.RAND_status()
        if support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, "sufficient randomness" if v
                                else "insufficient randomness"))
        try:
            ssl.RAND_egd(1)
        except TypeError:
            pass
        else:
            # A non-string argument must be rejected; report that as a
            # test failure instead of only printing a note.
            raise support.TestFailed("didn't raise TypeError")
        ssl.RAND_add("this is a random string", 75.0)

    def testParseCert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE, False)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")

    def testDERtoPEM(self):
        # Round-trip a certificate PEM -> DER -> PEM -> DER and require
        # both DER encodings to be equal.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as cert_file:
            pem = cert_file.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        if d1 != d2:
            raise support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
 
99
 
 
100
class NetworkedTests(unittest.TestCase):
    # Tests that talk to the public server svn.python.org on port 443.

    def testConnect(self):
        # Without verification the peer certificate must be empty.
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            c = s.getpeercert()
            if c:
                # Interpolate the unexpected certificate into the message;
                # the placeholder used to be left unformatted.
                raise support.TestFailed(
                    "Peer cert %s shouldn't be here!" % pprint.pformat(c))
        finally:
            # Close the socket even when the check above fails.
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError:
            pass
        finally:
            s.close()

        # this should succeed because we specify the root cert
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError as x:
            raise support.TestFailed("Unexpected exception %s" % x)
        finally:
            s.close()

    def testNonBlockingHandshake(self):
        # Drive the handshake by hand on a non-blocking socket, waiting
        # with select() whenever the SSL layer wants to read or write.
        s = socket.socket(socket.AF_INET)
        try:
            s.connect(("svn.python.org", 443))
            s.setblocking(False)
            s = ssl.wrap_socket(s,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
            count = 0
            while True:
                try:
                    count += 1
                    s.do_handshake()
                    break
                except ssl.SSLError as err:
                    if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                        select.select([s], [], [])
                    elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                        select.select([], [s], [])
                    else:
                        raise
        finally:
            # "s" is the plain socket until wrapping succeeds and the SSL
            # socket afterwards; whichever it is gets closed on every path.
            s.close()
        if support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def testFetchServerCert(self):
        # Fetch the server certificate without any verification.
        pem = ssl.get_server_certificate(("svn.python.org", 443))
        if not pem:
            raise support.TestFailed("No server certificate on svn.python.org:443!")

        # NOTE(review): this unconditional return makes every check below
        # unreachable.  It is preserved from the original; confirm whether
        # the remaining checks are meant to be re-enabled.
        return

        try:
            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
        except ssl.SSLError as x:
            #should fail
            if support.verbose:
                sys.stdout.write("%s\n" % x)
        else:
            raise support.TestFailed("Got server certificate %s for svn.python.org!" % pem)

        pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        if not pem:
            raise support.TestFailed("No server certificate on svn.python.org:443!")
        if support.verbose:
            sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
 
178
 
 
179
 
 
180
try:
 
181
    import threading
 
182
except ImportError:
 
183
    _have_threads = False
 
184
else:
 
185
 
 
186
    _have_threads = True
 
187
 
 
188
    class ThreadedEchoServer(threading.Thread):
 
189
 
 
190
        class ConnectionHandler(threading.Thread):
 
191
 
 
192
            """A mildly complicated class, because we want it to work both
 
193
            with and without the SSL wrapper around the socket connection, so
 
194
            that we can test the STARTTLS functionality."""
 
195
 
 
196
            def __init__(self, server, connsock, addr):
 
197
                self.server = server
 
198
                self.running = False
 
199
                self.sock = connsock
 
200
                self.addr = addr
 
201
                self.sock.setblocking(1)
 
202
                self.sslconn = None
 
203
                threading.Thread.__init__(self)
 
204
                self.daemon = True
 
205
 
 
206
            def wrap_conn (self):
 
207
                try:
 
208
                    self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
 
209
                                                   certfile=self.server.certificate,
 
210
                                                   ssl_version=self.server.protocol,
 
211
                                                   ca_certs=self.server.cacerts,
 
212
                                                   cert_reqs=self.server.certreqs)
 
213
                except:
 
214
                    if self.server.chatty:
 
215
                        handle_error("\n server:  bad connection attempt from " + repr(self.addr) + ":\n")
 
216
                    if not self.server.expect_bad_connects:
 
217
                        # here, we want to stop the server, because this shouldn't
 
218
                        # happen in the context of our test case
 
219
                        self.running = False
 
220
                        # normally, we'd just stop here, but for the test
 
221
                        # harness, we want to stop the server
 
222
                        self.server.stop()
 
223
                    self.close()
 
224
                    return False
 
225
 
 
226
                else:
 
227
                    if self.server.certreqs == ssl.CERT_REQUIRED:
 
228
                        cert = self.sslconn.getpeercert()
 
229
                        if support.verbose and self.server.chatty:
 
230
                            sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
 
231
                        cert_binary = self.sslconn.getpeercert(True)
 
232
                        if support.verbose and self.server.chatty:
 
233
                            sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
 
234
                    cipher = self.sslconn.cipher()
 
235
                    if support.verbose and self.server.chatty:
 
236
                        sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
 
237
                    return True
 
238
 
 
239
            def read(self):
 
240
                if self.sslconn:
 
241
                    return self.sslconn.read()
 
242
                else:
 
243
                    return self.sock.recv(1024)
 
244
 
 
245
            def write(self, bytes):
 
246
                if self.sslconn:
 
247
                    return self.sslconn.write(bytes)
 
248
                else:
 
249
                    return self.sock.send(bytes)
 
250
 
 
251
            def close(self):
 
252
                if self.sslconn:
 
253
                    self.sslconn.close()
 
254
                else:
 
255
                    self.sock.close()
 
256
 
 
257
            def run (self):
 
258
                self.running = True
 
259
                if not self.server.starttls_server:
 
260
                    if not self.wrap_conn():
 
261
                        return
 
262
                while self.running:
 
263
                    try:
 
264
                        msg = self.read()
 
265
                        amsg = (msg and str(msg, 'ASCII', 'strict')) or ''
 
266
                        if not msg:
 
267
                            # eof, so quit this handler
 
268
                            self.running = False
 
269
                            self.close()
 
270
                        elif amsg.strip() == 'over':
 
271
                            if support.verbose and self.server.connectionchatty:
 
272
                                sys.stdout.write(" server: client closed connection\n")
 
273
                            self.close()
 
274
                            return
 
275
                        elif (self.server.starttls_server and
 
276
                              amsg.strip() == 'STARTTLS'):
 
277
                            if support.verbose and self.server.connectionchatty:
 
278
                                sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
 
279
                            self.write("OK\n".encode("ASCII", "strict"))
 
280
                            if not self.wrap_conn():
 
281
                                return
 
282
                        elif (self.server.starttls_server and self.sslconn
 
283
                              and amsg.strip() == 'ENDTLS'):
 
284
                            if support.verbose and self.server.connectionchatty:
 
285
                                sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
 
286
                            self.write("OK\n".encode("ASCII", "strict"))
 
287
                            self.sock = self.sslconn.unwrap()
 
288
                            self.sslconn = None
 
289
                            if support.verbose and self.server.connectionchatty:
 
290
                                sys.stdout.write(" server: connection is now unencrypted...\n")
 
291
                        else:
 
292
                            if (support.verbose and
 
293
                                self.server.connectionchatty):
 
294
                                ctype = (self.sslconn and "encrypted") or "unencrypted"
 
295
                                sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
 
296
                                                 % (repr(msg), ctype, repr(msg.lower()), ctype))
 
297
                            self.write(amsg.lower().encode('ASCII', 'strict'))
 
298
                    except socket.error:
 
299
                        if self.server.chatty:
 
300
                            handle_error("Test server failure:\n")
 
301
                        self.close()
 
302
                        self.running = False
 
303
                        # normally, we'd just stop here, but for the test
 
304
                        # harness, we want to stop the server
 
305
                        self.server.stop()
 
306
                    except:
 
307
                        handle_error('')
 
308
 
 
309
        def __init__(self, certificate, ssl_version=None,
 
310
                     certreqs=None, cacerts=None, expect_bad_connects=False,
 
311
                     chatty=True, connectionchatty=False, starttls_server=False):
 
312
            if ssl_version is None:
 
313
                ssl_version = ssl.PROTOCOL_TLSv1
 
314
            if certreqs is None:
 
315
                certreqs = ssl.CERT_NONE
 
316
            self.certificate = certificate
 
317
            self.protocol = ssl_version
 
318
            self.certreqs = certreqs
 
319
            self.cacerts = cacerts
 
320
            self.expect_bad_connects = expect_bad_connects
 
321
            self.chatty = chatty
 
322
            self.connectionchatty = connectionchatty
 
323
            self.starttls_server = starttls_server
 
324
            self.sock = socket.socket()
 
325
            self.port = support.bind_port(self.sock)
 
326
            self.flag = None
 
327
            self.active = False
 
328
            threading.Thread.__init__(self)
 
329
            self.daemon = True
 
330
 
 
331
        def start (self, flag=None):
 
332
            self.flag = flag
 
333
            threading.Thread.start(self)
 
334
 
 
335
        def run (self):
 
336
            self.sock.settimeout(0.5)
 
337
            self.sock.listen(5)
 
338
            self.active = True
 
339
            if self.flag:
 
340
                # signal an event
 
341
                self.flag.set()
 
342
            while self.active:
 
343
                try:
 
344
                    newconn, connaddr = self.sock.accept()
 
345
                    if support.verbose and self.chatty:
 
346
                        sys.stdout.write(' server:  new connection from '
 
347
                                         + repr(connaddr) + '\n')
 
348
                    handler = self.ConnectionHandler(self, newconn, connaddr)
 
349
                    handler.start()
 
350
                except socket.timeout:
 
351
                    pass
 
352
                except KeyboardInterrupt:
 
353
                    self.stop()
 
354
                except:
 
355
                    if self.chatty:
 
356
                        handle_error("Test server failure:\n")
 
357
            self.sock.close()
 
358
 
 
359
        def stop (self):
 
360
            self.active = False
 
361
 
 
362
    class OurHTTPSServer(threading.Thread):
 
363
 
 
364
        # This one's based on HTTPServer, which is based on SocketServer
 
365
 
 
366
        class HTTPSServer(HTTPServer):
 
367
 
 
368
            def __init__(self, server_address, RequestHandlerClass, certfile):
 
369
 
 
370
                HTTPServer.__init__(self, server_address, RequestHandlerClass)
 
371
                # we assume the certfile contains both private key and certificate
 
372
                self.certfile = certfile
 
373
                self.active = False
 
374
                self.active_lock = threading.Lock()
 
375
                self.allow_reuse_address = True
 
376
 
 
377
            def __str__(self):
 
378
                return ('<%s %s:%s>' %
 
379
                        (self.__class__.__name__,
 
380
                         self.server_name,
 
381
                         self.server_port))
 
382
 
 
383
            def get_request (self):
 
384
                # override this to wrap socket with SSL
 
385
                sock, addr = self.socket.accept()
 
386
                sslconn = ssl.wrap_socket(sock, server_side=True,
 
387
                                          certfile=self.certfile)
 
388
                return sslconn, addr
 
389
 
 
390
            # The methods overridden below this are mainly so that we
 
391
            # can run it in a thread and be able to stop it from another
 
392
            # You probably wouldn't need them in other uses.
 
393
 
 
394
            def server_activate(self):
 
395
                # We want to run this in a thread for testing purposes,
 
396
                # so we override this to set timeout, so that we get
 
397
                # a chance to stop the server
 
398
                self.socket.settimeout(0.5)
 
399
                HTTPServer.server_activate(self)
 
400
 
 
401
            def serve_forever(self):
 
402
                # We want this to run in a thread, so we use a slightly
 
403
                # modified version of "forever".
 
404
                self.active = True
 
405
                while 1:
 
406
                    try:
 
407
                        # We need to lock while handling the request.
 
408
                        # Another thread can close the socket after self.active
 
409
                        # has been checked and before the request is handled.
 
410
                        # This causes an exception when using the closed socket.
 
411
                        with self.active_lock:
 
412
                            if not self.active:
 
413
                                break
 
414
                            self.handle_request()
 
415
                    except socket.timeout:
 
416
                        pass
 
417
                    except KeyboardInterrupt:
 
418
                        self.server_close()
 
419
                        return
 
420
                    except:
 
421
                        sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())))
 
422
                        break
 
423
                    time.sleep(0.1)
 
424
 
 
425
            def server_close(self):
 
426
                # Again, we want this to run in a thread, so we need to override
 
427
                # close to clear the "active" flag, so that serve_forever() will
 
428
                # terminate.
 
429
                with self.active_lock:
 
430
                    HTTPServer.server_close(self)
 
431
                    self.active = False
 
432
 
 
433
        class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
 
434
 
 
435
            # need to override translate_path to get a known root,
 
436
            # instead of using os.curdir, since the test could be
 
437
            # run from anywhere
 
438
 
 
439
            server_version = "TestHTTPS/1.0"
 
440
 
 
441
            root = None
 
442
 
 
443
            def translate_path(self, path):
 
444
                """Translate a /-separated PATH to the local filename syntax.
 
445
 
 
446
                Components that mean special things to the local file system
 
447
                (e.g. drive or directory names) are ignored.  (XXX They should
 
448
                probably be diagnosed.)
 
449
 
 
450
                """
 
451
                # abandon query parameters
 
452
                path = urllib.parse.urlparse(path)[2]
 
453
                path = os.path.normpath(urllib.parse.unquote(path))
 
454
                words = path.split('/')
 
455
                words = filter(None, words)
 
456
                path = self.root
 
457
                for word in words:
 
458
                    drive, word = os.path.splitdrive(word)
 
459
                    head, word = os.path.split(word)
 
460
                    if word in self.root: continue
 
461
                    path = os.path.join(path, word)
 
462
                return path
 
463
 
 
464
            def log_message(self, format, *args):
 
465
 
 
466
                # we override this to suppress logging unless "verbose"
 
467
 
 
468
                if support.verbose:
 
469
                    sys.stdout.write(" server (%s:%d %s):\n   [%s] %s\n" %
 
470
                                     (self.server.server_address,
 
471
                                      self.server.server_port,
 
472
                                      self.request.cipher(),
 
473
                                      self.log_date_time_string(),
 
474
                                      format%args))
 
475
 
 
476
 
 
477
        def __init__(self, certfile):
 
478
            self.flag = None
 
479
            self.active = False
 
480
            self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
 
481
            self.port = support.find_unused_port()
 
482
            self.server = self.HTTPSServer(
 
483
                (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
 
484
            threading.Thread.__init__(self)
 
485
            self.daemon = True
 
486
 
 
487
        def __str__(self):
 
488
            return "<%s %s>" % (self.__class__.__name__, self.server)
 
489
 
 
490
        def start (self, flag=None):
 
491
            self.flag = flag
 
492
            threading.Thread.start(self)
 
493
 
 
494
        def run (self):
 
495
            self.active = True
 
496
            if self.flag:
 
497
                self.flag.set()
 
498
            self.server.serve_forever()
 
499
            self.active = False
 
500
 
 
501
        def stop (self):
 
502
            self.active = False
 
503
            self.server.server_close()
 
504
 
 
505
 
 
506
    class AsyncoreEchoServer(threading.Thread):
 
507
 
 
508
        # this one's based on asyncore.dispatcher
 
509
 
 
510
        class EchoServer (asyncore.dispatcher):
 
511
 
 
512
            class ConnectionHandler (asyncore.dispatcher_with_send):
 
513
 
 
514
                def __init__(self, conn, certfile):
 
515
                    self.socket = ssl.wrap_socket(conn, server_side=True,
 
516
                                                  certfile=certfile,
 
517
                                                  do_handshake_on_connect=False)
 
518
                    asyncore.dispatcher_with_send.__init__(self, self.socket)
 
519
                    # now we have to do the handshake
 
520
                    # we'll just do it the easy way, and block the connection
 
521
                    # till it's finished.  If we were doing it right, we'd
 
522
                    # do this in multiple calls to handle_read...
 
523
                    self.do_handshake(block=True)
 
524
 
 
525
                def readable(self):
 
526
                    if isinstance(self.socket, ssl.SSLSocket):
 
527
                        while self.socket.pending() > 0:
 
528
                            self.handle_read_event()
 
529
                    return True
 
530
 
 
531
                def handle_read(self):
 
532
                    data = self.recv(1024)
 
533
                    if support.verbose:
 
534
                        sys.stdout.write(" server:  read %s from client\n" % repr(data))
 
535
                    if not data:
 
536
                        self.close()
 
537
                    else:
 
538
                        self.send(str(data, 'ASCII', 'strict').lower().encode('ASCII', 'strict'))
 
539
 
 
540
                def handle_close(self):
 
541
                    self.close()
 
542
                    if support.verbose:
 
543
                        sys.stdout.write(" server:  closed connection %s\n" % self.socket)
 
544
 
 
545
                def handle_error(self):
 
546
                    raise
 
547
 
 
548
            def __init__(self, port, certfile):
 
549
                self.port = port
 
550
                self.certfile = certfile
 
551
                asyncore.dispatcher.__init__(self)
 
552
                self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
 
553
                self.bind(('', port))
 
554
                self.listen(5)
 
555
 
 
556
            def handle_accept(self):
 
557
                sock_obj, addr = self.accept()
 
558
                if support.verbose:
 
559
                    sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
 
560
                self.ConnectionHandler(sock_obj, self.certfile)
 
561
 
 
562
            def handle_error(self):
 
563
                raise
 
564
 
 
565
        def __init__(self, certfile):
 
566
            self.flag = None
 
567
            self.active = False
 
568
            self.port = support.find_unused_port()
 
569
            self.server = self.EchoServer(self.port, certfile)
 
570
            threading.Thread.__init__(self)
 
571
            self.daemon = True
 
572
 
 
573
        def __str__(self):
 
574
            return "<%s %s>" % (self.__class__.__name__, self.server)
 
575
 
 
576
        def start (self, flag=None):
 
577
            self.flag = flag
 
578
            threading.Thread.start(self)
 
579
 
 
580
        def run (self):
 
581
            self.active = True
 
582
            if self.flag:
 
583
                self.flag.set()
 
584
            while self.active:
 
585
                try:
 
586
                    asyncore.loop(1)
 
587
                except:
 
588
                    pass
 
589
 
 
590
        def stop (self):
 
591
            self.active = False
 
592
            self.server.close()
 
593
 
 
594
    def badCertTest (certfile):
 
595
        server = ThreadedEchoServer(CERTFILE,
 
596
                                    certreqs=ssl.CERT_REQUIRED,
 
597
                                    cacerts=CERTFILE, chatty=False,
 
598
                                    connectionchatty=False)
 
599
        flag = threading.Event()
 
600
        server.start(flag)
 
601
        # wait for it to start
 
602
        flag.wait()
 
603
        # try to connect
 
604
        try:
 
605
            try:
 
606
                s = ssl.wrap_socket(socket.socket(),
 
607
                                    certfile=certfile,
 
608
                                    ssl_version=ssl.PROTOCOL_TLSv1)
 
609
                s.connect((HOST, server.port))
 
610
            except ssl.SSLError as x:
 
611
                if support.verbose:
 
612
                    sys.stdout.write("\nSSLError is %s\n" % x)
 
613
            except socket.error as x:
 
614
                if support.verbose:
 
615
                    sys.stdout.write("\nsocket.error is %s\n" % x)
 
616
            else:
 
617
                raise support.TestFailed(
 
618
                    "Use of invalid cert should have failed!")
 
619
        finally:
 
620
            server.stop()
 
621
            server.join()
 
622
 
 
623
    def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
 
624
                          client_certfile, client_protocol=None,
 
625
                          indata="FOO\n",
 
626
                          chatty=False, connectionchatty=False):
 
627
 
 
628
        server = ThreadedEchoServer(certfile,
 
629
                                    certreqs=certreqs,
 
630
                                    ssl_version=protocol,
 
631
                                    cacerts=cacertsfile,
 
632
                                    chatty=chatty,
 
633
                                    connectionchatty=False)
 
634
        flag = threading.Event()
 
635
        server.start(flag)
 
636
        # wait for it to start
 
637
        flag.wait()
 
638
        # try to connect
 
639
        if client_protocol is None:
 
640
            client_protocol = protocol
 
641
        try:
 
642
            s = ssl.wrap_socket(socket.socket(),
 
643
                                server_side=False,
 
644
                                certfile=client_certfile,
 
645
                                ca_certs=cacertsfile,
 
646
                                cert_reqs=certreqs,
 
647
                                ssl_version=client_protocol)
 
648
            s.connect((HOST, server.port))
 
649
        except ssl.SSLError as x:
 
650
            raise support.TestFailed("Unexpected SSL error:  " + str(x))
 
651
        except Exception as x:
 
652
            raise support.TestFailed("Unexpected exception:  " + str(x))
 
653
        else:
 
654
            if connectionchatty:
 
655
                if support.verbose:
 
656
                    sys.stdout.write(
 
657
                        " client:  sending %s...\n" % (repr(indata)))
 
658
            s.write(indata.encode('ASCII', 'strict'))
 
659
            outdata = s.read()
 
660
            if connectionchatty:
 
661
                if support.verbose:
 
662
                    sys.stdout.write(" client:  read %s\n" % repr(outdata))
 
663
            outdata = str(outdata, 'ASCII', 'strict')
 
664
            if outdata != indata.lower():
 
665
                raise support.TestFailed(
 
666
                    "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
 
667
                    % (repr(outdata[:min(len(outdata),20)]), len(outdata),
 
668
                       repr(indata[:min(len(indata),20)].lower()), len(indata)))
 
669
            s.write("over\n".encode("ASCII", "strict"))
 
670
            if connectionchatty:
 
671
                if support.verbose:
 
672
                    sys.stdout.write(" client:  closing connection.\n")
 
673
            s.close()
 
674
        finally:
 
675
            server.stop()
 
676
            server.join()
 
677
 
 
678
    def tryProtocolCombo (server_protocol,
 
679
                          client_protocol,
 
680
                          expectedToWork,
 
681
                          certsreqs=None):
 
682
 
 
683
        if certsreqs is None:
 
684
            certsreqs = ssl.CERT_NONE
 
685
 
 
686
        if certsreqs == ssl.CERT_NONE:
 
687
            certtype = "CERT_NONE"
 
688
        elif certsreqs == ssl.CERT_OPTIONAL:
 
689
            certtype = "CERT_OPTIONAL"
 
690
        elif certsreqs == ssl.CERT_REQUIRED:
 
691
            certtype = "CERT_REQUIRED"
 
692
        if support.verbose:
 
693
            formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
 
694
            sys.stdout.write(formatstr %
 
695
                             (ssl.get_protocol_name(client_protocol),
 
696
                              ssl.get_protocol_name(server_protocol),
 
697
                              certtype))
 
698
        try:
 
699
            serverParamsTest(CERTFILE, server_protocol, certsreqs,
 
700
                             CERTFILE, CERTFILE, client_protocol,
 
701
                             chatty=False, connectionchatty=False)
 
702
        except support.TestFailed:
 
703
            if expectedToWork:
 
704
                raise
 
705
        else:
 
706
            if not expectedToWork:
 
707
                raise support.TestFailed(
 
708
                    "Client protocol %s succeeded with server protocol %s!"
 
709
                    % (ssl.get_protocol_name(client_protocol),
 
710
                       ssl.get_protocol_name(server_protocol)))
 
711
 
 
712
 
 
713
    class ThreadedTests(unittest.TestCase):
 
714
 
 
715
        def testEcho (self):
 
716
 
 
717
            if support.verbose:
 
718
                sys.stdout.write("\n")
 
719
            serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
 
720
                             CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
 
721
                             chatty=True, connectionchatty=True)
 
722
 
 
723
        def testReadCert(self):
 
724
 
 
725
            if support.verbose:
 
726
                sys.stdout.write("\n")
 
727
            s2 = socket.socket()
 
728
            server = ThreadedEchoServer(CERTFILE,
 
729
                                        certreqs=ssl.CERT_NONE,
 
730
                                        ssl_version=ssl.PROTOCOL_SSLv23,
 
731
                                        cacerts=CERTFILE,
 
732
                                        chatty=False)
 
733
            flag = threading.Event()
 
734
            server.start(flag)
 
735
            # wait for it to start
 
736
            flag.wait()
 
737
            # try to connect
 
738
            try:
 
739
                try:
 
740
                    s = ssl.wrap_socket(socket.socket(),
 
741
                                        certfile=CERTFILE,
 
742
                                        ca_certs=CERTFILE,
 
743
                                        cert_reqs=ssl.CERT_REQUIRED,
 
744
                                        ssl_version=ssl.PROTOCOL_SSLv23)
 
745
                    s.connect((HOST, server.port))
 
746
                except ssl.SSLError as x:
 
747
                    raise support.TestFailed(
 
748
                        "Unexpected SSL error:  " + str(x))
 
749
                except Exception as x:
 
750
                    raise support.TestFailed(
 
751
                        "Unexpected exception:  " + str(x))
 
752
                else:
 
753
                    if not s:
 
754
                        raise support.TestFailed(
 
755
                            "Can't SSL-handshake with test server")
 
756
                    cert = s.getpeercert()
 
757
                    if not cert:
 
758
                        raise support.TestFailed(
 
759
                            "Can't get peer certificate.")
 
760
                    cipher = s.cipher()
 
761
                    if support.verbose:
 
762
                        sys.stdout.write(pprint.pformat(cert) + '\n')
 
763
                        sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
 
764
                    if 'subject' not in cert:
 
765
                        raise support.TestFailed(
 
766
                            "No subject field in certificate: %s." %
 
767
                            pprint.pformat(cert))
 
768
                    if ((('organizationName', 'Python Software Foundation'),)
 
769
                        not in cert['subject']):
 
770
                        raise support.TestFailed(
 
771
                            "Missing or invalid 'organizationName' field in certificate subject; "
 
772
                            "should be 'Python Software Foundation'.")
 
773
                    s.close()
 
774
            finally:
 
775
                server.stop()
 
776
                server.join()
 
777
 
 
778
        def testNULLcert(self):
 
779
            badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
 
780
                                     "nullcert.pem"))
 
781
        def testMalformedCert(self):
 
782
            badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
 
783
                                     "badcert.pem"))
 
784
        def testWrongCert(self):
 
785
            badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
 
786
                                     "wrongcert.pem"))
 
787
        def testMalformedKey(self):
 
788
            badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
 
789
                                     "badkey.pem"))
 
790
 
 
791
        def testRudeShutdown(self):
 
792
 
 
793
            listener_ready = threading.Event()
 
794
            listener_gone = threading.Event()
 
795
            port = support.find_unused_port()
 
796
 
 
797
            # `listener` runs in a thread.  It opens a socket listening on
 
798
            # PORT, and sits in an accept() until the main thread connects.
 
799
            # Then it rudely closes the socket, and sets Event `listener_gone`
 
800
            # to let the main thread know the socket is gone.
 
801
            def listener():
 
802
                s = socket.socket()
 
803
                s.bind((HOST, port))
 
804
                s.listen(5)
 
805
                listener_ready.set()
 
806
                s.accept()
 
807
                s = None # reclaim the socket object, which also closes it
 
808
                listener_gone.set()
 
809
 
 
810
            def connector():
 
811
                listener_ready.wait()
 
812
                s = socket.socket()
 
813
                s.connect((HOST, port))
 
814
                listener_gone.wait()
 
815
                try:
 
816
                    ssl_sock = ssl.wrap_socket(s)
 
817
                except IOError:
 
818
                    pass
 
819
                else:
 
820
                    raise support.TestFailed(
 
821
                          'connecting to closed SSL socket should have failed')
 
822
 
 
823
            t = threading.Thread(target=listener)
 
824
            t.start()
 
825
            connector()
 
826
            t.join()
 
827
 
 
828
        def testProtocolSSL2(self):
 
829
            if support.verbose:
 
830
                sys.stdout.write("\n")
 
831
            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
 
832
            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
 
833
            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
 
834
            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
 
835
            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
 
836
            tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
 
837
 
 
838
        def testProtocolSSL23(self):
 
839
            if support.verbose:
 
840
                sys.stdout.write("\n")
 
841
            try:
 
842
                tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
 
843
            except support.TestFailed as x:
 
844
                # this fails on some older versions of OpenSSL (0.9.7l, for instance)
 
845
                if support.verbose:
 
846
                    sys.stdout.write(
 
847
                        " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
 
848
                        % str(x))
 
849
            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
 
850
            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
 
851
            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
 
852
 
 
853
            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
 
854
            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
 
855
            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
 
856
 
 
857
            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
 
858
            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
 
859
            tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
 
860
 
 
861
        def testProtocolSSL3(self):
 
862
            if support.verbose:
 
863
                sys.stdout.write("\n")
 
864
            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
 
865
            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
 
866
            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
 
867
            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
 
868
            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
 
869
            tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
 
870
 
 
871
        def testProtocolTLS1(self):
 
872
            if support.verbose:
 
873
                sys.stdout.write("\n")
 
874
            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
 
875
            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
 
876
            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
 
877
            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
 
878
            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
 
879
            tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
 
880
 
 
881
        def testSTARTTLS (self):
 
882
 
 
883
            msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
 
884
 
 
885
            server = ThreadedEchoServer(CERTFILE,
 
886
                                        ssl_version=ssl.PROTOCOL_TLSv1,
 
887
                                        starttls_server=True,
 
888
                                        chatty=True,
 
889
                                        connectionchatty=True)
 
890
            flag = threading.Event()
 
891
            server.start(flag)
 
892
            # wait for it to start
 
893
            flag.wait()
 
894
            # try to connect
 
895
            wrapped = False
 
896
            try:
 
897
                try:
 
898
                    s = socket.socket()
 
899
                    s.setblocking(1)
 
900
                    s.connect((HOST, server.port))
 
901
                except Exception as x:
 
902
                    raise support.TestFailed("Unexpected exception:  " + str(x))
 
903
                else:
 
904
                    if support.verbose:
 
905
                        sys.stdout.write("\n")
 
906
                    for indata in msgs:
 
907
                        msg = indata.encode('ASCII', 'replace')
 
908
                        if support.verbose:
 
909
                            sys.stdout.write(
 
910
                                " client:  sending %s...\n" % repr(msg))
 
911
                        if wrapped:
 
912
                            conn.write(msg)
 
913
                            outdata = conn.read()
 
914
                        else:
 
915
                            s.send(msg)
 
916
                            outdata = s.recv(1024)
 
917
                        if (indata == "STARTTLS" and
 
918
                            str(outdata, 'ASCII', 'replace').strip().lower().startswith("ok")):
 
919
                            if support.verbose:
 
920
                                msg = str(outdata, 'ASCII', 'replace')
 
921
                                sys.stdout.write(
 
922
                                    " client:  read %s from server, starting TLS...\n"
 
923
                                    % repr(msg))
 
924
                            conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
 
925
                            wrapped = True
 
926
                        elif (indata == "ENDTLS" and
 
927
                              str(outdata, 'ASCII', 'replace').strip().lower().startswith("ok")):
 
928
                            if support.verbose:
 
929
                                msg = str(outdata, 'ASCII', 'replace')
 
930
                                sys.stdout.write(
 
931
                                    " client:  read %s from server, ending TLS...\n"
 
932
                                    % repr(msg))
 
933
                            s = conn.unwrap()
 
934
                            wrapped = False
 
935
                        else:
 
936
                            if support.verbose:
 
937
                                msg = str(outdata, 'ASCII', 'replace')
 
938
                                sys.stdout.write(
 
939
                                    " client:  read %s from server\n" % repr(msg))
 
940
                    if support.verbose:
 
941
                        sys.stdout.write(" client:  closing connection.\n")
 
942
                    if wrapped:
 
943
                        conn.write("over\n".encode("ASCII", "strict"))
 
944
                    else:
 
945
                        s.send("over\n".encode("ASCII", "strict"))
 
946
                if wrapped:
 
947
                    conn.close()
 
948
                else:
 
949
                    s.close()
 
950
            finally:
 
951
                server.stop()
 
952
                server.join()
 
953
 
 
954
        def testSocketServer(self):
 
955
 
 
956
            server = OurHTTPSServer(CERTFILE)
 
957
            flag = threading.Event()
 
958
            server.start(flag)
 
959
            # wait for it to start
 
960
            flag.wait()
 
961
            # try to connect
 
962
            try:
 
963
                if support.verbose:
 
964
                    sys.stdout.write('\n')
 
965
                d1 = open(CERTFILE, 'rb').read()
 
966
                d2 = ''
 
967
                # now fetch the same data from the HTTPS server
 
968
                url = 'https://%s:%d/%s' % (
 
969
                    HOST, server.port, os.path.split(CERTFILE)[1])
 
970
                f = urllib.request.urlopen(url)
 
971
                dlen = f.info().get("content-length")
 
972
                if dlen and (int(dlen) > 0):
 
973
                    d2 = f.read(int(dlen))
 
974
                    if support.verbose:
 
975
                        sys.stdout.write(
 
976
                            " client: read %d bytes from remote server '%s'\n"
 
977
                            % (len(d2), server))
 
978
                f.close()
 
979
            except:
 
980
                msg = ''.join(traceback.format_exception(*sys.exc_info()))
 
981
                if support.verbose:
 
982
                    sys.stdout.write('\n' + msg)
 
983
                raise support.TestFailed(msg)
 
984
            else:
 
985
                if not (d1 == d2):
 
986
                    print("d1 is", len(d1), repr(d1))
 
987
                    print("d2 is", len(d2), repr(d2))
 
988
                    raise support.TestFailed(
 
989
                        "Couldn't fetch data from HTTPS server")
 
990
            finally:
 
991
                if support.verbose:
 
992
                    sys.stdout.write('stopping server\n')
 
993
                server.stop()
 
994
                if support.verbose:
 
995
                    sys.stdout.write('joining thread\n')
 
996
                server.join()
 
997
 
 
998
        def testAsyncoreServer(self):
 
999
 
 
1000
            if support.verbose:
 
1001
                sys.stdout.write("\n")
 
1002
 
 
1003
            indata="FOO\n"
 
1004
            server = AsyncoreEchoServer(CERTFILE)
 
1005
            flag = threading.Event()
 
1006
            server.start(flag)
 
1007
            # wait for it to start
 
1008
            flag.wait()
 
1009
            # try to connect
 
1010
            try:
 
1011
                s = ssl.wrap_socket(socket.socket())
 
1012
                s.connect((HOST, server.port))
 
1013
            except ssl.SSLError as x:
 
1014
                raise support.TestFailed("Unexpected SSL error:  " + str(x))
 
1015
            except Exception as x:
 
1016
                raise support.TestFailed("Unexpected exception:  " + str(x))
 
1017
            else:
 
1018
                if support.verbose:
 
1019
                    sys.stdout.write(
 
1020
                        " client:  sending %s...\n" % (repr(indata)))
 
1021
                s.sendall(indata.encode('ASCII', 'strict'))
 
1022
                outdata = s.recv()
 
1023
                if support.verbose:
 
1024
                    sys.stdout.write(" client:  read %s\n" % repr(outdata))
 
1025
                outdata = str(outdata, 'ASCII', 'strict')
 
1026
                if outdata != indata.lower():
 
1027
                    raise support.TestFailed(
 
1028
                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
 
1029
                        % (repr(outdata[:min(len(outdata),20)]), len(outdata),
 
1030
                           repr(indata[:min(len(indata),20)].lower()), len(indata)))
 
1031
                s.write("over\n".encode("ASCII", "strict"))
 
1032
                if support.verbose:
 
1033
                    sys.stdout.write(" client:  closing connection.\n")
 
1034
                s.close()
 
1035
            finally:
 
1036
                server.stop()
 
1037
                server.join()
 
1038
 
 
1039
        def testAllRecvAndSendMethods(self):
 
1040
 
 
1041
            if support.verbose:
 
1042
                sys.stdout.write("\n")
 
1043
 
 
1044
            server = ThreadedEchoServer(CERTFILE,
 
1045
                                        certreqs=ssl.CERT_NONE,
 
1046
                                        ssl_version=ssl.PROTOCOL_TLSv1,
 
1047
                                        cacerts=CERTFILE,
 
1048
                                        chatty=True,
 
1049
                                        connectionchatty=False)
 
1050
            flag = threading.Event()
 
1051
            server.start(flag)
 
1052
            # wait for it to start
 
1053
            flag.wait()
 
1054
            # try to connect
 
1055
            try:
 
1056
                s = ssl.wrap_socket(socket.socket(),
 
1057
                                    server_side=False,
 
1058
                                    certfile=CERTFILE,
 
1059
                                    ca_certs=CERTFILE,
 
1060
                                    cert_reqs=ssl.CERT_NONE,
 
1061
                                    ssl_version=ssl.PROTOCOL_TLSv1)
 
1062
                s.connect((HOST, server.port))
 
1063
            except ssl.SSLError as x:
 
1064
                raise support.TestFailed("Unexpected SSL error:  " + str(x))
 
1065
            except Exception as x:
 
1066
                raise support.TestFailed("Unexpected exception:  " + str(x))
 
1067
            else:
 
1068
                # helper methods for standardising recv* method signatures
 
1069
                def _recv_into():
 
1070
                    b = bytearray(b"\0"*100)
 
1071
                    count = s.recv_into(b)
 
1072
                    return b[:count]
 
1073
 
 
1074
                def _recvfrom_into():
 
1075
                    b = bytearray(b"\0"*100)
 
1076
                    count, addr = s.recvfrom_into(b)
 
1077
                    return b[:count]
 
1078
 
 
1079
                # (name, method, whether to expect success, *args)
 
1080
                send_methods = [
 
1081
                    ('send', s.send, True, []),
 
1082
                    ('sendto', s.sendto, False, ["some.address"]),
 
1083
                    ('sendall', s.sendall, True, []),
 
1084
                ]
 
1085
                recv_methods = [
 
1086
                    ('recv', s.recv, True, []),
 
1087
                    ('recvfrom', s.recvfrom, False, ["some.address"]),
 
1088
                    ('recv_into', _recv_into, True, []),
 
1089
                    ('recvfrom_into', _recvfrom_into, False, []),
 
1090
                ]
 
1091
                data_prefix = "PREFIX_"
 
1092
 
 
1093
                for meth_name, send_meth, expect_success, args in send_methods:
 
1094
                    indata = data_prefix + meth_name
 
1095
                    try:
 
1096
                        send_meth(indata.encode('ASCII', 'strict'), *args)
 
1097
                        outdata = s.read()
 
1098
                        outdata = str(outdata, 'ASCII', 'strict')
 
1099
                        if outdata != indata.lower():
 
1100
                            raise support.TestFailed(
 
1101
                                "While sending with <<{name:s}>> bad data "
 
1102
                                "<<{outdata:s}>> ({nout:d}) received; "
 
1103
                                "expected <<{indata:s}>> ({nin:d})\n".format(
 
1104
                                    name=meth_name, outdata=repr(outdata[:20]),
 
1105
                                    nout=len(outdata),
 
1106
                                    indata=repr(indata[:20]), nin=len(indata)
 
1107
                                )
 
1108
                            )
 
1109
                    except ValueError as e:
 
1110
                        if expect_success:
 
1111
                            raise support.TestFailed(
 
1112
                                "Failed to send with method <<{name:s}>>; "
 
1113
                                "expected to succeed.\n".format(name=meth_name)
 
1114
                            )
 
1115
                        if not str(e).startswith(meth_name):
 
1116
                            raise support.TestFailed(
 
1117
                                "Method <<{name:s}>> failed with unexpected "
 
1118
                                "exception message: {exp:s}\n".format(
 
1119
                                    name=meth_name, exp=e
 
1120
                                )
 
1121
                            )
 
1122
 
 
1123
                for meth_name, recv_meth, expect_success, args in recv_methods:
 
1124
                    indata = data_prefix + meth_name
 
1125
                    try:
 
1126
                        s.send(indata.encode('ASCII', 'strict'))
 
1127
                        outdata = recv_meth(*args)
 
1128
                        outdata = str(outdata, 'ASCII', 'strict')
 
1129
                        if outdata != indata.lower():
 
1130
                            raise support.TestFailed(
 
1131
                                "While receiving with <<{name:s}>> bad data "
 
1132
                                "<<{outdata:s}>> ({nout:d}) received; "
 
1133
                                "expected <<{indata:s}>> ({nin:d})\n".format(
 
1134
                                    name=meth_name, outdata=repr(outdata[:20]),
 
1135
                                    nout=len(outdata),
 
1136
                                    indata=repr(indata[:20]), nin=len(indata)
 
1137
                                )
 
1138
                            )
 
1139
                    except ValueError as e:
 
1140
                        if expect_success:
 
1141
                            raise support.TestFailed(
 
1142
                                "Failed to receive with method <<{name:s}>>; "
 
1143
                                "expected to succeed.\n".format(name=meth_name)
 
1144
                            )
 
1145
                        if not str(e).startswith(meth_name):
 
1146
                            raise support.TestFailed(
 
1147
                                "Method <<{name:s}>> failed with unexpected "
 
1148
                                "exception message: {exp:s}\n".format(
 
1149
                                    name=meth_name, exp=e
 
1150
                                )
 
1151
                            )
 
1152
                        # consume data
 
1153
                        s.read()
 
1154
 
 
1155
                s.write("over\n".encode("ASCII", "strict"))
 
1156
                s.close()
 
1157
            finally:
 
1158
                server.stop()
 
1159
                server.join()
 
1160
 
 
1161
 
 
1162
def test_main(verbose=False):
    """Locate the certificate files, pick the test classes and run them.

    The `verbose` argument is accepted but not read by this function.

    Raises support.TestSkipped when the ssl module could not be imported,
    and support.TestFailed when either certificate file is missing.
    """
    global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT

    if skip_expected:
        raise support.TestSkipped("No SSL support")

    # Certificates live next to this file; an empty dirname means the
    # current directory.
    here = os.path.dirname(__file__) or os.curdir
    CERTFILE = os.path.join(here, "keycert.pem")
    SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
        here, "https_svn_python_org_root.pem")

    if (not os.path.exists(CERTFILE) or
        not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
        raise support.TestFailed("Can't read certificate files!")

    tests = [BasicTests]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = None
    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info and support.is_resource_enabled('network'):
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # Run the thread cleanup even when run_unittest raises.
        if _have_threads:
            support.threading_cleanup(*thread_info)
 
1191
 
 
1192
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()