~ubuntu-branches/ubuntu/quantal/m2crypto/quantal

« back to all changes in this revision

Viewing changes to .pc/fix_kill_signal.patch/tests/test_ssl.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2011-08-10 17:01:24 UTC
  • mfrom: (23.3.1 oneiric)
  • Revision ID: package-import@ubuntu.com-20110810170124-ctuhz524p63n0hrz
* repackage upstream
  - remove non-free demo/x509/proxylib.py
* switch to dh_python2 (LP: #788514) 
* switch to 3.0 (quilt)
  - converted all upstream changes to patches
  - fix_sslv2_test.patch: testsuite patch for sslv2 deactivation
  - fix_build_with_new_openssl.patch: build fix from debian 0.20.1-1.1
* Merge from debian testing, remaining changes:
  - disable_sslv2_test.patch: disable sslv23_weak_crypto test as it is expected to
    fail with SSLv2 having been disabled in openssl 0.9.8o-1ubuntu3.
    (LP: #600549)
  - fix_sslv2_test2.patch: fix testsuite with newer openssl. (LP: #600549)
    - Backported from http://svn.osafoundation.org/viewvc/m2crypto/trunk/tests/test_smime.py?r1=698&r2=721
  - debian/control: Add openssl for FTBFS. 
  - debian/rules; enable testsuite, add more files to "clean" rule.
  - fix_kill_signal.patch: use signal 9 to kill old s_server processes
    to work around build HUP signal-ignore-mask (LP: #451998).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
"""Unit tests for M2Crypto.SSL.
 
4
 
 
5
Copyright (c) 2000-2004 Ng Pheng Siong. All rights reserved."""
 
6
 
 
7
"""
 
8
TODO
 
9
 
 
10
Server tests:
 
11
- ???
 
12
 
 
13
Others:
 
14
- ssl_dispatcher
 
15
- SSLServer
 
16
- ForkingSSLServer
 
17
- ThreadingSSLServer
 
18
"""
 
19
 
 
20
import os, socket, string, sys, tempfile, thread, time, unittest
 
21
from M2Crypto import Rand, SSL, m2, Err
 
22
 
 
23
from fips import fips_mode
 
24
 
 
25
srv_host = 'localhost'
 
26
srv_port = 64000
 
27
 
 
28
def verify_cb_new_function(ok, store):
 
29
    try:
 
30
        assert not ok
 
31
        err = store.get_error()
 
32
        assert err in [m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
 
33
                       m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
 
34
                       m2.X509_V_ERR_CERT_UNTRUSTED,
 
35
                       m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE]
 
36
        assert store.get_error_depth() == 0 
 
37
        app_data = m2.x509_store_ctx_get_app_data(store.ctx)
 
38
        assert app_data
 
39
        x509 = store.get_current_cert()
 
40
        assert x509
 
41
        stack = store.get1_chain()
 
42
        assert len(stack) == 1
 
43
        assert stack[0].as_pem() == x509.as_pem()
 
44
    except AssertionError, e:
 
45
        # If we let exceptions propagate from here the
 
46
        # caller may see strange errors. This is cleaner.
 
47
        return 0   
 
48
    return 1
 
49
 
 
50
class VerifyCB:
 
51
    def __call__(self, ok, store):
 
52
        return verify_cb_new_function(ok, store)
 
53
 
 
54
sleepTime = float(os.getenv('M2CRYPTO_TEST_SSL_SLEEP', 0.5))
 
55
 
 
56
def find_openssl():
 
57
    if os.name == 'nt' or sys.platform == 'cygwin':
 
58
        openssl = 'openssl.exe'
 
59
    else:
 
60
        openssl = 'openssl'
 
61
 
 
62
    plist = os.environ['PATH'].split(os.pathsep)
 
63
    for p in plist:
 
64
        try:
 
65
            dir = os.listdir(p)
 
66
            if openssl in dir:
 
67
                return True
 
68
        except:
 
69
            pass
 
70
    return False
 
71
 
 
72
class BaseSSLClientTestCase(unittest.TestCase):
 
73
 
 
74
    openssl_in_path = find_openssl()
 
75
 
 
76
    def start_server(self, args):
 
77
        if not self.openssl_in_path:
 
78
            raise Exception('openssl command not in PATH')
 
79
        
 
80
        pid = os.fork()
 
81
        if pid == 0:
 
82
            # openssl must be started in the tests directory for it
 
83
            # to find the .pem files
 
84
            os.chdir('tests')
 
85
            try:
 
86
                os.execvp('openssl', args)
 
87
            finally:
 
88
                os.chdir('..')
 
89
                
 
90
        else:
 
91
            time.sleep(sleepTime)
 
92
            return pid
 
93
 
 
94
    def stop_server(self, pid):
 
95
        os.kill(pid, 1)
 
96
        os.waitpid(pid, 0)
 
97
 
 
98
    def http_get(self, s):
 
99
        s.send('GET / HTTP/1.0\n\n') 
 
100
        resp = ''
 
101
        while 1:
 
102
            try:
 
103
                r = s.recv(4096)
 
104
                if not r:
 
105
                    break
 
106
            except SSL.SSLError: # s_server throws an 'unexpected eof'...
 
107
                break
 
108
            resp = resp + r 
 
109
        return resp
 
110
 
 
111
    def setUp(self):
 
112
        self.srv_host = srv_host
 
113
        self.srv_port = srv_port
 
114
        self.srv_addr = (srv_host, srv_port)
 
115
        self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)
 
116
        self.args = ['s_server', '-quiet', '-www',
 
117
                     #'-cert', 'server.pem', Implicitly using this
 
118
                     '-accept', str(self.srv_port)]
 
119
 
 
120
    def tearDown(self):
 
121
        global srv_port
 
122
        srv_port = srv_port - 1
 
123
 
 
124
 
 
125
class PassSSLClientTestCase(BaseSSLClientTestCase):
 
126
        
 
127
    def test_pass(self):
 
128
        pass
 
129
 
 
130
class HttpslibSSLClientTestCase(BaseSSLClientTestCase):
 
131
 
 
132
    def test_HTTPSConnection(self):
 
133
        pid = self.start_server(self.args)
 
134
        try:
 
135
            from M2Crypto import httpslib
 
136
            c = httpslib.HTTPSConnection(srv_host, srv_port)
 
137
            c.request('GET', '/')
 
138
            data = c.getresponse().read()
 
139
            c.close()
 
140
        finally:
 
141
            self.stop_server(pid)
 
142
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
143
 
 
144
    def test_HTTPSConnection_resume_session(self):
 
145
        pid = self.start_server(self.args)
 
146
        try:
 
147
            from M2Crypto import httpslib
 
148
            ctx = SSL.Context()
 
149
            ctx.load_verify_locations(cafile='tests/ca.pem')
 
150
            ctx.load_cert('tests/x509.pem')
 
151
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)
 
152
            ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
 
153
            c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)
 
154
            c.request('GET', '/')
 
155
            ses = c.get_session()
 
156
            t = ses.as_text()
 
157
            data = c.getresponse().read()
 
158
            # Appearently closing connection here screws session; Ali Polatel?
 
159
            # c.close()
 
160
            
 
161
            ctx2 = SSL.Context()
 
162
            ctx2.load_verify_locations(cafile='tests/ca.pem')
 
163
            ctx2.load_cert('tests/x509.pem')
 
164
            ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)
 
165
            ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
 
166
            c2 = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx2)
 
167
            c2.set_session(ses)
 
168
            c2.request('GET', '/')
 
169
            ses2 = c2.get_session()
 
170
            t2 = ses2.as_text()
 
171
            data = c2.getresponse().read()
 
172
            c.close()
 
173
            c2.close()
 
174
            assert t == t2, "Sessions did not match"
 
175
        finally:
 
176
            self.stop_server(pid)
 
177
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
178
 
 
179
    def test_HTTPSConnection_secure_context(self):
 
180
        pid = self.start_server(self.args)
 
181
        try:
 
182
            from M2Crypto import httpslib
 
183
            ctx = SSL.Context()
 
184
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
185
            ctx.load_verify_locations('tests/ca.pem')
 
186
            c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)
 
187
            c.request('GET', '/')
 
188
            data = c.getresponse().read()
 
189
            c.close()
 
190
        finally:
 
191
            self.stop_server(pid)
 
192
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
193
 
 
194
    def test_HTTPSConnection_secure_context_fail(self):
 
195
        pid = self.start_server(self.args)
 
196
        try:
 
197
            from M2Crypto import httpslib
 
198
            ctx = SSL.Context()
 
199
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
200
            ctx.load_verify_locations('tests/server.pem')
 
201
            c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)
 
202
            self.assertRaises(SSL.SSLError, c.request, 'GET', '/')
 
203
            c.close()
 
204
        finally:
 
205
            self.stop_server(pid)
 
206
 
 
207
    def test_HTTPS(self):
 
208
        pid = self.start_server(self.args)
 
209
        try:
 
210
            from M2Crypto import httpslib
 
211
            c = httpslib.HTTPS(srv_host, srv_port)
 
212
            c.putrequest('GET', '/')
 
213
            c.putheader('Accept', 'text/html')
 
214
            c.putheader('Accept', 'text/plain')
 
215
            c.endheaders()
 
216
            err, msg, headers = c.getreply()
 
217
            assert err == 200, err
 
218
            f = c.getfile()
 
219
            data = f.read()
 
220
            c.close()
 
221
        finally:
 
222
            self.stop_server(pid)
 
223
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
224
 
 
225
    def test_HTTPS_secure_context(self):
 
226
        pid = self.start_server(self.args)
 
227
        try:
 
228
            from M2Crypto import httpslib
 
229
            ctx = SSL.Context()
 
230
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
231
            ctx.load_verify_locations('tests/ca.pem')
 
232
            c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)
 
233
            c.putrequest('GET', '/')
 
234
            c.putheader('Accept', 'text/html')
 
235
            c.putheader('Accept', 'text/plain')
 
236
            c.endheaders()
 
237
            err, msg, headers = c.getreply()
 
238
            assert err == 200, err
 
239
            f = c.getfile()
 
240
            data = f.read()
 
241
            c.close()
 
242
        finally:
 
243
            self.stop_server(pid)
 
244
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
245
 
 
246
    def test_HTTPS_secure_context_fail(self):
 
247
        pid = self.start_server(self.args)
 
248
        try:
 
249
            from M2Crypto import httpslib
 
250
            ctx = SSL.Context()
 
251
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
252
            ctx.load_verify_locations('tests/server.pem')
 
253
            c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)
 
254
            c.putrequest('GET', '/')
 
255
            c.putheader('Accept', 'text/html')
 
256
            c.putheader('Accept', 'text/plain')
 
257
            self.assertRaises(SSL.SSLError, c.endheaders)
 
258
            c.close()
 
259
        finally:
 
260
            self.stop_server(pid)
 
261
            
 
262
    def test_HTTPSConnection_illegalkeywordarg(self):
 
263
        from M2Crypto import httpslib
 
264
        self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',
 
265
                          badKeyword=True)
 
266
 
 
267
 
 
268
class MiscSSLClientTestCase(BaseSSLClientTestCase):
 
269
 
 
270
    def test_no_connection(self):
 
271
        ctx = SSL.Context()
 
272
        s = SSL.Connection(ctx)
 
273
        
 
274
    def test_server_simple(self):
 
275
        pid = self.start_server(self.args)
 
276
        try:
 
277
            self.assertRaises(ValueError, SSL.Context, 'tlsv5')
 
278
            ctx = SSL.Context()
 
279
            s = SSL.Connection(ctx)
 
280
            s.connect(self.srv_addr)
 
281
            self.assertRaises(ValueError, s.read, 0)
 
282
            data = self.http_get(s)
 
283
            s.close()
 
284
        finally:
 
285
            self.stop_server(pid)
 
286
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
287
 
 
288
    def test_server_simple_secure_context(self):
 
289
        pid = self.start_server(self.args)
 
290
        try:
 
291
            ctx = SSL.Context()
 
292
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
293
            ctx.load_verify_locations('tests/ca.pem')
 
294
            s = SSL.Connection(ctx)
 
295
            s.connect(self.srv_addr)
 
296
            data = self.http_get(s)
 
297
            s.close()
 
298
        finally:
 
299
            self.stop_server(pid)
 
300
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
301
 
 
302
    def test_server_simple_secure_context_fail(self):
 
303
        pid = self.start_server(self.args)
 
304
        try:
 
305
            ctx = SSL.Context()
 
306
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
307
            ctx.load_verify_locations('tests/server.pem')
 
308
            s = SSL.Connection(ctx)
 
309
            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
 
310
            s.close()
 
311
        finally:
 
312
            self.stop_server(pid)
 
313
 
 
314
    def test_server_simple_timeouts(self):
 
315
        pid = self.start_server(self.args)
 
316
        try:
 
317
            self.assertRaises(ValueError, SSL.Context, 'tlsv5')
 
318
            ctx = SSL.Context()
 
319
            s = SSL.Connection(ctx)
 
320
            
 
321
            r = s.get_socket_read_timeout()
 
322
            w = s.get_socket_write_timeout()
 
323
            assert r.sec == 0, r.sec
 
324
            assert r.microsec == 0, r.microsec
 
325
            assert w.sec == 0, w.sec
 
326
            assert w.microsec == 0, w.microsec
 
327
 
 
328
            s.set_socket_read_timeout(SSL.timeout())
 
329
            s.set_socket_write_timeout(SSL.timeout(909,9))
 
330
            r = s.get_socket_read_timeout()
 
331
            w = s.get_socket_write_timeout()
 
332
            assert r.sec == 600, r.sec
 
333
            assert r.microsec == 0, r.microsec
 
334
            assert w.sec == 909, w.sec
 
335
            #assert w.microsec == 9, w.microsec XXX 4000
 
336
            
 
337
            s.connect(self.srv_addr)
 
338
            data = self.http_get(s)
 
339
            s.close()
 
340
        finally:
 
341
            self.stop_server(pid)
 
342
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
343
 
 
344
    def test_tls1_nok(self):
 
345
        if fips_mode: # TLS is required in FIPS mode
 
346
            return
 
347
        self.args.append('-no_tls1')
 
348
        pid = self.start_server(self.args)
 
349
        try:
 
350
            ctx = SSL.Context('tlsv1')
 
351
            s = SSL.Connection(ctx)
 
352
            try:
 
353
                s.connect(self.srv_addr)
 
354
            except SSL.SSLError, e:
 
355
                self.failUnlessEqual(e[0], 'wrong version number')
 
356
            s.close()
 
357
        finally:
 
358
            self.stop_server(pid)
 
359
 
 
360
    def test_tls1_ok(self):
 
361
        self.args.append('-tls1')
 
362
        pid = self.start_server(self.args)
 
363
        try:
 
364
            ctx = SSL.Context('tlsv1')
 
365
            s = SSL.Connection(ctx)
 
366
            s.connect(self.srv_addr)
 
367
            data = self.http_get(s)
 
368
            s.close()
 
369
        finally:
 
370
            self.stop_server(pid)
 
371
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
372
 
 
373
    def test_sslv23_no_v2(self):
 
374
        if fips_mode: # TLS is required in FIPS mode
 
375
            return
 
376
        self.args.append('-no_tls1')
 
377
        pid = self.start_server(self.args)
 
378
        try:
 
379
            ctx = SSL.Context('sslv23')
 
380
            s = SSL.Connection(ctx)
 
381
            s.connect(self.srv_addr)
 
382
            self.failUnlessEqual(s.get_version(), 'SSLv3')
 
383
            s.close()
 
384
        finally:
 
385
            self.stop_server(pid)
 
386
 
 
387
    def test_sslv23_no_v2_no_service(self):
 
388
        if fips_mode: # TLS is required in FIPS mode
 
389
            return
 
390
        self.args = self.args + ['-no_tls1', '-no_ssl3']
 
391
        pid = self.start_server(self.args)
 
392
        try:
 
393
            ctx = SSL.Context('sslv23')
 
394
            s = SSL.Connection(ctx)
 
395
            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
 
396
            s.close()
 
397
        finally:
 
398
            self.stop_server(pid)
 
399
 
 
400
    def disabled_test_sslv23_weak_crypto(self):
 
401
        if fips_mode: # TLS is required in FIPS mode
 
402
            return
 
403
        self.args = self.args + ['-no_tls1', '-no_ssl3']
 
404
        pid = self.start_server(self.args)
 
405
        try:
 
406
            ctx = SSL.Context('sslv23', weak_crypto=1)
 
407
            s = SSL.Connection(ctx)
 
408
            s.connect(self.srv_addr)
 
409
            self.failUnlessEqual(s.get_version(), 'SSLv2')
 
410
            s.close()
 
411
        finally:
 
412
            self.stop_server(pid)
 
413
 
 
414
    def test_cipher_mismatch(self):
 
415
        self.args = self.args + ['-cipher', 'AES256-SHA']
 
416
        pid = self.start_server(self.args)
 
417
        try:
 
418
            ctx = SSL.Context()
 
419
            s = SSL.Connection(ctx)
 
420
            s.set_cipher_list('AES128-SHA')
 
421
            try:
 
422
                s.connect(self.srv_addr)
 
423
            except SSL.SSLError, e:
 
424
                self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
 
425
            s.close()
 
426
        finally:
 
427
            self.stop_server(pid)
 
428
        
 
429
    def test_no_such_cipher(self):
 
430
        self.args = self.args + ['-cipher', 'AES128-SHA']
 
431
        pid = self.start_server(self.args)
 
432
        try:
 
433
            ctx = SSL.Context()
 
434
            s = SSL.Connection(ctx)
 
435
            s.set_cipher_list('EXP-RC2-MD5')
 
436
            try:
 
437
                s.connect(self.srv_addr)
 
438
            except SSL.SSLError, e:
 
439
                self.failUnlessEqual(e[0], 'no ciphers available')
 
440
            s.close()
 
441
        finally:
 
442
            self.stop_server(pid)
 
443
        
 
444
    def test_no_weak_cipher(self):
 
445
        if fips_mode: # Weak ciphers are prohibited
 
446
            return
 
447
        self.args = self.args + ['-cipher', 'EXP']
 
448
        pid = self.start_server(self.args)
 
449
        try:
 
450
            ctx = SSL.Context()
 
451
            s = SSL.Connection(ctx)
 
452
            try:
 
453
                s.connect(self.srv_addr)
 
454
            except SSL.SSLError, e:
 
455
                self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
 
456
            s.close()
 
457
        finally:
 
458
            self.stop_server(pid)
 
459
        
 
460
    def test_use_weak_cipher(self):
 
461
        if fips_mode: # Weak ciphers are prohibited
 
462
            return
 
463
        self.args = self.args + ['-cipher', 'EXP']
 
464
        pid = self.start_server(self.args)
 
465
        try:
 
466
            ctx = SSL.Context(weak_crypto=1)
 
467
            s = SSL.Connection(ctx)
 
468
            s.connect(self.srv_addr)
 
469
            data = self.http_get(s)
 
470
            s.close()
 
471
        finally:
 
472
            self.stop_server(pid)
 
473
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
474
        
 
475
    def test_cipher_ok(self):
 
476
        self.args = self.args + ['-cipher', 'AES128-SHA']
 
477
        pid = self.start_server(self.args)
 
478
        try:
 
479
            ctx = SSL.Context()
 
480
            s = SSL.Connection(ctx)
 
481
            s.set_cipher_list('AES128-SHA')
 
482
            s.connect(self.srv_addr)
 
483
            data = self.http_get(s)
 
484
            
 
485
            assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()
 
486
            
 
487
            cipher_stack = s.get_ciphers()
 
488
            assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()
 
489
            self.assertRaises(IndexError, cipher_stack.__getitem__, 2)
 
490
            # For some reason there are 2 entries in the stack
 
491
            #assert len(cipher_stack) == 1, len(cipher_stack)
 
492
            assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()
 
493
            
 
494
            # Test Cipher_Stack iterator
 
495
            i = 0
 
496
            for cipher in cipher_stack:
 
497
                i += 1
 
498
                assert cipher.name() == 'AES128-SHA', '"%s"' % cipher.name()
 
499
                self.assertEqual('AES128-SHA-128', str(cipher))
 
500
            # For some reason there are 2 entries in the stack
 
501
            #assert i == 1, i
 
502
            self.assertEqual(i, len(cipher_stack))
 
503
            
 
504
            s.close()
 
505
        finally:
 
506
            self.stop_server(pid)
 
507
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
508
        
 
509
    def verify_cb_new(self, ok, store):
 
510
        return verify_cb_new_function(ok, store)
 
511
 
 
512
    def test_verify_cb_new(self):
 
513
        pid = self.start_server(self.args)
 
514
        try:
 
515
            ctx = SSL.Context()
 
516
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
 
517
                           self.verify_cb_new)
 
518
            s = SSL.Connection(ctx)
 
519
            try:
 
520
                s.connect(self.srv_addr)
 
521
            except SSL.SSLError, e:
 
522
                assert 0, e
 
523
            data = self.http_get(s)
 
524
            s.close()
 
525
        finally:
 
526
            self.stop_server(pid)
 
527
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
528
 
 
529
    def test_verify_cb_new_class(self):
 
530
        pid = self.start_server(self.args)
 
531
        try:
 
532
            ctx = SSL.Context()
 
533
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
 
534
                           VerifyCB())
 
535
            s = SSL.Connection(ctx)
 
536
            try:
 
537
                s.connect(self.srv_addr)
 
538
            except SSL.SSLError, e:
 
539
                assert 0, e
 
540
            data = self.http_get(s)
 
541
            s.close()
 
542
        finally:
 
543
            self.stop_server(pid)
 
544
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
545
 
 
546
    def test_verify_cb_new_function(self):
 
547
        pid = self.start_server(self.args)
 
548
        try:
 
549
            ctx = SSL.Context()
 
550
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
 
551
                           verify_cb_new_function)
 
552
            s = SSL.Connection(ctx)
 
553
            try:
 
554
                s.connect(self.srv_addr)
 
555
            except SSL.SSLError, e:
 
556
                assert 0, e
 
557
            data = self.http_get(s)
 
558
            s.close()
 
559
        finally:
 
560
            self.stop_server(pid)
 
561
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
562
 
 
563
    def test_verify_cb_lambda(self):
 
564
        pid = self.start_server(self.args)
 
565
        try:
 
566
            ctx = SSL.Context()
 
567
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
 
568
                           lambda ok, store: 1)
 
569
            s = SSL.Connection(ctx)
 
570
            try:
 
571
                s.connect(self.srv_addr)
 
572
            except SSL.SSLError, e:
 
573
                assert 0, e
 
574
            data = self.http_get(s)
 
575
            s.close()
 
576
        finally:
 
577
            self.stop_server(pid)
 
578
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
579
 
 
580
    def verify_cb_exception(self, ok, store):
 
581
        raise Exception, 'We should fail verification'
 
582
 
 
583
    def test_verify_cb_exception(self):
 
584
        pid = self.start_server(self.args)
 
585
        try:
 
586
            ctx = SSL.Context()
 
587
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
 
588
                           self.verify_cb_exception)
 
589
            s = SSL.Connection(ctx)
 
590
            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
 
591
            s.close()
 
592
        finally:
 
593
            self.stop_server(pid)
 
594
 
 
595
    def test_verify_cb_not_callable(self):
 
596
        ctx = SSL.Context()
 
597
        self.assertRaises(TypeError,
 
598
                          ctx.set_verify,
 
599
                          SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
 
600
                          9,
 
601
                          1)
 
602
 
 
603
    def test_verify_cb_wrong_callable(self):
 
604
        pid = self.start_server(self.args)
 
605
        try:
 
606
            ctx = SSL.Context()
 
607
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
 
608
                           lambda _: '')
 
609
            s = SSL.Connection(ctx)
 
610
            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
 
611
            s.close()
 
612
        finally:
 
613
            self.stop_server(pid)
 
614
 
 
615
    def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):
 
616
        try:
 
617
            from M2Crypto import X509
 
618
            assert not ok
 
619
            assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \
 
620
                   err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \
 
621
                   err == m2.X509_V_ERR_CERT_UNTRUSTED or \
 
622
                   err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
 
623
            assert m2.ssl_ctx_get_cert_store(ctx_ptr)
 
624
            assert X509.X509(x509_ptr).as_pem()
 
625
        except AssertionError:
 
626
            # If we let exceptions propagate from here the
 
627
            # caller may see strange errors. This is cleaner.
 
628
            return 0
 
629
        return 1
 
630
 
 
631
    def test_verify_cb_old(self):
 
632
        pid = self.start_server(self.args)
 
633
        try:
 
634
            ctx = SSL.Context()
 
635
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
 
636
                           self.verify_cb_old)
 
637
            s = SSL.Connection(ctx)
 
638
            try:
 
639
                s.connect(self.srv_addr)
 
640
            except SSL.SSLError, e:
 
641
                assert 0, e
 
642
            data = self.http_get(s)
 
643
            s.close()
 
644
        finally:
 
645
            self.stop_server(pid)
 
646
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
647
 
 
648
    def test_verify_allow_unknown_old(self):
 
649
        pid = self.start_server(self.args)
 
650
        try:
 
651
            ctx = SSL.Context()
 
652
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
 
653
                           SSL.cb.ssl_verify_callback)
 
654
            ctx.set_allow_unknown_ca(1)
 
655
            s = SSL.Connection(ctx)
 
656
            try:
 
657
                s.connect(self.srv_addr)
 
658
            except SSL.SSLError, e:
 
659
                assert 0, e
 
660
            data = self.http_get(s)
 
661
            s.close()
 
662
        finally:
 
663
            self.stop_server(pid)
 
664
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
665
 
 
666
    def test_verify_allow_unknown_new(self):
 
667
        pid = self.start_server(self.args)
 
668
        try:
 
669
            ctx = SSL.Context()
 
670
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
 
671
                           SSL.cb.ssl_verify_callback_allow_unknown_ca)
 
672
            s = SSL.Connection(ctx)
 
673
            try:
 
674
                s.connect(self.srv_addr)
 
675
            except SSL.SSLError, e:
 
676
                assert 0, e
 
677
            data = self.http_get(s)
 
678
            s.close()
 
679
        finally:
 
680
            self.stop_server(pid)
 
681
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
682
 
 
683
    def test_verify_cert(self):
 
684
        pid = self.start_server(self.args)
 
685
        try:
 
686
            ctx = SSL.Context()
 
687
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
688
            ctx.load_verify_locations('tests/ca.pem')
 
689
            s = SSL.Connection(ctx)
 
690
            try:
 
691
                s.connect(self.srv_addr)
 
692
            except SSL.SSLError, e:
 
693
                assert 0, e
 
694
            data = self.http_get(s)
 
695
            s.close()
 
696
        finally:
 
697
            self.stop_server(pid)
 
698
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
699
 
 
700
    def test_verify_cert_fail(self):
 
701
        pid = self.start_server(self.args)
 
702
        try:
 
703
            ctx = SSL.Context()
 
704
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
705
            ctx.load_verify_locations('tests/server.pem')
 
706
            s = SSL.Connection(ctx)
 
707
            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
 
708
            s.close()
 
709
        finally:
 
710
            self.stop_server(pid)
 
711
 
 
712
    def test_verify_cert_mutual_auth(self):
 
713
        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])        
 
714
        pid = self.start_server(self.args)
 
715
        try:
 
716
            ctx = SSL.Context()
 
717
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
718
            ctx.load_verify_locations('tests/ca.pem')
 
719
            ctx.load_cert('tests/x509.pem')
 
720
            s = SSL.Connection(ctx)
 
721
            try:
 
722
                s.connect(self.srv_addr)
 
723
            except SSL.SSLError, e:
 
724
                assert 0, e
 
725
            data = self.http_get(s)
 
726
            s.close()
 
727
        finally:
 
728
            self.stop_server(pid)
 
729
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
730
 
 
731
    def test_verify_cert_mutual_auth_servernbio(self):
 
732
        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])
 
733
        pid = self.start_server(self.args)
 
734
        try:
 
735
            ctx = SSL.Context()
 
736
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
737
            ctx.load_verify_locations('tests/ca.pem')
 
738
            ctx.load_cert('tests/x509.pem')
 
739
            s = SSL.Connection(ctx)
 
740
            try:
 
741
                s.connect(self.srv_addr)
 
742
            except SSL.SSLError, e:
 
743
                assert 0, e
 
744
            data = self.http_get(s)
 
745
            s.close()
 
746
        finally:
 
747
            self.stop_server(pid)
 
748
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
749
 
 
750
    def test_verify_cert_mutual_auth_fail(self):
 
751
        self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])        
 
752
        pid = self.start_server(self.args)
 
753
        try:
 
754
            ctx = SSL.Context()
 
755
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
756
            ctx.load_verify_locations('tests/ca.pem')
 
757
            s = SSL.Connection(ctx)
 
758
            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
 
759
            s.close()
 
760
        finally:
 
761
            self.stop_server(pid)
 
762
 
 
763
    def test_verify_nocert_fail(self):
 
764
        self.args.extend(['-nocert'])        
 
765
        pid = self.start_server(self.args)
 
766
        try:
 
767
            ctx = SSL.Context()
 
768
            ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
769
            ctx.load_verify_locations('tests/ca.pem')
 
770
            s = SSL.Connection(ctx)
 
771
            self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
 
772
            s.close()
 
773
        finally:
 
774
            self.stop_server(pid)
 
775
 
 
776
    def test_blocking0(self):
 
777
        pid = self.start_server(self.args)
 
778
        try:
 
779
            ctx = SSL.Context()
 
780
            s = SSL.Connection(ctx)
 
781
            s.setblocking(0)
 
782
            self.assertRaises(Exception, s.connect, self.srv_addr)
 
783
            s.close()
 
784
        finally:
 
785
            self.stop_server(pid)
 
786
 
 
787
    def test_blocking1(self):
 
788
        pid = self.start_server(self.args)
 
789
        try:
 
790
            ctx = SSL.Context()
 
791
            s = SSL.Connection(ctx)
 
792
            s.setblocking(1)
 
793
            try:
 
794
                s.connect(self.srv_addr)
 
795
            except SSL.SSLError, e:
 
796
                assert 0, e
 
797
            data = self.http_get(s)
 
798
            s.close()
 
799
        finally:
 
800
            self.stop_server(pid)
 
801
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
802
 
 
803
    def test_makefile(self):
 
804
        pid = self.start_server(self.args)
 
805
        try:
 
806
            ctx = SSL.Context()
 
807
            s = SSL.Connection(ctx)
 
808
            try:
 
809
                s.connect(self.srv_addr)
 
810
            except SSL.SSLError, e:
 
811
                assert 0, e
 
812
            bio = s.makefile('rw')
 
813
            #s.close()  # XXX bug 6628?
 
814
            bio.write('GET / HTTP/1.0\n\n')
 
815
            bio.flush()
 
816
            data = bio.read()
 
817
            bio.close()
 
818
            s.close()
 
819
        finally:
 
820
            self.stop_server(pid)
 
821
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
822
 
 
823
    def test_makefile_err(self):
 
824
        pid = self.start_server(self.args)
 
825
        try:
 
826
            ctx = SSL.Context()
 
827
            s = SSL.Connection(ctx)
 
828
            try:
 
829
                s.connect(self.srv_addr)
 
830
            except SSL.SSLError, e:
 
831
                assert 0, e
 
832
            f = s.makefile()
 
833
            data = self.http_get(s)
 
834
            s.close()
 
835
            del f
 
836
            del s
 
837
            err_code = Err.peek_error_code()
 
838
            assert not err_code, 'Unexpected error: %s' % err_code
 
839
            err = Err.get_error()
 
840
            assert not err, 'Unexpected error: %s' % err
 
841
        finally:
 
842
            self.stop_server(pid)
 
843
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
844
 
 
845
    def test_info_callback(self):
 
846
        pid = self.start_server(self.args)
 
847
        try:
 
848
            ctx = SSL.Context()
 
849
            ctx.set_info_callback()
 
850
            s = SSL.Connection(ctx)
 
851
            s.connect(self.srv_addr)
 
852
            data = self.http_get(s)
 
853
            s.close()
 
854
        finally:
 
855
            self.stop_server(pid)
 
856
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
857
 
 
858
 
 
859
class UrllibSSLClientTestCase(BaseSSLClientTestCase):
 
860
 
 
861
    def test_urllib(self):
 
862
        pid = self.start_server(self.args)
 
863
        try:
 
864
            from M2Crypto import m2urllib
 
865
            url = m2urllib.FancyURLopener()
 
866
            url.addheader('Connection', 'close')
 
867
            u = url.open('https://%s:%s/' % (srv_host, srv_port))
 
868
            data = u.read()
 
869
            u.close()
 
870
        finally:
 
871
            self.stop_server(pid)
 
872
        self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
873
 
 
874
    # XXX Don't actually know how to use m2urllib safely!
 
875
    #def test_urllib_secure_context(self):
 
876
    #def test_urllib_secure_context_fail(self):
 
877
 
 
878
    # XXX Don't actually know how to use m2urllib safely!
 
879
    #def test_urllib_safe_context(self):
 
880
    #def test_urllib_safe_context_fail(self):
 
881
 
 
882
 
 
883
class Urllib2SSLClientTestCase(BaseSSLClientTestCase):
 
884
 
 
885
    if sys.version_info >= (2,4):
 
886
        def test_urllib2(self):
 
887
            pid = self.start_server(self.args)
 
888
            try:
 
889
                from M2Crypto import m2urllib2
 
890
                opener = m2urllib2.build_opener()
 
891
                opener.addheaders = [('Connection', 'close')]
 
892
                u = opener.open('https://%s:%s/' % (srv_host, srv_port))
 
893
                data = u.read()
 
894
                u.close()
 
895
            finally:
 
896
                self.stop_server(pid)
 
897
            self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
898
    
 
899
        def test_urllib2_secure_context(self):
 
900
            pid = self.start_server(self.args)
 
901
            try:
 
902
                ctx = SSL.Context()
 
903
                ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
904
                ctx.load_verify_locations('tests/ca.pem')
 
905
                
 
906
                from M2Crypto import m2urllib2
 
907
                opener = m2urllib2.build_opener(ctx)
 
908
                opener.addheaders = [('Connection', 'close')]           
 
909
                u = opener.open('https://%s:%s/' % (srv_host, srv_port))
 
910
                data = u.read()
 
911
                u.close()
 
912
            finally:
 
913
                self.stop_server(pid)
 
914
            self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
915
    
 
916
        def test_urllib2_secure_context_fail(self):
 
917
            pid = self.start_server(self.args)
 
918
            try:
 
919
                ctx = SSL.Context()
 
920
                ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
921
                ctx.load_verify_locations('tests/server.pem')
 
922
                
 
923
                from M2Crypto import m2urllib2
 
924
                opener = m2urllib2.build_opener(ctx)
 
925
                opener.addheaders = [('Connection', 'close')]
 
926
                self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))
 
927
            finally:
 
928
                self.stop_server(pid)
 
929
 
 
930
        def test_z_urllib2_opener(self):
 
931
            pid = self.start_server(self.args)
 
932
            try:
 
933
                ctx = SSL.Context()
 
934
 
 
935
                from M2Crypto import m2urllib2
 
936
                opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())
 
937
                m2urllib2.install_opener(opener)
 
938
                req = m2urllib2.Request('https://%s:%s/' % (srv_host, srv_port))
 
939
                u = m2urllib2.urlopen(req)
 
940
                data = u.read()
 
941
                u.close()
 
942
            finally:
 
943
                self.stop_server(pid)
 
944
            self.failIf(string.find(data, 's_server -quiet -www') == -1)
 
945
 
 
946
        def test_urllib2_opener_handlers(self):
 
947
            ctx = SSL.Context()
 
948
 
 
949
            from M2Crypto import m2urllib2
 
950
            opener = m2urllib2.build_opener(ctx,
 
951
                                            m2urllib2.HTTPBasicAuthHandler())
 
952
 
 
953
        def test_urllib2_leak(self):
 
954
            pid = self.start_server(self.args)
 
955
            try:
 
956
                import gc
 
957
                from M2Crypto import m2urllib2
 
958
                o = m2urllib2.build_opener()
 
959
                r = o.open('https://%s:%s/' % (srv_host, srv_port))
 
960
                s = [r.fp._sock.fp]
 
961
                r.close()
 
962
                self.assertEqual(len(gc.get_referrers(s[0])), 1)
 
963
            finally:
 
964
                self.stop_server(pid)
 
965
 
 
966
 
 
967
class TwistedSSLClientTestCase(BaseSSLClientTestCase):
 
968
 
 
969
    def test_twisted_wrapper(self):
 
970
        # Test only when twisted and ZopeInterfaces are present
 
971
        try:
 
972
            from twisted.internet.protocol import ClientFactory
 
973
            from twisted.protocols.basic import LineReceiver
 
974
            from twisted.internet import reactor
 
975
            import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
 
976
        except ImportError:
 
977
            import warnings
 
978
            warnings.warn('Skipping twisted wrapper test because twisted not found')
 
979
            return
 
980
        
 
981
        class EchoClient(LineReceiver):
 
982
            def connectionMade(self):
 
983
                self.sendLine('GET / HTTP/1.0\n\n')
 
984
 
 
985
            def lineReceived(self, line):
 
986
                global twisted_data
 
987
                twisted_data += line
 
988
 
 
989
        class EchoClientFactory(ClientFactory):
 
990
            protocol = EchoClient
 
991
        
 
992
            def clientConnectionFailed(self, connector, reason):
 
993
                reactor.stop()
 
994
                assert 0, reason
 
995
        
 
996
            def clientConnectionLost(self, connector, reason):
 
997
                reactor.stop()
 
998
                
 
999
        pid = self.start_server(self.args)
 
1000
 
 
1001
        class ContextFactory:
 
1002
            def getContext(self):
 
1003
                return SSL.Context()
 
1004
 
 
1005
        try:
 
1006
            global twisted_data
 
1007
            twisted_data = ''
 
1008
            
 
1009
            contextFactory = ContextFactory()
 
1010
            factory = EchoClientFactory()
 
1011
            wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)
 
1012
            reactor.run() # This will block until reactor.stop() is called
 
1013
        finally:
 
1014
            self.stop_server(pid)
 
1015
        self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)
 
1016
 
 
1017
 
 
1018
twisted_data = ''
 
1019
 
 
1020
 
 
1021
class XmlRpcLibTestCase(unittest.TestCase):
 
1022
    def test_lib(self):
 
1023
        from M2Crypto import m2xmlrpclib
 
1024
        m2xmlrpclib.SSL_Transport()
 
1025
        # XXX need server to test against
 
1026
 
 
1027
 
 
1028
class FtpsLibTestCase(unittest.TestCase):
 
1029
    def test_lib(self):
 
1030
        from M2Crypto import ftpslib
 
1031
        ftpslib.FTP_TLS()
 
1032
        # XXX need server to test against
 
1033
 
 
1034
 
 
1035
class CheckerTestCase(unittest.TestCase):
 
1036
    def test_checker(self):
 
1037
        from M2Crypto.SSL import Checker
 
1038
        from M2Crypto import X509
 
1039
 
 
1040
        check = Checker.Checker(host=srv_host,
 
1041
                                peerCertHash='7B754EFA41A264AAD370D43460BC8229F9354ECE')
 
1042
        x509 = X509.load_cert('tests/server.pem')
 
1043
        assert check(x509, srv_host)
 
1044
        self.assertRaises(Checker.WrongHost, check, x509, 'example.com')
 
1045
        
 
1046
        import doctest
 
1047
        doctest.testmod(Checker)
 
1048
        
 
1049
    
 
1050
class ContextTestCase(unittest.TestCase):
 
1051
    def test_ctx_load_verify_locations(self):
 
1052
        ctx = SSL.Context()
 
1053
        self.assertRaises(ValueError, ctx.load_verify_locations, None, None)
 
1054
        
 
1055
    def test_map(self):
 
1056
        from M2Crypto.SSL.Context import map, _ctxmap
 
1057
        assert isinstance(map(), _ctxmap)
 
1058
        ctx = SSL.Context()
 
1059
        assert map()
 
1060
        ctx.close()
 
1061
        assert map() is _ctxmap.singleton
 
1062
 
 
1063
    def test_certstore(self):
 
1064
        ctx = SSL.Context()
 
1065
        ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
 
1066
        ctx.load_verify_locations('tests/ca.pem')
 
1067
        ctx.load_cert('tests/x509.pem')
 
1068
 
 
1069
        from M2Crypto import X509
 
1070
        store = ctx.get_cert_store()
 
1071
        assert isinstance(store, X509.X509_Store)
 
1072
 
 
1073
 
 
1074
class SessionTestCase(unittest.TestCase):
 
1075
    def test_session_load_bad(self):
 
1076
        self.assertRaises(SSL.SSLError, SSL.Session.load_session,
 
1077
                          'tests/signer.pem')
 
1078
 
 
1079
class FtpslibTestCase(unittest.TestCase):
 
1080
    def test_26_compat(self):
 
1081
        from M2Crypto import ftpslib
 
1082
        f = ftpslib.FTP_TLS()
 
1083
        # 2.6 used to raise AttributeError:
 
1084
        self.assertRaises(socket.gaierror, f.connect, 'no-such-host-dfgHJK56789', 990)
 
1085
 
 
1086
 
 
1087
def suite():
 
1088
    suite = unittest.TestSuite()
 
1089
    suite.addTest(unittest.makeSuite(CheckerTestCase))
 
1090
    suite.addTest(unittest.makeSuite(ContextTestCase))
 
1091
    suite.addTest(unittest.makeSuite(SessionTestCase))
 
1092
    suite.addTest(unittest.makeSuite(XmlRpcLibTestCase))
 
1093
    suite.addTest(unittest.makeSuite(FtpsLibTestCase))
 
1094
    suite.addTest(unittest.makeSuite(PassSSLClientTestCase))
 
1095
    suite.addTest(unittest.makeSuite(HttpslibSSLClientTestCase))
 
1096
    suite.addTest(unittest.makeSuite(UrllibSSLClientTestCase))
 
1097
    suite.addTest(unittest.makeSuite(Urllib2SSLClientTestCase))
 
1098
    suite.addTest(unittest.makeSuite(MiscSSLClientTestCase))
 
1099
    suite.addTest(unittest.makeSuite(FtpslibTestCase))
 
1100
    try:
 
1101
        import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
 
1102
        suite.addTest(unittest.makeSuite(TwistedSSLClientTestCase))
 
1103
    except ImportError:
 
1104
        pass
 
1105
    return suite    
 
1106
    
 
1107
 
 
1108
def zap_servers():
 
1109
    s = 's_server'
 
1110
    fn = tempfile.mktemp() 
 
1111
    cmd = 'ps | egrep %s > %s' % (s, fn)
 
1112
    os.system(cmd)
 
1113
    f = open(fn)
 
1114
    while 1:
 
1115
        ps = f.readline()
 
1116
        if not ps:
 
1117
            break
 
1118
        chunk = string.split(ps)
 
1119
        pid, cmd = chunk[0], chunk[4]
 
1120
        if cmd == s:
 
1121
            os.kill(int(pid), 1)
 
1122
    f.close()
 
1123
    os.unlink(fn)
 
1124
 
 
1125
 
 
1126
if __name__ == '__main__':
 
1127
    report_leaks = 0
 
1128
    
 
1129
    if report_leaks:
 
1130
        import gc
 
1131
        gc.enable()
 
1132
        gc.set_debug(gc.DEBUG_LEAK & ~gc.DEBUG_SAVEALL)
 
1133
    
 
1134
    try:
 
1135
        Rand.load_file('randpool.dat', -1) 
 
1136
        unittest.TextTestRunner().run(suite())
 
1137
        Rand.save_file('randpool.dat')
 
1138
    finally:
 
1139
        zap_servers()
 
1140
 
 
1141
    if report_leaks:
 
1142
        import alltests
 
1143
        alltests.dump_garbage()