~bloodearnest/charms/trusty/x509-cert/trunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
import os
from datetime import datetime, timedelta

import ipaddress
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, asymmetric

backend = default_backend()


def combine_cert(key, cert, chain):
    """Combine cert, key and chain into a single file."""
    combined = key + cert
    if chain:
        combined += chain
    return combined


def generate_cert(hostname, public_ip, private_ip):
    """Generates a standards-conforming self-signed x509 cert and key."""
    key = asymmetric.rsa.generate_private_key(
        public_exponent=65537,
        key_size=1024,
        backend=backend,
    )
    public_key = key.public_key()

    now = datetime.utcnow()
    # technically deprecated, but we'll put it in for legacy client support
    name = x509.Name([
        x509.NameAttribute(x509.NameOID.COMMON_NAME, hostname)
    ])
    alt_names = x509.SubjectAlternativeName([
        # rfc5280 specifies CommonName MUST be included if set
        x509.DNSName(hostname),
        # openssl needs DNS SAN entries for IPs...
        x509.DNSName(public_ip),
        x509.DNSName(private_ip),
        # ...but gotls need the probably more correct IP SAN entries
        x509.IPAddress(ipaddress.IPv4Address(public_ip)),
        x509.IPAddress(ipaddress.IPv4Address(private_ip)),
    ])
    # enable CA usage, or else gotls (rightly) complains
    basic_contraints = x509.BasicConstraints(ca=True, path_length=0)
    # enable usage for all purposes, for stricter clients
    key_usage = x509.KeyUsage(
        digital_signature=True,
        content_commitment=True,
        key_encipherment=True,
        data_encipherment=True,
        key_agreement=True,
        key_cert_sign=True,
        crl_sign=True,
        encipher_only=False,
        decipher_only=False,
    )
    # used to identify the key even further. Not strictly needed for
    # self-signed certs, but included in case of picky clients
    subject_keyid = x509.SubjectKeyIdentifier.from_public_key(public_key)
    auth_keyid = x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key)
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .public_key(public_key)
        # arbitrary serial number, as we use a new key pair each time
        .serial_number(1000)
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=10*365))
        # rfc5820: https://tools.ietf.org/html/rfc5280#section-4.2.1
        # MUST be in server cert, SHOULD be non-critical if CommonName used
        .add_extension(alt_names, critical=False)
        # MUST be in CA cert, MUST be critical:
        .add_extension(basic_contraints, critical=True)
        # MUST be in CA cert, SHOULD be critical
        .add_extension(key_usage, critical=True)
        # SHOULD be in CA cert, MUST be non-critical
        .add_extension(subject_keyid, critical=False)
        # MAY be in CA cert, MUST be non-critical
        .add_extension(auth_keyid, critical=False)
        .sign(key, hashes.SHA256(), backend)
    )

    cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return cert_pem, key_pem


def check_cert(pem, hostname, public_ip, private_ip):
    """Parse and check a cert is still valid."""

    cert = x509.load_pem_x509_certificate(pem, backend)

    common_name = list(cert.subject)[0].value
    if hostname != common_name:
        return True, "Common Name != {}".format(hostname)

    cert_addresses = set()
    unit_addresses = set([
        hostname,
        public_ip,
        private_ip,
        ipaddress.IPv4Address(public_ip),
        ipaddress.IPv4Address(private_ip),
    ])
    for ext in cert.extensions:
        if ext.oid == x509.SubjectAlternativeName.oid:
            cert_addresses |= set(
                ext.value.get_values_for_type(x509.GeneralName)
            )

    if cert_addresses != unit_addresses:
        return True, "SubjectAlternativeName != {}".format(unit_addresses)

    return False, None


def _write_certs():
    """Utilities to dump the certs to stdout."""
    import sys
    usage = "usage: {} hostname public_ip private_ip [>cert] [3>key] [4>combo]"
    if len(sys.argv) != 4:
        print(usage.format(sys.argv[0]))
        sys.exit(3)

    hostname, ip1, ip2 = sys.argv[1:]
    cert, key = generate_cert(hostname, ip1, ip2)
    combined = combine_cert(key, cert, None)
    try:
        os.write(3, key)
    except OSError:
        os.write(sys.stdout.fileno(), key)

    os.write(sys.stdout.fileno(), cert)

    # did the they ask for a combined cert?
    try:
        os.write(4, combined)
    except OSError:
        pass


if __name__ == '__main__':
    _write_certs()