1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
#!/usr/bin/env python3
import os
from datetime import datetime, timedelta
import ipaddress
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, asymmetric
# Shared cryptography backend handle; passed to key generation, certificate
# signing and PEM parsing in the functions below.
backend = default_backend()
def combine_cert(key, cert, chain):
    """Concatenate key, cert and (when given) chain into one blob.

    The pieces are joined in the order key, cert, chain. A falsy chain
    (``None`` or empty) is simply left out.
    """
    key_and_cert = key + cert
    return key_and_cert + chain if chain else key_and_cert
def generate_cert(hostname, public_ip, private_ip):
    """Generate a standards-conforming self-signed x509 cert and key.

    A fresh RSA key pair is created on every call and used to sign a
    certificate whose subject and issuer are both ``CN=<hostname>``.

    Args:
        hostname: DNS name used as the Common Name and as a DNS SAN entry.
        public_ip: IPv4 address literal (string); added as both a DNS and
            an IP SAN entry.
        private_ip: IPv4 address literal (string); added as both a DNS and
            an IP SAN entry.

    Returns:
        A ``(cert_pem, key_pem)`` tuple of PEM-encoded bytes. The private
        key is unencrypted, in the traditional OpenSSL format.

    Raises:
        ValueError: if either IP is not a valid IPv4 address literal
            (raised by ``ipaddress.IPv4Address``).
    """
    key = asymmetric.rsa.generate_private_key(
        public_exponent=65537,
        # 2048 bits is the minimum recommended RSA size; 1024-bit keys are
        # considered too weak and are rejected by many modern TLS clients.
        key_size=2048,
        backend=backend,
    )
    public_key = key.public_key()
    now = datetime.utcnow()

    # CommonName is technically deprecated in favour of SAN entries, but
    # we'll put it in for legacy client support
    name = x509.Name([
        x509.NameAttribute(x509.NameOID.COMMON_NAME, hostname),
    ])

    alt_names = x509.SubjectAlternativeName([
        # rfc5280 specifies CommonName MUST be included if set
        x509.DNSName(hostname),
        # openssl needs DNS SAN entries for IPs...
        x509.DNSName(public_ip),
        x509.DNSName(private_ip),
        # ...but gotls need the probably more correct IP SAN entries
        x509.IPAddress(ipaddress.IPv4Address(public_ip)),
        x509.IPAddress(ipaddress.IPv4Address(private_ip)),
    ])

    # enable CA usage, or else gotls (rightly) complains
    basic_constraints = x509.BasicConstraints(ca=True, path_length=0)

    # enable usage for all purposes, for stricter clients
    key_usage = x509.KeyUsage(
        digital_signature=True,
        content_commitment=True,
        key_encipherment=True,
        data_encipherment=True,
        key_agreement=True,
        key_cert_sign=True,
        crl_sign=True,
        encipher_only=False,
        decipher_only=False,
    )

    # used to identify the key even further. Not strictly needed for
    # self-signed certs, but included in case of picky clients
    subject_keyid = x509.SubjectKeyIdentifier.from_public_key(public_key)
    auth_keyid = x509.AuthorityKeyIdentifier.from_issuer_public_key(
        public_key
    )

    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .public_key(public_key)
        # The issuer name is just the hostname, so regenerating a cert for
        # the same host with a fixed serial would produce two different
        # certs sharing issuer + serial, which rfc5280 forbids and some
        # clients reject. Use a random serial for every cert instead.
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=10 * 365))
        # rfc5280: https://tools.ietf.org/html/rfc5280#section-4.2.1
        # MUST be in server cert, SHOULD be non-critical if CommonName used
        .add_extension(alt_names, critical=False)
        # MUST be in CA cert, MUST be critical:
        .add_extension(basic_constraints, critical=True)
        # MUST be in CA cert, SHOULD be critical
        .add_extension(key_usage, critical=True)
        # SHOULD be in CA cert, MUST be non-critical
        .add_extension(subject_keyid, critical=False)
        # MAY be in CA cert, MUST be non-critical
        .add_extension(auth_keyid, critical=False)
        .sign(key, hashes.SHA256(), backend)
    )

    cert_pem = cert.public_bytes(encoding=serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return cert_pem, key_pem
def check_cert(pem, hostname, public_ip, private_ip):
    """Parse a PEM cert and check it still matches this unit's identity.

    Only the names are compared: the Common Name against ``hostname``, and
    the SubjectAlternativeName entries against the hostname and both IPs
    (each IP as a DNS-style string and as an IPv4 address).
    NOTE: the validity dates are not examined here.

    Args:
        pem: PEM-encoded certificate bytes.
        hostname: expected Common Name / DNS SAN entry.
        public_ip: expected IPv4 address literal (string).
        private_ip: expected IPv4 address literal (string).

    Returns:
        ``(True, reason)`` when the cert no longer matches, with ``reason``
        a human-readable message; ``(False, None)`` when everything matches.
    """
    cert = x509.load_pem_x509_certificate(pem, backend)

    # Look the Common Name up by OID rather than by position, so a subject
    # with extra leading attributes (or none at all) is handled sanely.
    cn_attributes = cert.subject.get_attributes_for_oid(
        x509.NameOID.COMMON_NAME
    )
    common_name = cn_attributes[0].value if cn_attributes else None
    if hostname != common_name:
        return True, "Common Name != {}".format(hostname)

    unit_addresses = {
        hostname,
        public_ip,
        private_ip,
        ipaddress.IPv4Address(public_ip),
        ipaddress.IPv4Address(private_ip),
    }

    cert_addresses = set()
    for ext in cert.extensions:
        if ext.oid == x509.SubjectAlternativeName.oid:
            # GeneralName is the base class of every SAN entry type, so
            # this collects the values of the DNS and IP entries alike.
            cert_addresses.update(
                ext.value.get_values_for_type(x509.GeneralName)
            )

    if cert_addresses != unit_addresses:
        return True, "SubjectAlternativeName != {}".format(unit_addresses)

    return False, None
def _write_certs():
    """Command-line helper: generate a cert/key pair and dump it.

    Expects exactly three arguments: hostname, public_ip, private_ip.
    Output goes to raw file descriptors so the caller can redirect them:

    * fd 3 receives the key; if fd 3 cannot be written, the key goes to
      stdout ahead of the cert instead.
    * stdout receives the cert.
    * fd 4 receives the combined key + cert, if it can be written.

    With the wrong number of arguments, a usage message is printed to
    stderr and the process exits with status 3.
    """
    import sys

    usage = "usage: {} hostname public_ip private_ip [>cert] [3>key] [4>combo]"
    if len(sys.argv) != 4:
        # stdout is the certificate stream (it may be redirected to a
        # file), so the usage text has to go to stderr.
        print(usage.format(sys.argv[0]), file=sys.stderr)
        sys.exit(3)

    hostname, ip1, ip2 = sys.argv[1:]
    cert, key = generate_cert(hostname, ip1, ip2)
    combined = combine_cert(key, cert, None)

    stdout_fd = sys.stdout.fileno()
    try:
        os.write(3, key)
    except OSError:
        # fd 3 was not provided: fall back to emitting the key on stdout
        os.write(stdout_fd, key)
    os.write(stdout_fd, cert)

    # did they ask for a combined cert?
    try:
        os.write(4, combined)
    except OSError:
        # fd 4 not provided: the combined output is optional
        pass
# Script entry point: dump the certs only when run directly, not on import.
if __name__ == '__main__':
    _write_certs()
|