1
0
mirror of https://github.com/moepman/acertmgr.git synced 2025-01-07 18:25:25 +01:00
acertmgr/acertmgr_ssl.py

196 lines
7.0 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# acertmgr - ssl management functions
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
# available under the ISC license, see LICENSE
from OpenSSL import crypto
import base64
import binascii
import copy
import datetime
import hashlib
import json
import subprocess
import textwrap
import time
import os
import re
try:
from urllib.request import urlopen # Python 3
except ImportError:
from urllib2 import urlopen # Python 2
# Base URL of the ACME CA used when callers do not pass one explicitly.
# NOTE(review): the active value points at the "acme-staging" host, presumably
# the CA's test environment - confirm before relying on issued certificates.
DEFAULT_CA = "https://acme-staging.api.letsencrypt.org"
# Alternative endpoint, currently disabled:
#DEFAULT_CA = "https://acme-v01.api.letsencrypt.org"
# @brief retrieve notBefore and notAfter dates of a certificate file
# @param cert_file the path to the certificate
# @return the tuple of dates: (notBefore, notAfter)
def cert_valid_times(cert_file):
    """Read the validity period of a PEM certificate file.

    @param cert_file the path to the certificate
    @return the tuple of naive datetimes: (notBefore, notAfter)
    """
    with open(cert_file) as f:
        cert_data = f.read()
    cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data)

    # pyOpenSSL hands the timestamps back as ASCII bytes; decode them
    # explicitly instead of relying on str(bytes), which on Python 3 yields
    # the "b'...'" repr and only parsed because the format string was mangled
    # the same way (and fails outright under `python -bb`).
    asn1time = '%Y%m%d%H%M%SZ'

    def _parse(stamp):
        return datetime.datetime.strptime(stamp.decode('ascii'), asn1time)

    not_before = _parse(cert.get_notBefore())
    not_after = _parse(cert.get_notAfter())
    return (not_before, not_after)
# @brief create a certificate signing request
# @param names list of domain names the certificate should be valid for
# @param key_data the key to use with the certificate in PEM format
# @return the CSR as a pyOpenSSL X509Req object
def cert_request(names, key_data):
    """Build and sign a certificate signing request.

    The first entry of names becomes the subject's commonName and every entry
    is listed as a DNS subjectAltName.

    @param names list of domain names the certificate should be valid for
    @param key_data the key to use with the certificate in PEM format
    @return the signed request as a pyOpenSSL X509Req object
    """
    request = crypto.X509Req()
    subject = request.get_subject()
    subject.commonName = names[0]

    # One "DNS:<name>" entry per requested name, comma separated
    san_value = ', '.join(['DNS:' + name for name in names])
    san_extension = crypto.X509Extension(
        'subjectAltName'.encode('utf8'),
        False,
        san_value.encode('utf8'),
    )
    request.add_extensions([san_extension])

    private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_data)
    request.set_pubkey(private_key)
    request.sign(private_key, 'sha256')
    return request
# @brief helper function base64 encode for jose spec
# @param b the bytes to encode
# @return the encoded string
def base64_enc(b):
    """Encode bytes as URL-safe base64 text without '=' padding.

    @param b the bytes to encode
    @return the encoded text with all '=' characters removed
    """
    encoded = base64.urlsafe_b64encode(b)
    # '=' only ever occurs as trailing padding in base64 output
    unpadded = encoded.rstrip(b"=")
    return unpadded.decode('utf8')
# @brief function to fetch certificate using ACME
# @param account_key_file the path to the account key
# @param csr the certificate signing request in pyopenssl format
# @param domains list of domains in the certificate, first is CN
# @param acme_dir directory for ACME challenges
# @param CA which signing CA to use
# @return the certificate in PEM format
# @note algorithm and parts of the code are from acme-tiny
def get_crt_from_csr(account_key_file, csr, domains, acme_dir, CA=DEFAULT_CA):
    """Fetch a signed certificate for a CSR from an ACME CA.

    Registers the account key with the CA, proves control over every domain
    with an http-01 challenge file written into acme_dir, and finally asks the
    CA to sign the CSR. Progress is reported with print().

    @param account_key_file the path to the account key (PEM, read via `openssl rsa`)
    @param csr the certificate signing request in pyopenssl format
    @param domains list of domains in the certificate, first is CN
    @param acme_dir directory for ACME challenges; its files must be reachable
           at http://<domain>/.well-known/acme-challenge/<token>
    @param CA which signing CA to use
    @return the certificate in PEM format
    @raise IOError if the openssl binary cannot process the account key
    @raise ValueError if registration, a challenge or the signing fails
    @note algorithm and parts of the code are from acme-tiny
    """
    print("Reading account key...")
    with open(account_key_file) as f:
        account_key_data = f.read()
    account_key = crypto.load_privatekey(crypto.FILETYPE_PEM, account_key_data)

    # Let the openssl binary print the public exponent and modulus of the key
    proc = subprocess.Popen(['openssl', 'rsa', '-modulus', '-noout', '-text'],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate(account_key_data.encode('utf8'))
    if proc.returncode != 0:
        raise IOError("OpenSSL Error: {0}".format(err))
    pub_exp, pub_hex = re.search(
        r"publicExponent: [0-9]+ \(0x([0-9A-F]+)\).+Modulus=([0-9A-F]+)",
        out.decode('utf8'), re.DOTALL).groups()
    # unhexlify needs an even number of hex digits
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    header = {
        "alg": "RS256",
        "jwk": {
            "e": base64_enc(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": base64_enc(binascii.unhexlify(pub_hex.encode("utf-8"))),
        },
    }
    # The thumbprint is the SHA-256 of the canonical (sorted, compact) JWK JSON
    accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))
    thumbprint = base64_enc(hashlib.sha256(accountkey_json.encode('utf8')).digest())

    # helper function make signed requests
    def _send_signed_request(url, payload):
        payload64 = base64_enc(json.dumps(payload).encode('utf8'))
        protected = copy.deepcopy(header)
        # every request needs a fresh nonce, taken from the directory response
        protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce']
        protected64 = base64_enc(json.dumps(protected).encode('utf8'))
        # sign bytes, not text, so the signing input is unambiguous
        signing_input = '.'.join([protected64, payload64]).encode('utf8')
        signature = crypto.sign(account_key, signing_input, 'sha256')
        data = json.dumps({
            "header": header, "protected": protected64,
            "payload": payload64, "signature": base64_enc(signature),
        })
        try:
            resp = urlopen(url, data.encode('utf8'))
            return resp.getcode(), resp.read()
        except IOError as e:
            # HTTP errors carry code/read(); other IO errors only a message
            return getattr(e, "code", None), getattr(e, "read", e.__str__)()

    code, result = _send_signed_request(CA + "/acme/new-reg", {
        "resource": "new-reg",
        "agreement": "https://letsencrypt.org/documents/LE-SA-v1.0.1-July-27-2015.pdf",
    })
    if code == 201:
        print("Registered!")
    elif code == 409:
        print("Already registered!")
    else:
        raise ValueError("Error registering: {0} {1}".format(code, result))

    # verify each domain
    for domain in domains:
        print("Verifying {0}...".format(domain))

        # get new challenge
        code, result = _send_signed_request(CA + "/acme/new-authz", {
            "resource": "new-authz",
            "identifier": {"type": "dns", "value": domain},
        })
        if code != 201:
            raise ValueError("Error requesting challenges: {0} {1}".format(code, result))

        # pick the http-01 challenge; fail with a clear error if none is offered
        challenges = json.loads(result.decode('utf8'))['challenges']
        challenge = next((c for c in challenges if c['type'] == "http-01"), None)
        if challenge is None:
            raise ValueError("No http-01 challenge offered for {0}".format(domain))

        # make the challenge file (token is sanitised before it is used as a file name)
        token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
        keyauthorization = "{0}.{1}".format(token, thumbprint)
        wellknown_path = os.path.join(acme_dir, token)
        with open(wellknown_path, "w") as wellknown_file:
            wellknown_file.write(keyauthorization)

        # From here on the challenge file exists; remove it on every exit path
        try:
            # check that the file is in place
            wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
            try:
                resp = urlopen(wellknown_url)
                resp_data = resp.read().decode('utf8').strip()
            except IOError:
                resp_data = None
            # explicit comparison: an assert would be stripped under `python -O`
            if resp_data != keyauthorization:
                raise ValueError("Wrote file to {0}, but couldn't download {1}".format(
                    wellknown_path, wellknown_url))

            # notify the CA that the challenge is met
            code, result = _send_signed_request(challenge['uri'], {
                "resource": "challenge",
                "keyAuthorization": keyauthorization,
            })
            if code != 202:
                raise ValueError("Error triggering challenge: {0} {1}".format(code, result))

            # wait for challenge to be verified
            while True:
                try:
                    resp = urlopen(challenge['uri'])
                    challenge_status = json.loads(resp.read().decode('utf8'))
                except IOError as e:
                    # not every IOError is an HTTPError with code/read()
                    raise ValueError("Error checking challenge: {0} {1}".format(
                        getattr(e, "code", None), getattr(e, "read", e.__str__)()))
                if challenge_status['status'] == "pending":
                    time.sleep(2)
                elif challenge_status['status'] == "valid":
                    print("{0} verified!".format(domain))
                    break
                else:
                    raise ValueError("{0} challenge did not pass: {1}".format(
                        domain, challenge_status))
        finally:
            os.remove(wellknown_path)

    # get the new certificate
    print("Signing certificate...")
    csr_der = crypto.dump_certificate_request(crypto.FILETYPE_ASN1, csr)
    code, result = _send_signed_request(CA + "/acme/new-cert", {
        "resource": "new-cert",
        "csr": base64_enc(csr_der),
    })
    if code != 201:
        raise ValueError("Error signing certificate: {0} {1}".format(code, result))

    # return signed certificate! (the CA answers with DER, callers get PEM text)
    print("Certificate signed!")
    cert = crypto.load_certificate(crypto.FILETYPE_ASN1, result)
    return crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf8')