1
0
mirror of https://github.com/moepman/acertmgr.git synced 2024-11-13 06:45:24 +01:00

Refactor and cleanup codebase

This commit is contained in:
Kishi85 2019-01-08 08:12:20 +01:00
parent 2446b1d3d2
commit 93377fd3a9
6 changed files with 462 additions and 429 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
.idea/
*.pyc
__pycache__

View File

@ -3,174 +3,153 @@
# Automated Certificate Manager using ACME
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import acertmgr_ssl
import acertmgr_web
import datetime
import dateutil.relativedelta
import acme
import tools
import grp
import hashlib
import os
import pwd
import shutil
import subprocess
import stat
import subprocess
import tempfile
import yaml
import acertmgr_web
ACME_DIR="/etc/acme"
ACME_CONF=os.path.join(ACME_DIR, "acme.conf")
ACME_CONFD=os.path.join(ACME_DIR, "domains.d")
ACME_DIR = "/etc/acme"
ACME_CONF = os.path.join(ACME_DIR, "acme.conf")
ACME_CONFD = os.path.join(ACME_DIR, "domains.d")
ACME_DEFAULT_SERVER_KEY = os.path.join(ACME_DIR, "server.key")
ACME_DEFAULT_ACCOUNT_KEY = os.path.join(ACME_DIR, "account.key")
class FileNotFoundError(OSError):
pass
pass
class InvalidCertificateError(Exception):
pass
# @brief check whether existing target file is still valid or source crt has been updated
# @param target string containing the path to the target file
# @param crt_file string containing the path to the certificate file
# @return True if target file is at least as new as the certificate, False otherwise
def target_isCurrent(target, crt_file):
if not os.path.isfile(target):
return False
target_date = os.path.getmtime(target)
crt_date = os.path.getmtime(crt_file)
return target_date >= crt_date
# @brief check whether existing certificate is still valid or expiring soon
# @param crt_file string containing the path to the certificate file
# @param ttl_days the minimum amount of days for which the certificate must be valid
# @return True if certificate is still valid for at least ttl_days, False otherwise
def cert_isValid(crt_file, ttl_days):
if not os.path.isfile(crt_file):
return False
else:
(valid_from, valid_to) = acertmgr_ssl.cert_valid_times(crt_file)
now = datetime.datetime.now()
if valid_from > now:
raise InvalidCertificateError("Certificate seems to be from the future")
expiry_limit = now + dateutil.relativedelta.relativedelta(days=+ttl_days)
if valid_to < expiry_limit:
return False
return True
def target_is_current(target, crt_file):
if not os.path.isfile(target):
return False
target_date = os.path.getmtime(target)
crt_date = os.path.getmtime(crt_file)
return target_date >= crt_date
# @brief fetch new certificate from letsencrypt
# @param domain string containing the domain name
# @param settings the domain's configuration options
def cert_get(domains, settings):
print("Getting certificate for %s." % domains)
print("Getting certificate for %s." % domains)
key_file = settings['server_key']
if not os.path.isfile(key_file):
raise FileNotFoundError("The server key file (%s) is missing!" % key_file)
key_file = settings['server_key']
if not os.path.isfile(key_file):
raise FileNotFoundError("The server key file (%s) is missing!" % key_file)
acc_file = settings['account_key']
if not os.path.isfile(acc_file):
raise FileNotFoundError("The account key file (%s) is missing!" % acc_file)
acc_file = settings['account_key']
if not os.path.isfile(acc_file):
raise FileNotFoundError("The account key file (%s) is missing!" % acc_file)
filename = hashlib.md5(domains).hexdigest()
_, csr_file = tempfile.mkstemp(".csr", "%s." % filename)
_, crt_file = tempfile.mkstemp(".crt", "%s." % filename)
filename = hashlib.md5(domains).hexdigest()
_, csr_file = tempfile.mkstemp(".csr", "%s." % filename)
_, crt_file = tempfile.mkstemp(".crt", "%s." % filename)
challenge_dir = settings.get("webdir", "/var/www/acme-challenge/")
if not os.path.isdir(challenge_dir):
raise FileNotFoundError("Challenge directory (%s) does not exist!" % challenge_dir)
challenge_dir = settings.get("webdir", "/var/www/acme-challenge/")
if not os.path.isdir(challenge_dir):
raise FileNotFoundError("Challenge directory (%s) does not exist!" % challenge_dir)
if settings['mode'] == 'standalone':
port = settings.get('port', 80)
current_dir = '.'
server = None
if settings['mode'] == 'standalone':
port = settings.get('port', 80)
current_dir = os.getcwd()
os.chdir(challenge_dir)
server = acertmgr_web.ACMEHTTPServer(port)
server.start()
try:
key = acertmgr_ssl.read_key(key_file)
cr = acertmgr_ssl.cert_request(domains.split(), key)
print("Reading account key...")
acc_key = acertmgr_ssl.read_key(acc_file)
acertmgr_ssl.register_account(acc_key, settings['authority'])
crt = acertmgr_ssl.get_crt_from_csr(acc_key, cr, domains.split(), challenge_dir, settings['authority'])
with open(crt_file, "w") as crt_fd:
crt_fd.write(acertmgr_ssl.cert_to_pem(crt))
current_dir = os.getcwd()
os.chdir(challenge_dir)
server = acertmgr_web.ACMEHTTPServer(port)
server.start()
try:
key = tools.read_key(key_file)
cr = tools.new_cert_request(domains.split(), key)
print("Reading account key...")
acc_key = tools.read_key(acc_file)
acme.register_account(acc_key, settings['authority'])
crt = acme.get_crt_from_csr(acc_key, cr, domains.split(), challenge_dir, settings['authority'])
with open(crt_file, "w") as crt_fd:
crt_fd.write(tools.convert_cert_to_pem(crt))
# if resulting certificate is valid: store in final location
if cert_isValid(crt_file, 60):
crt_final = os.path.join(ACME_DIR, ("%s.crt" % domain))
shutil.copy2(crt_file, crt_final)
os.chmod(crt_final, stat.S_IREAD)
# if resulting certificate is valid: store in final location
if tools.is_cert_valid(crt_file, 60):
crt_final = os.path.join(ACME_DIR, ("%s.crt" % domains.split(" ")[0]))
shutil.copy2(crt_file, crt_final)
os.chmod(crt_final, stat.S_IREAD)
finally:
if settings['mode'] == 'standalone':
server.stop()
os.chdir(current_dir)
os.remove(csr_file)
os.remove(crt_file)
finally:
if settings['mode'] == 'standalone':
server.stop()
os.chdir(current_dir)
os.remove(csr_file)
os.remove(crt_file)
# @brief put new certificate in place
# @param domain string containing the domain name
# @param settings the domain's configuration options
# @return the action to be executed after the certificate update
def cert_put(domain, settings):
# TODO error handling
ca_file = settings.get("cafile", "")
crt_user = settings['user']
crt_group = settings['group']
crt_perm = settings['perm']
crt_path = settings['path']
crt_format = settings['format'].split(",")
crt_format = [str.strip(x) for x in crt_format]
crt_action = settings['action']
def cert_put(settings):
# TODO error handling
ca_file = settings.get("cafile", "")
crt_user = settings['user']
crt_group = settings['group']
crt_perm = settings['perm']
crt_path = settings['path']
crt_format = settings['format'].split(",")
crt_format = [str.strip(x) for x in crt_format]
crt_action = settings['action']
key_file = settings['server_key']
crt_final = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt"))
key_file = settings['server_key']
crt_final = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt"))
with open(crt_path, "w+") as crt_fd:
for fmt in crt_format:
if fmt == "crt":
src_fd = open(crt_final, "r")
crt_fd.write(src_fd.read())
src_fd.close()
if fmt == "key":
src_fd = open(key_file, "r")
crt_fd.write(src_fd.read())
src_fd.close()
if fmt == "ca":
if not os.path.isfile(ca_file):
raise FileNotFoundError("The CA certificate file (%s) is missing!" % ca_file)
src_fd = open(ca_file, "r")
crt_fd.write(src_fd.read())
src_fd.close()
else:
# TODO error handling
pass
with open(crt_path, "w+") as crt_fd:
for fmt in crt_format:
if fmt == "crt":
src_fd = open(crt_final, "r")
crt_fd.write(src_fd.read())
src_fd.close()
if fmt == "key":
src_fd = open(key_file, "r")
crt_fd.write(src_fd.read())
src_fd.close()
if fmt == "ca":
if not os.path.isfile(ca_file):
raise FileNotFoundError("The CA certificate file (%s) is missing!" % ca_file)
src_fd = open(ca_file, "r")
crt_fd.write(src_fd.read())
src_fd.close()
else:
# TODO error handling
pass
# set owner and permissions
uid = pwd.getpwnam(crt_user).pw_uid
gid = grp.getgrnam(crt_group).gr_gid
try:
os.chown(crt_path, uid, gid)
except OSError:
print('Warning: Could not set certificate file ownership!')
try:
os.chmod(crt_path, int(crt_perm, 8))
except OSError:
print('Warning: Could not set certificate file permissions!')
# set owner and permissions
uid = pwd.getpwnam(crt_user).pw_uid
gid = grp.getgrnam(crt_group).gr_gid
try:
os.chown(crt_path, uid, gid)
except OSError:
print('Warning: Could not set certificate file ownership!')
try:
os.chmod(crt_path, int(crt_perm, 8))
except OSError:
print('Warning: Could not set certificate file permissions!')
return crt_action
return crt_action
# @brief augment configuration with defaults
@ -178,56 +157,55 @@ def cert_put(domain, settings):
# @param defaults the default configuration
# @return the augmented configuration
def complete_config(domainconfig, globalconfig):
defaults = globalconfig['defaults']
domainconfig['server_key'] = globalconfig['server_key']
for name, value in defaults.items():
if name not in domainconfig:
domainconfig[name] = value
if 'action' not in domainconfig:
domainconfig['action'] = None
return domainconfig
defaults = globalconfig['defaults']
domainconfig['server_key'] = globalconfig['server_key']
for name, value in defaults.items():
if name not in domainconfig:
domainconfig[name] = value
if 'action' not in domainconfig:
domainconfig['action'] = None
return domainconfig
if __name__ == "__main__":
# load global configuration
if os.path.isfile(ACME_CONF):
with open(ACME_CONF) as config_fd:
config = yaml.load(config_fd)
if not config:
config = {}
if 'defaults' not in config:
config['defaults'] = {}
if 'server_key' not in config:
config['server_key'] = ACME_DEFAULT_SERVER_KEY
if 'account_key' not in config:
config['account_key'] = ACME_DEFAULT_ACCOUNT_KEY
# load global configuration
config = {}
if os.path.isfile(ACME_CONF):
with open(ACME_CONF) as config_fd:
config = yaml.load(config_fd)
if 'defaults' not in config:
config['defaults'] = {}
if 'server_key' not in config:
config['server_key'] = ACME_DEFAULT_SERVER_KEY
if 'account_key' not in config:
config['account_key'] = ACME_DEFAULT_ACCOUNT_KEY
config['domains'] = []
# load domain configuration
for config_file in os.listdir(ACME_CONFD):
if config_file.endswith(".conf"):
with open(os.path.join(ACME_CONFD, config_file)) as config_fd:
for entry in yaml.load(config_fd).items():
config['domains'].append(entry)
config['domains'] = []
# load domain configuration
for config_file in os.listdir(ACME_CONFD):
if config_file.endswith(".conf"):
with open(os.path.join(ACME_CONFD, config_file)) as config_fd:
for entry in yaml.load(config_fd).items():
config['domains'].append(entry)
# post-update actions (run only once)
actions = set()
# post-update actions (run only once)
actions = set()
# check certificate validity and obtain/renew certificates if needed
for domains, domaincfgs in config['domains']:
# skip domains without any output files
if domaincfgs is None:
continue
crt_file = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt"))
ttl_days = int(config.get('ttl_days', 15))
if not cert_isValid(crt_file, ttl_days):
cert_get(domains, config)
for domaincfg in domaincfgs:
cfg = complete_config(domaincfg, config)
if not target_isCurrent(cfg['path'], crt_file):
actions.add(cert_put(domains, cfg))
# check certificate validity and obtain/renew certificates if needed
for domains, domaincfgs in config['domains']:
# skip domains without any output files
if domaincfgs is None:
continue
crt_file = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt"))
ttl_days = int(config.get('ttl_days', 15))
if not tools.is_cert_valid(crt_file, ttl_days):
cert_get(domains, config)
for domaincfg in domaincfgs:
cfg = complete_config(domaincfg, config)
if not target_is_current(cfg['path'], crt_file):
actions.add(cert_put(cfg))
# run post-update actions
for action in actions:
if action is not None:
subprocess.call(action.split())
# run post-update actions
for action in actions:
if action is not None:
subprocess.call(action.split())

View File

@ -1,222 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# acertmgr - ssl management functions
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
# available under the ISC license, see LICENSE
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.x509.oid import NameOID
import base64
import binascii
import copy
import json
import time
import os
import re
try:
from urllib.request import urlopen # Python 3
except ImportError:
from urllib2 import urlopen # Python 2
# @brief retrieve notBefore and notAfter dates of a certificate file
# @param cert_file the path to the certificate
# @return the tuple of dates: (notBefore, notAfter)
def cert_valid_times(cert_file):
with open(cert_file, 'r') as f:
cert_data = f.read()
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
return (cert.not_valid_before, cert.not_valid_after)
# @brief create a certificate signing request
# @param names list of domain names the certificate should be valid for
# @param key the key to use with the certificate in cryptography format
# @return the CSR in cryptography format
def cert_request(names, key):
primary_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, names[0].decode('utf8'))])
all_names = x509.SubjectAlternativeName([x509.DNSName(name.decode('utf8')) for name in names])
req = x509.CertificateSigningRequestBuilder()
req = req.subject_name(primary_name)
req = req.add_extension(all_names, critical=False)
req = req.sign(key, hashes.SHA256(), default_backend())
return req
# @brief convert certificate to PEM format
# @param cert certificate object in cryptography format
# @return the certificate in PEM format
def cert_to_pem(cert):
return cert.public_bytes(serialization.Encoding.PEM).decode('utf8')
# @brief read a key from file
# @param path path to key file
# @return the key in cryptography format
def read_key(path):
with open(path, 'r') as f:
key_data = f.read()
return serialization.load_pem_private_key(key_data, None, default_backend())
# @brief convert numbers to byte-string
# @param num number to convert
# @return byte-string containing the number
# @todo better code welcome
def byte_string_format(num):
n = format(num, 'x')
n = "0{0}".format(n) if len(n) % 2 else n
return binascii.unhexlify(n)
# @brief create the header information for ACME communication
# @param key the account key
# @return the header for ACME
def acme_header(key):
numbers = key.public_key().public_numbers()
header = {
"alg": "RS256",
"jwk": {
"e": base64_enc(byte_string_format(numbers.e)),
"kty": "RSA",
"n": base64_enc(byte_string_format(numbers.n)),
},
}
return header
# @brief register an account over ACME
# @param account_key the account key to register
# @param CA the certificate authority to register with
# @return True if new account was registered, False otherwise
def register_account(account_key, CA):
header = acme_header(account_key)
code, result = send_signed(account_key, CA, CA + "/acme/new-reg", header, {
"resource": "new-reg",
"agreement": "https://letsencrypt.org/documents/LE-SA-v1.2-November-15-2017.pdf",
})
if code == 201:
print("Registered!")
return True
elif code == 409:
print("Already registered!")
return False
else:
raise ValueError("Error registering: {0} {1}".format(code, result))
# @brief helper function to base64 encode for JSON objects
# @param b the string to encode
# @return the encoded string
def base64_enc(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
# @brief helper function to make signed requests
# @param CA the certificate authority
# @param url the request URL
# @param header the message header
# @param payload the message
# @return tuple of return code and request answer
def send_signed(account_key, CA, url, header, payload):
payload64 = base64_enc(json.dumps(payload).encode('utf8'))
protected = copy.deepcopy(header)
protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce']
protected64 = base64_enc(json.dumps(protected).encode('utf8'))
# @todo check why this padding is not working
#pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)
pad = padding.PKCS1v15()
out = account_key.sign('.'.join([protected64, payload64]).encode('utf8'), pad, hashes.SHA256())
data = json.dumps({
"header": header, "protected": protected64,
"payload": payload64, "signature": base64_enc(out),
})
try:
resp = urlopen(url, data.encode('utf8'))
return resp.getcode(), resp.read()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
# @brief function to fetch certificate using ACME
# @param account_key the account key in pyopenssl format
# @param csr the certificate signing request in pyopenssl format
# @param domains list of domains in the certificate, first is CN
# @param acme_dir directory for ACME challanges
# @param CA which signing CA to use
# @return the certificate in pyopenssl format
# @note algorithm and parts of the code are from acme-tiny
def get_crt_from_csr(account_key, csr, domains, acme_dir, CA):
header = acme_header(account_key)
accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))
account_hash = hashes.Hash(hashes.SHA256(), backend=default_backend())
account_hash.update(accountkey_json.encode('utf8'))
account_thumbprint = base64_enc(account_hash.finalize())
# verify each domain
for domain in domains:
print("Verifying {0}...".format(domain))
# get new challenge
code, result = send_signed(account_key, CA, CA + "/acme/new-authz", header, {
"resource": "new-authz",
"identifier": {"type": "dns", "value": domain},
})
if code != 201:
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
# make the challenge file
challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "http-01"][0]
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
keyauthorization = "{0}.{1}".format(token, account_thumbprint)
wellknown_path = os.path.join(acme_dir, token)
with open(wellknown_path, "w") as wellknown_file:
wellknown_file.write(keyauthorization)
# check that the file is in place
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
try:
resp = urlopen(wellknown_url)
resp_data = resp.read().decode('utf8').strip()
assert resp_data == keyauthorization
except (IOError, AssertionError):
os.remove(wellknown_path)
raise ValueError("Wrote file to {0}, but couldn't download {1}".format(
wellknown_path, wellknown_url))
# notify challenge are met
code, result = send_signed(account_key, CA, challenge['uri'], header, {
"resource": "challenge",
"keyAuthorization": keyauthorization,
})
if code != 202:
raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
# wait for challenge to be verified
while True:
try:
resp = urlopen(challenge['uri'])
challenge_status = json.loads(resp.read().decode('utf8'))
except IOError as e:
raise ValueError("Error checking challenge: {0} {1}".format(
e.code, json.loads(e.read().decode('utf8'))))
if challenge_status['status'] == "pending":
time.sleep(2)
elif challenge_status['status'] == "valid":
print("{0} verified!".format(domain))
os.remove(wellknown_path)
break
else:
raise ValueError("{0} challenge did not pass: {1}".format(
domain, challenge_status))
# get the new certificate
print("Signing certificate...")
csr_der = csr.public_bytes(serialization.Encoding.DER)
code, result = send_signed(account_key, CA, CA + "/acme/new-cert", header, {
"resource": "new-cert",
"csr": base64_enc(csr_der),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
# return signed certificate!
print("Certificate signed!")
cert = x509.load_der_x509_certificate(result, default_backend())
return cert

View File

@ -6,62 +6,65 @@
# available under the ISC license, see LICENSE
try:
from SimpleHTTPServer import SimpleHTTPRequestHandler
from SimpleHTTPServer import SimpleHTTPRequestHandler
except ImportError:
from http.server import SimpleHTTPRequestHandler
from http.server import SimpleHTTPRequestHandler
try:
from SocketServer import TCPServer as HTTPServer
from SocketServer import TCPServer as HTTPServer
except ImportError:
from http.server import HTTPServer
from http.server import HTTPServer
import threading
# @brief custom request handler for ACME challenges
# @note current working directory is temporarily changed by the script before
# the webserver starts, which allows using SimpleHTTPRequestHandler
class ACMERequestHandler(SimpleHTTPRequestHandler):
# @brief remove directories from GET URL
# @details the current working directory contains the challenge files,
# there is no need for creating subdirectories for the path
# that ACME expects.
# Additionally, this allows redirecting the ACME path to this
# webserver without having to know which subdirectory is
# redirected, which simplifies integration with existing
# webservers.
def translate_path(self, path):
spath = path.split('/')
assert(spath[0] == '')
spath = spath[1:]
if spath[0] == '.well-known':
spath = spath[1:]
if spath[0] == 'acme-challenge':
spath = spath[1:]
assert(len(spath) == 1)
spath.insert(0, '')
path = '/'.join(spath)
return SimpleHTTPRequestHandler.translate_path(self, path)
# @brief remove directories from GET URL
# @details the current working directory contains the challenge files,
# there is no need for creating subdirectories for the path
# that ACME expects.
# Additionally, this allows redirecting the ACME path to this
# webserver without having to know which subdirectory is
# redirected, which simplifies integration with existing
# webservers.
def translate_path(self, path):
spath = path.split('/')
assert (spath[0] == '')
spath = spath[1:]
if spath[0] == '.well-known':
spath = spath[1:]
if spath[0] == 'acme-challenge':
spath = spath[1:]
assert (len(spath) == 1)
spath.insert(0, '')
path = '/'.join(spath)
return SimpleHTTPRequestHandler.translate_path(self, path)
# @brief start the standalone webserver
# @param server the HTTPServer object
# @note this function is used to be passed to threading.Thread
def start_standalone(server):
server.serve_forever()
server.serve_forever()
# @brief a simple webserver for challanges
class ACMEHTTPServer:
# @brief create webserver instance
# @param port the port to listen on
def __init__(self, port=80):
HTTPServer.allow_reuse_address = True
self.server = HTTPServer(("", port), ACMERequestHandler)
# @brief create webserver instance
# @param port the port to listen on
def __init__(self, port=80):
HTTPServer.allow_reuse_address = True
self.server = HTTPServer(("", port), ACMERequestHandler)
# @brief start the webserver
def start(self):
self.server_thread = threading.Thread(target=start_standalone, args=(self.server, ))
self.server_thread.start()
# @brief stop the webserver
def stop(self):
self.server.shutdown()
self.server_thread.join()
# @brief start the webserver
def start(self):
self.server_thread = threading.Thread(target=start_standalone, args=(self.server,))
self.server_thread.start()
# @brief stop the webserver
def stop(self):
self.server.shutdown()
self.server_thread.join()

175
acme.py Normal file
View File

@ -0,0 +1,175 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# acertmgr - ssl management functions
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import tools
import base64
import copy
import json
import os
import re
import time
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
try:
from urllib.request import urlopen # Python 3
except ImportError:
from urllib2 import urlopen # Python 2
# @brief create the header information for ACME communication
# @param key the account key
# @return the header for ACME
def acme_header(key):
numbers = key.public_key().public_numbers()
header = {
"alg": "RS256",
"jwk": {
"e": tools.to_json_base64(tools.byte_string_format(numbers.e)),
"kty": "RSA",
"n": tools.to_json_base64(tools.byte_string_format(numbers.n)),
},
}
return header
# @brief register an account over ACME
# @param account_key the account key to register
# @param CA the certificate authority to register with
# @return True if new account was registered, False otherwise
def register_account(account_key, ca):
header = acme_header(account_key)
code, result = send_signed(account_key, ca, ca + "/acme/new-reg", header, {
"resource": "new-reg",
"agreement": "https://letsencrypt.org/documents/LE-SA-v1.2-November-15-2017.pdf",
})
if code == 201:
print("Registered!")
return True
elif code == 409:
print("Already registered!")
return False
else:
raise ValueError("Error registering: {0} {1}".format(code, result))
# @brief helper function to make signed requests
# @param CA the certificate authority
# @param url the request URL
# @param header the message header
# @param payload the message
# @return tuple of return code and request answer
def send_signed(account_key, ca, url, header, payload):
payload64 = tools.to_json_base64(json.dumps(payload).encode('utf8'))
protected = copy.deepcopy(header)
protected["nonce"] = urlopen(ca + "/directory").headers['Replay-Nonce']
protected64 = tools.to_json_base64(json.dumps(protected).encode('utf8'))
# @todo check why this padding is not working
# pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)
pad = padding.PKCS1v15()
out = account_key.sign('.'.join([protected64, payload64]).encode('utf8'), pad, hashes.SHA256())
data = json.dumps({
"header": header, "protected": protected64,
"payload": payload64, "signature": tools.to_json_base64(out),
})
try:
resp = urlopen(url, data.encode('utf8'))
return resp.getcode(), resp.read()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
# @brief function to fetch certificate using ACME
# @param account_key the account key in pyopenssl format
# @param csr the certificate signing request in pyopenssl format
# @param domains list of domains in the certificate, first is CN
# @param acme_dir directory for ACME challanges
# @param CA which signing CA to use
# @return the certificate in pyopenssl format
# @note algorithm and parts of the code are from acme-tiny
def get_crt_from_csr(account_key, csr, domains, acme_dir, ca):
header = acme_header(account_key)
accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))
account_hash = hashes.Hash(hashes.SHA256(), backend=default_backend())
account_hash.update(accountkey_json.encode('utf8'))
account_thumbprint = tools.to_json_base64(account_hash.finalize())
# verify each domain
for domain in domains:
print("Verifying {0}...".format(domain))
# get new challenge
code, result = send_signed(account_key, ca, ca + "/acme/new-authz", header, {
"resource": "new-authz",
"identifier": {"type": "dns", "value": domain},
})
if code != 201:
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
# make the challenge file
challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "http-01"][0]
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
keyauthorization = "{0}.{1}".format(token, account_thumbprint)
wellknown_path = os.path.join(acme_dir, token)
with open(wellknown_path, "w") as wellknown_file:
wellknown_file.write(keyauthorization)
# check that the file is in place
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
try:
resp = urlopen(wellknown_url)
resp_data = resp.read().decode('utf8').strip()
assert resp_data == keyauthorization
except (IOError, AssertionError):
os.remove(wellknown_path)
raise ValueError("Wrote file to {0}, but couldn't download {1}".format(
wellknown_path, wellknown_url))
# notify challenge are met
code, result = send_signed(account_key, ca, challenge['uri'], header, {
"resource": "challenge",
"keyAuthorization": keyauthorization,
})
if code != 202:
raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
# wait for challenge to be verified
while True:
try:
resp = urlopen(challenge['uri'])
challenge_status = json.loads(resp.read().decode('utf8'))
except IOError as e:
raise ValueError("Error checking challenge: {0} {1}".format(
e.code, json.loads(e.read().decode('utf8'))))
if challenge_status['status'] == "pending":
time.sleep(2)
elif challenge_status['status'] == "valid":
print("{0} verified!".format(domain))
os.remove(wellknown_path)
break
else:
raise ValueError("{0} challenge did not pass: {1}".format(
domain, challenge_status))
# get the new certificate
print("Signing certificate...")
csr_der = csr.public_bytes(serialization.Encoding.DER)
code, result = send_signed(account_key, ca, ca + "/acme/new-cert", header, {
"resource": "new-cert",
"csr": tools.to_json_base64(csr_der),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
# return signed certificate!
print("Certificate signed!")
cert = x509.load_der_x509_certificate(result, default_backend())
return cert

98
tools.py Normal file
View File

@ -0,0 +1,98 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# acertmgr - various support functions
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import base64
import binascii
import datetime
import os
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.x509.oid import NameOID
class InvalidCertificateError(Exception):
pass
# @brief retrieve notBefore and notAfter dates of a certificate file
# @param cert_file the path to the certificate
# @return the tuple of dates: (notBefore, notAfter)
def get_cert_valid_times(cert_file):
with open(cert_file, 'r') as f:
cert_data = f.read()
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
return cert.not_valid_before, cert.not_valid_after
# @brief check whether existing certificate is still valid or expiring soon
# @param crt_file string containing the path to the certificate file
# @param ttl_days the minimum amount of days for which the certificate must be valid
# @return True if certificate is still valid for at least ttl_days, False otherwise
def is_cert_valid(crt_file, ttl_days):
if not os.path.isfile(crt_file):
return False
else:
(valid_from, valid_to) = get_cert_valid_times(crt_file)
now = datetime.datetime.now()
if valid_from > now:
raise InvalidCertificateError("Certificate seems to be from the future")
expiry_limit = now + datetime.timedelta(days=ttl_days)
if valid_to < expiry_limit:
return False
return True
# @brief create a certificate signing request
# @param names list of domain names the certificate should be valid for
# @param key the key to use with the certificate in pyopenssl format
# @return the CSR in pyopenssl format
def new_cert_request(names, key):
primary_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, names[0].decode('utf8'))])
all_names = x509.SubjectAlternativeName([x509.DNSName(name.decode('utf8')) for name in names])
req = x509.CertificateSigningRequestBuilder()
req = req.subject_name(primary_name)
req = req.add_extension(all_names, critical=False)
req = req.sign(key, hashes.SHA256(), default_backend())
return req
# @brief convert certificate to PEM format
# @param cert certificate object in pyopenssl format
# @return the certificate in PEM format
def convert_cert_to_pem(cert):
return cert.public_bytes(serialization.Encoding.PEM).decode('utf8')
# @brief read a key from file
# @param path path to key file
# @return the key in pyopenssl format
def read_key(path):
with open(path, 'r') as f:
key_data = f.read()
return serialization.load_pem_private_key(key_data, None, default_backend())
# @brief helper function to base64 encode for JSON objects
# @param b the string to encode
# @return the encoded string
def to_json_base64(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
# @brief convert numbers to byte-string
# @param num number to convert
# @return byte-string containing the number
def byte_string_format(num):
n = format(num, 'x')
n = "0{0}".format(n) if len(n) % 2 else n
return binascii.unhexlify(n)