~jblunck/apport/opensuse-11.1

« back to all changes in this revision

Viewing changes to apport/crashdb_impl/HTTPSValidateCertificateConnection.py

  • Committer: Jan Blunck
  • Date: 2008-11-27 17:04:31 UTC
  • Revision ID: jblunck@suse.de-20081127170431-h12wlofvgtim4e3s
- Use the M2Crypto library instead of OpenSSL, since M2Crypto actually works

Show diffs side-by-side

added

removed

Lines of Context:
17
17
'''
18
18
 
19
19
import urllib2, httplib, socket
20
 
import OpenSSL
 
20
from M2Crypto import SSL
21
21
import os
22
22
 
23
23
class HTTPSValidateCertificateConnection(httplib.HTTPConnection):
26
26
    default_port = httplib.HTTPS_PORT
27
27
    cert_location = '/etc/ssl/certs/'
28
28
 
29
 
    def __init__(self, host, port=None):
30
 
        httplib.HTTPConnection.__init__(self, host, port)
31
 
 
32
 
    def _verify(self, conn, cert, errnum, depth, ok):
33
 
        # This obviously has to be updated
34
 
#        print '_verify (ok=%d, state=%s):' % ( ok, conn.state_string() )
35
 
#        print 'Got certificate: %s' % cert.get_subject()
36
 
#        print 'Issued by: %s' % cert.get_issuer()
37
 
 
38
 
#        peer_cert = conn.get_peer_certificate()
39
 
#        if peer_cert != None:
40
 
#            print 'Peer (%s) certificate: %s' % (conn.getpeername(), peer_cert.get_issuer())
41
 
#        print '  errnum %s, errdepth %d' % (errnum, depth)
42
 
        return ok
 
29
    def __init__(self, host, port=None, strict=None,
 
30
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
 
31
        httplib.HTTPConnection.__init__(self, host, port, strict, timeout)
43
32
 
44
33
    def connect(self):
45
34
        "Connect to a host on a given (SSL) port."
46
35
 
 
36
        # Setup SSL context to demand a certificate
 
37
        ctx = SSL.Context('sslv23')
 
38
        ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 10)
 
39
        ctx.load_verify_locations(HTTPSValidateCertificateConnection.cert_location)
 
40
 
 
41
        # Create real socket
47
42
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
48
 
        sock.connect((self.host, self.port))
49
 
        ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
50
 
        # Demand a certificate                                                  
51
 
        ctx.set_verify(OpenSSL.SSL.VERIFY_PEER, self._verify)
52
 
        ctx.set_verify_depth(10)
53
 
        ctx.load_verify_locations(HTTPSValidateCertificateConnection.cert_location)
54
 
        ssl = OpenSSL.SSL.Connection(ctx, sock)
55
 
        ssl.connect_ex((self.host, self.port))
56
 
        self.sock = httplib.FakeSocket(sock, ssl)
 
43
        ssl = SSL.Connection(ctx, sock)
 
44
        self.sock = ssl
 
45
        self.sock.connect((self.host, self.port))
 
46
        self.connected = True
57
47
 
58
48
    def set_cert_location(cls, value):
59
49
        cls.cert_location = value
70
60
 
71
61
    class __HTTPSValidateCertificateConnectionTest(unittest.TestCase):
72
62
        def test_connect(self):
73
 
            HTTPSValidateCertificateConnection.set_cert_location('CA.cert')
 
63
            HTTPSValidateCertificateConnection.set_cert_location('/usr/share/apport/CA.cert')
74
64
            opener = urllib2.build_opener(HTTPSValidateCertificateHandler)
75
65
            try:
76
66
                answer = opener.open('https://imap.suse.de').read()
77
 
            except OpenSSL.SSL.Error:
 
67
            except SSL.Error:
78
68
                self.fail()
79
69
 
80
70
    unittest.main()