diff --git a/.gitignore b/.gitignore index a295864..5d4ba57 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +.idea/ *.pyc __pycache__ diff --git a/acertmgr.py b/acertmgr.py index 4ea0775..4ae5e58 100755 --- a/acertmgr.py +++ b/acertmgr.py @@ -3,174 +3,153 @@ # Automated Certificate Manager using ACME # Copyright (c) Markus Hauschild & David Klaftenegger, 2016. +# Copyright (c) Rudolf Mayerhofer, 2019. # available under the ISC license, see LICENSE - -import acertmgr_ssl -import acertmgr_web -import datetime -import dateutil.relativedelta +import acme +import tools import grp import hashlib import os import pwd import shutil -import subprocess import stat +import subprocess import tempfile + import yaml +import acertmgr_web -ACME_DIR="/etc/acme" -ACME_CONF=os.path.join(ACME_DIR, "acme.conf") -ACME_CONFD=os.path.join(ACME_DIR, "domains.d") +ACME_DIR = "/etc/acme" +ACME_CONF = os.path.join(ACME_DIR, "acme.conf") +ACME_CONFD = os.path.join(ACME_DIR, "domains.d") ACME_DEFAULT_SERVER_KEY = os.path.join(ACME_DIR, "server.key") ACME_DEFAULT_ACCOUNT_KEY = os.path.join(ACME_DIR, "account.key") + class FileNotFoundError(OSError): - pass + pass -class InvalidCertificateError(Exception): - pass - # @brief check whether existing target file is still valid or source crt has been updated # @param target string containing the path to the target file # @param crt_file string containing the path to the certificate file # @return True if target file is at least as new as the certificate, False otherwise -def target_isCurrent(target, crt_file): - if not os.path.isfile(target): - return False - target_date = os.path.getmtime(target) - crt_date = os.path.getmtime(crt_file) - return target_date >= crt_date - -# @brief check whether existing certificate is still valid or expiring soon -# @param crt_file string containing the path to the certificate file -# @param ttl_days the minimum amount of days for which the certificate must be valid -# @return True if certificate is still valid for at least ttl_days, False otherwise -def cert_isValid(crt_file, ttl_days): - if not os.path.isfile(crt_file): - return False - else: - (valid_from, valid_to) = acertmgr_ssl.cert_valid_times(crt_file) - - now = datetime.datetime.now() - if valid_from > now: - raise InvalidCertificateError("Certificate seems to be from the future") - - expiry_limit = now + dateutil.relativedelta.relativedelta(days=+ttl_days) - if valid_to < expiry_limit: - return False - - return True +def target_is_current(target, crt_file): + if not os.path.isfile(target): + return False + target_date = os.path.getmtime(target) + crt_date = os.path.getmtime(crt_file) + return target_date >= crt_date # @brief fetch new certificate from letsencrypt # @param domain string containing the domain name # @param settings the domain's configuration options def cert_get(domains, settings): - print("Getting certificate for %s." % domains) + print("Getting certificate for %s." % domains) - key_file = settings['server_key'] - if not os.path.isfile(key_file): - raise FileNotFoundError("The server key file (%s) is missing!" % key_file) + key_file = settings['server_key'] + if not os.path.isfile(key_file): + raise FileNotFoundError("The server key file (%s) is missing!" % key_file) - acc_file = settings['account_key'] - if not os.path.isfile(acc_file): - raise FileNotFoundError("The account key file (%s) is missing!" % acc_file) + acc_file = settings['account_key'] + if not os.path.isfile(acc_file): + raise FileNotFoundError("The account key file (%s) is missing!" % acc_file) - filename = hashlib.md5(domains).hexdigest() - _, csr_file = tempfile.mkstemp(".csr", "%s." % filename) - _, crt_file = tempfile.mkstemp(".crt", "%s." % filename) + filename = hashlib.md5(domains).hexdigest() + _, csr_file = tempfile.mkstemp(".csr", "%s." % filename) + _, crt_file = tempfile.mkstemp(".crt", "%s." % filename) - challenge_dir = settings.get("webdir", "/var/www/acme-challenge/") - if not os.path.isdir(challenge_dir): - raise FileNotFoundError("Challenge directory (%s) does not exist!" % challenge_dir) + challenge_dir = settings.get("webdir", "/var/www/acme-challenge/") + if not os.path.isdir(challenge_dir): + raise FileNotFoundError("Challenge directory (%s) does not exist!" % challenge_dir) - if settings['mode'] == 'standalone': - port = settings.get('port', 80) + current_dir = '.' + server = None + if settings['mode'] == 'standalone': + port = settings.get('port', 80) - current_dir = os.getcwd() - os.chdir(challenge_dir) - server = acertmgr_web.ACMEHTTPServer(port) - server.start() - try: - key = acertmgr_ssl.read_key(key_file) - cr = acertmgr_ssl.cert_request(domains.split(), key) - print("Reading account key...") - acc_key = acertmgr_ssl.read_key(acc_file) - acertmgr_ssl.register_account(acc_key, settings['authority']) - crt = acertmgr_ssl.get_crt_from_csr(acc_key, cr, domains.split(), challenge_dir, settings['authority']) - with open(crt_file, "w") as crt_fd: - crt_fd.write(acertmgr_ssl.cert_to_pem(crt)) + current_dir = os.getcwd() + os.chdir(challenge_dir) + server = acertmgr_web.ACMEHTTPServer(port) + server.start() + try: + key = tools.read_key(key_file) + cr = tools.new_cert_request(domains.split(), key) + print("Reading account key...") + acc_key = tools.read_key(acc_file) + acme.register_account(acc_key, settings['authority']) + crt = acme.get_crt_from_csr(acc_key, cr, domains.split(), challenge_dir, settings['authority']) + with open(crt_file, "w") as crt_fd: + crt_fd.write(tools.convert_cert_to_pem(crt)) - # if resulting certificate is valid: store in final location - if cert_isValid(crt_file, 60): - crt_final = os.path.join(ACME_DIR, ("%s.crt" % domain)) - shutil.copy2(crt_file, crt_final) - os.chmod(crt_final, stat.S_IREAD) + # if resulting certificate is valid: store in final location + if tools.is_cert_valid(crt_file, 60): + crt_final = os.path.join(ACME_DIR, ("%s.crt" % domains.split(" ")[0])) + shutil.copy2(crt_file, crt_final) + os.chmod(crt_final, stat.S_IREAD) - finally: - if settings['mode'] == 'standalone': - server.stop() - os.chdir(current_dir) - os.remove(csr_file) - os.remove(crt_file) + finally: + if settings['mode'] == 'standalone': + server.stop() + os.chdir(current_dir) + os.remove(csr_file) + os.remove(crt_file) # @brief put new certificate in place -# @param domain string containing the domain name # @param settings the domain's configuration options # @return the action to be executed after the certificate update -def cert_put(domain, settings): - # TODO error handling - ca_file = settings.get("cafile", "") - crt_user = settings['user'] - crt_group = settings['group'] - crt_perm = settings['perm'] - crt_path = settings['path'] - crt_format = settings['format'].split(",") - crt_format = [str.strip(x) for x in crt_format] - crt_action = settings['action'] +def cert_put(settings): + # TODO error handling + ca_file = settings.get("cafile", "") + crt_user = settings['user'] + crt_group = settings['group'] + crt_perm = settings['perm'] + crt_path = settings['path'] + crt_format = settings['format'].split(",") + crt_format = [str.strip(x) for x in crt_format] + crt_action = settings['action'] - key_file = settings['server_key'] - crt_final = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt")) + key_file = settings['server_key'] + crt_final = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt")) - with open(crt_path, "w+") as crt_fd: - for fmt in crt_format: - if fmt == "crt": - src_fd = open(crt_final, "r") - crt_fd.write(src_fd.read()) - src_fd.close() - if fmt == "key": - src_fd = open(key_file, "r") - crt_fd.write(src_fd.read()) - src_fd.close() - if fmt == "ca": - if not os.path.isfile(ca_file): - raise FileNotFoundError("The CA certificate file (%s) is missing!" % ca_file) - src_fd = open(ca_file, "r") - crt_fd.write(src_fd.read()) - src_fd.close() - else: - # TODO error handling - pass + with open(crt_path, "w+") as crt_fd: + for fmt in crt_format: + if fmt == "crt": + src_fd = open(crt_final, "r") + crt_fd.write(src_fd.read()) + src_fd.close() + if fmt == "key": + src_fd = open(key_file, "r") + crt_fd.write(src_fd.read()) + src_fd.close() + if fmt == "ca": + if not os.path.isfile(ca_file): + raise FileNotFoundError("The CA certificate file (%s) is missing!" % ca_file) + src_fd = open(ca_file, "r") + crt_fd.write(src_fd.read()) + src_fd.close() + else: + # TODO error handling + pass - # set owner and permissions - uid = pwd.getpwnam(crt_user).pw_uid - gid = grp.getgrnam(crt_group).gr_gid - try: - os.chown(crt_path, uid, gid) - except OSError: - print('Warning: Could not set certificate file ownership!') - try: - os.chmod(crt_path, int(crt_perm, 8)) - except OSError: - print('Warning: Could not set certificate file permissions!') + # set owner and permissions + uid = pwd.getpwnam(crt_user).pw_uid + gid = grp.getgrnam(crt_group).gr_gid + try: + os.chown(crt_path, uid, gid) + except OSError: + print('Warning: Could not set certificate file ownership!') + try: + os.chmod(crt_path, int(crt_perm, 8)) + except OSError: + print('Warning: Could not set certificate file permissions!') - return crt_action + return crt_action # @brief augment configuration with defaults @@ -178,56 +157,55 @@ def cert_put(domain, settings): # @param defaults the default configuration # @return the augmented configuration def complete_config(domainconfig, globalconfig): - defaults = globalconfig['defaults'] - domainconfig['server_key'] = globalconfig['server_key'] - for name, value in defaults.items(): - if name not in domainconfig: - domainconfig[name] = value - if 'action' not in domainconfig: - domainconfig['action'] = None - return domainconfig + defaults = globalconfig['defaults'] + domainconfig['server_key'] = globalconfig['server_key'] + for name, value in defaults.items(): + if name not in domainconfig: + domainconfig[name] = value + if 'action' not in domainconfig: + domainconfig['action'] = None + return domainconfig if __name__ == "__main__": - # load global configuration - if os.path.isfile(ACME_CONF): - with open(ACME_CONF) as config_fd: - config = yaml.load(config_fd) - if not config: - config = {} - if 'defaults' not in config: - config['defaults'] = {} - if 'server_key' not in config: - config['server_key'] = ACME_DEFAULT_SERVER_KEY - if 'account_key' not in config: - config['account_key'] = ACME_DEFAULT_ACCOUNT_KEY + # load global configuration + config = {} + if os.path.isfile(ACME_CONF): + with open(ACME_CONF) as config_fd: + config = yaml.load(config_fd) + if 'defaults' not in config: + config['defaults'] = {} + if 'server_key' not in config: + config['server_key'] = ACME_DEFAULT_SERVER_KEY + if 'account_key' not in config: + config['account_key'] = ACME_DEFAULT_ACCOUNT_KEY - config['domains'] = [] - # load domain configuration - for config_file in os.listdir(ACME_CONFD): - if config_file.endswith(".conf"): - with open(os.path.join(ACME_CONFD, config_file)) as config_fd: - for entry in yaml.load(config_fd).items(): - config['domains'].append(entry) + config['domains'] = [] + # load domain configuration + for config_file in os.listdir(ACME_CONFD): + if config_file.endswith(".conf"): + with open(os.path.join(ACME_CONFD, config_file)) as config_fd: + for entry in yaml.load(config_fd).items(): + config['domains'].append(entry) - # post-update actions (run only once) - actions = set() + # post-update actions (run only once) + actions = set() - # check certificate validity and obtain/renew certificates if needed - for domains, domaincfgs in config['domains']: - # skip domains without any output files - if domaincfgs is None: - continue - crt_file = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt")) - ttl_days = int(config.get('ttl_days', 15)) - if not cert_isValid(crt_file, ttl_days): - cert_get(domains, config) - for domaincfg in domaincfgs: - cfg = complete_config(domaincfg, config) - if not target_isCurrent(cfg['path'], crt_file): - actions.add(cert_put(domains, cfg)) + # check certificate validity and obtain/renew certificates if needed + for domains, domaincfgs in config['domains']: + # skip domains without any output files + if domaincfgs is None: + continue + crt_file = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt")) + ttl_days = int(config.get('ttl_days', 15)) + if not tools.is_cert_valid(crt_file, ttl_days): + cert_get(domains, config) + for domaincfg in domaincfgs: + cfg = complete_config(domaincfg, config) + if not target_is_current(cfg['path'], crt_file): + actions.add(cert_put(cfg)) - # run post-update actions - for action in actions: - if action is not None: - subprocess.call(action.split()) + # run post-update actions + for action in actions: + if action is not None: + subprocess.call(action.split()) diff --git a/acertmgr_ssl.py b/acertmgr_ssl.py deleted file mode 100644 index ad2a379..0000000 --- a/acertmgr_ssl.py +++ /dev/null @@ -1,222 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -# acertmgr - ssl management functions -# Copyright (c) Markus Hauschild & David Klaftenegger, 2016. -# available under the ISC license, see LICENSE - -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import padding -from cryptography.x509.oid import NameOID -import base64 -import binascii -import copy -import json -import time -import os -import re -try: - from urllib.request import urlopen # Python 3 -except ImportError: - from urllib2 import urlopen # Python 2 - -# @brief retrieve notBefore and notAfter dates of a certificate file -# @param cert_file the path to the certificate -# @return the tuple of dates: (notBefore, notAfter) -def cert_valid_times(cert_file): - with open(cert_file, 'r') as f: - cert_data = f.read() - cert = x509.load_pem_x509_certificate(cert_data, default_backend()) - return (cert.not_valid_before, cert.not_valid_after) - -# @brief create a certificate signing request -# @param names list of domain names the certificate should be valid for -# @param key the key to use with the certificate in cryptography format -# @return the CSR in cryptography format -def cert_request(names, key): - primary_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, names[0].decode('utf8'))]) - all_names = x509.SubjectAlternativeName([x509.DNSName(name.decode('utf8')) for name in names]) - req = x509.CertificateSigningRequestBuilder() - req = req.subject_name(primary_name) - req = req.add_extension(all_names, critical=False) - req = req.sign(key, hashes.SHA256(), default_backend()) - return req - -# @brief convert certificate to PEM format -# @param cert certificate object in cryptography format -# @return the certificate in PEM format -def cert_to_pem(cert): - return cert.public_bytes(serialization.Encoding.PEM).decode('utf8') - -# @brief read a key from file -# @param path path to key file -# @return the key in cryptography format -def read_key(path): - with open(path, 'r') as f: - key_data = f.read() - return serialization.load_pem_private_key(key_data, None, default_backend()) - -# @brief convert numbers to byte-string -# @param num number to convert -# @return byte-string containing the number -# @todo better code welcome -def byte_string_format(num): - n = format(num, 'x') - n = "0{0}".format(n) if len(n) % 2 else n - return binascii.unhexlify(n) - -# @brief create the header information for ACME communication -# @param key the account key -# @return the header for ACME -def acme_header(key): - numbers = key.public_key().public_numbers() - header = { - "alg": "RS256", - "jwk": { - "e": base64_enc(byte_string_format(numbers.e)), - "kty": "RSA", - "n": base64_enc(byte_string_format(numbers.n)), - }, - } - return header - -# @brief register an account over ACME -# @param account_key the account key to register -# @param CA the certificate authority to register with -# @return True if new account was registered, False otherwise -def register_account(account_key, CA): - header = acme_header(account_key) - code, result = send_signed(account_key, CA, CA + "/acme/new-reg", header, { - "resource": "new-reg", - "agreement": "https://letsencrypt.org/documents/LE-SA-v1.2-November-15-2017.pdf", - }) - if code == 201: - print("Registered!") - return True - elif code == 409: - print("Already registered!") - return False - else: - raise ValueError("Error registering: {0} {1}".format(code, result)) - -# @brief helper function to base64 encode for JSON objects -# @param b the string to encode -# @return the encoded string -def base64_enc(b): - return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") - - -# @brief helper function to make signed requests -# @param CA the certificate authority -# @param url the request URL -# @param header the message header -# @param payload the message -# @return tuple of return code and request answer -def send_signed(account_key, CA, url, header, payload): - payload64 = base64_enc(json.dumps(payload).encode('utf8')) - protected = copy.deepcopy(header) - protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce'] - protected64 = base64_enc(json.dumps(protected).encode('utf8')) - # @todo check why this padding is not working - #pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH) - pad = padding.PKCS1v15() - out = account_key.sign('.'.join([protected64, payload64]).encode('utf8'), pad, hashes.SHA256()) - data = json.dumps({ - "header": header, "protected": protected64, - "payload": payload64, "signature": base64_enc(out), - }) - try: - resp = urlopen(url, data.encode('utf8')) - return resp.getcode(), resp.read() - except IOError as e: - return getattr(e, "code", None), getattr(e, "read", e.__str__)() - -# @brief function to fetch certificate using ACME -# @param account_key the account key in pyopenssl format -# @param csr the certificate signing request in pyopenssl format -# @param domains list of domains in the certificate, first is CN -# @param acme_dir directory for ACME challanges -# @param CA which signing CA to use -# @return the certificate in pyopenssl format -# @note algorithm and parts of the code are from acme-tiny -def get_crt_from_csr(account_key, csr, domains, acme_dir, CA): - header = acme_header(account_key) - accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':')) - account_hash = hashes.Hash(hashes.SHA256(), backend=default_backend()) - account_hash.update(accountkey_json.encode('utf8')) - account_thumbprint = base64_enc(account_hash.finalize()) - - # verify each domain - for domain in domains: - print("Verifying {0}...".format(domain)) - - # get new challenge - code, result = send_signed(account_key, CA, CA + "/acme/new-authz", header, { - "resource": "new-authz", - "identifier": {"type": "dns", "value": domain}, - }) - if code != 201: - raise ValueError("Error requesting challenges: {0} {1}".format(code, result)) - - # make the challenge file - challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "http-01"][0] - token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) - keyauthorization = "{0}.{1}".format(token, account_thumbprint) - wellknown_path = os.path.join(acme_dir, token) - with open(wellknown_path, "w") as wellknown_file: - wellknown_file.write(keyauthorization) - - # check that the file is in place - wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token) - try: - resp = urlopen(wellknown_url) - resp_data = resp.read().decode('utf8').strip() - assert resp_data == keyauthorization - except (IOError, AssertionError): - os.remove(wellknown_path) - raise ValueError("Wrote file to {0}, but couldn't download {1}".format( - wellknown_path, wellknown_url)) - - # notify challenge are met - code, result = send_signed(account_key, CA, challenge['uri'], header, { - "resource": "challenge", - "keyAuthorization": keyauthorization, - }) - if code != 202: - raise ValueError("Error triggering challenge: {0} {1}".format(code, result)) - - # wait for challenge to be verified - while True: - try: - resp = urlopen(challenge['uri']) - challenge_status = json.loads(resp.read().decode('utf8')) - except IOError as e: - raise ValueError("Error checking challenge: {0} {1}".format( - e.code, json.loads(e.read().decode('utf8')))) - if challenge_status['status'] == "pending": - time.sleep(2) - elif challenge_status['status'] == "valid": - print("{0} verified!".format(domain)) - os.remove(wellknown_path) - break - else: - raise ValueError("{0} challenge did not pass: {1}".format( - domain, challenge_status)) - - # get the new certificate - print("Signing certificate...") - csr_der = csr.public_bytes(serialization.Encoding.DER) - code, result = send_signed(account_key, CA, CA + "/acme/new-cert", header, { - "resource": "new-cert", - "csr": base64_enc(csr_der), - }) - if code != 201: - raise ValueError("Error signing certificate: {0} {1}".format(code, result)) - - # return signed certificate! - print("Certificate signed!") - cert = x509.load_der_x509_certificate(result, default_backend()) - return cert - diff --git a/acertmgr_web.py b/acertmgr_web.py index 2565b79..db91906 100644 --- a/acertmgr_web.py +++ b/acertmgr_web.py @@ -6,62 +6,65 @@ # available under the ISC license, see LICENSE try: - from SimpleHTTPServer import SimpleHTTPRequestHandler + from SimpleHTTPServer import SimpleHTTPRequestHandler except ImportError: - from http.server import SimpleHTTPRequestHandler + from http.server import SimpleHTTPRequestHandler try: - from SocketServer import TCPServer as HTTPServer + from SocketServer import TCPServer as HTTPServer except ImportError: - from http.server import HTTPServer + from http.server import HTTPServer import threading + # @brief custom request handler for ACME challenges # @note current working directory is temporarily changed by the script before # the webserver starts, which allows using SimpleHTTPRequestHandler class ACMERequestHandler(SimpleHTTPRequestHandler): - # @brief remove directories from GET URL - # @details the current working directory contains the challenge files, - # there is no need for creating subdirectories for the path - # that ACME expects. - # Additionally, this allows redirecting the ACME path to this - # webserver without having to know which subdirectory is - # redirected, which simplifies integration with existing - # webservers. - def translate_path(self, path): - spath = path.split('/') - assert(spath[0] == '') - spath = spath[1:] - if spath[0] == '.well-known': - spath = spath[1:] - if spath[0] == 'acme-challenge': - spath = spath[1:] - assert(len(spath) == 1) - spath.insert(0, '') - path = '/'.join(spath) - return SimpleHTTPRequestHandler.translate_path(self, path) + # @brief remove directories from GET URL + # @details the current working directory contains the challenge files, + # there is no need for creating subdirectories for the path + # that ACME expects. + # Additionally, this allows redirecting the ACME path to this + # webserver without having to know which subdirectory is + # redirected, which simplifies integration with existing + # webservers. + def translate_path(self, path): + spath = path.split('/') + assert (spath[0] == '') + spath = spath[1:] + if spath[0] == '.well-known': + spath = spath[1:] + if spath[0] == 'acme-challenge': + spath = spath[1:] + assert (len(spath) == 1) + spath.insert(0, '') + path = '/'.join(spath) + return SimpleHTTPRequestHandler.translate_path(self, path) + # @brief start the standalone webserver # @param server the HTTPServer object # @note this function is used to be passed to threading.Thread def start_standalone(server): - server.serve_forever() + server.serve_forever() + # @brief a simple webserver for challanges class ACMEHTTPServer: - # @brief create webserver instance - # @param port the port to listen on - def __init__(self, port=80): - HTTPServer.allow_reuse_address = True - self.server = HTTPServer(("", port), ACMERequestHandler) + # @brief create webserver instance + # @param port the port to listen on + def __init__(self, port=80): + HTTPServer.allow_reuse_address = True + self.server = HTTPServer(("", port), ACMERequestHandler) - # @brief start the webserver - def start(self): - self.server_thread = threading.Thread(target=start_standalone, args=(self.server, )) - self.server_thread.start() - - # @brief stop the webserver - def stop(self): - self.server.shutdown() - self.server_thread.join() + # @brief start the webserver + def start(self): + self.server_thread = threading.Thread(target=start_standalone, args=(self.server,)) + self.server_thread.start() + + # @brief stop the webserver + def stop(self): + self.server.shutdown() + self.server_thread.join() diff --git a/acme.py b/acme.py new file mode 100644 index 0000000..b070bb1 --- /dev/null +++ b/acme.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# acertmgr - ssl management functions +# Copyright (c) Markus Hauschild & David Klaftenegger, 2016. +# Copyright (c) Rudolf Mayerhofer, 2019. +# available under the ISC license, see LICENSE + +import tools +import base64 +import copy +import json +import os +import re +import time + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding + +try: + from urllib.request import urlopen # Python 3 +except ImportError: + from urllib2 import urlopen # Python 2 + + +# @brief create the header information for ACME communication +# @param key the account key +# @return the header for ACME +def acme_header(key): + numbers = key.public_key().public_numbers() + header = { + "alg": "RS256", + "jwk": { + "e": tools.to_json_base64(tools.byte_string_format(numbers.e)), + "kty": "RSA", + "n": tools.to_json_base64(tools.byte_string_format(numbers.n)), + }, + } + return header + + +# @brief register an account over ACME +# @param account_key the account key to register +# @param CA the certificate authority to register with +# @return True if new account was registered, False otherwise +def register_account(account_key, ca): + header = acme_header(account_key) + code, result = send_signed(account_key, ca, ca + "/acme/new-reg", header, { + "resource": "new-reg", + "agreement": "https://letsencrypt.org/documents/LE-SA-v1.2-November-15-2017.pdf", + }) + if code == 201: + print("Registered!") + return True + elif code == 409: + print("Already registered!") + return False + else: + raise ValueError("Error registering: {0} {1}".format(code, result)) + + +# @brief helper function to make signed requests +# @param CA the certificate authority +# @param url the request URL +# @param header the message header +# @param payload the message +# @return tuple of return code and request answer +def send_signed(account_key, ca, url, header, payload): + payload64 = tools.to_json_base64(json.dumps(payload).encode('utf8')) + protected = copy.deepcopy(header) + protected["nonce"] = urlopen(ca + "/directory").headers['Replay-Nonce'] + protected64 = tools.to_json_base64(json.dumps(protected).encode('utf8')) + # @todo check why this padding is not working + # pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH) + pad = padding.PKCS1v15() + out = account_key.sign('.'.join([protected64, payload64]).encode('utf8'), pad, hashes.SHA256()) + data = json.dumps({ + "header": header, "protected": protected64, + "payload": payload64, "signature": tools.to_json_base64(out), + }) + try: + resp = urlopen(url, data.encode('utf8')) + return resp.getcode(), resp.read() + except IOError as e: + return getattr(e, "code", None), getattr(e, "read", e.__str__)() + + +# @brief function to fetch certificate using ACME +# @param account_key the account key in pyopenssl format +# @param csr the certificate signing request in pyopenssl format +# @param domains list of domains in the certificate, first is CN +# @param acme_dir directory for ACME challanges +# @param CA which signing CA to use +# @return the certificate in pyopenssl format +# @note algorithm and parts of the code are from acme-tiny +def get_crt_from_csr(account_key, csr, domains, acme_dir, ca): + header = acme_header(account_key) + accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':')) + account_hash = hashes.Hash(hashes.SHA256(), backend=default_backend()) + account_hash.update(accountkey_json.encode('utf8')) + account_thumbprint = tools.to_json_base64(account_hash.finalize()) + + # verify each domain + for domain in domains: + print("Verifying {0}...".format(domain)) + + # get new challenge + code, result = send_signed(account_key, ca, ca + "/acme/new-authz", header, { + "resource": "new-authz", + "identifier": {"type": "dns", "value": domain}, + }) + if code != 201: + raise ValueError("Error requesting challenges: {0} {1}".format(code, result)) + + # make the challenge file + challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "http-01"][0] + token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) + keyauthorization = "{0}.{1}".format(token, account_thumbprint) + wellknown_path = os.path.join(acme_dir, token) + with open(wellknown_path, "w") as wellknown_file: + wellknown_file.write(keyauthorization) + + # check that the file is in place + wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token) + try: + resp = urlopen(wellknown_url) + resp_data = resp.read().decode('utf8').strip() + assert resp_data == keyauthorization + except (IOError, AssertionError): + os.remove(wellknown_path) + raise ValueError("Wrote file to {0}, but couldn't download {1}".format( + wellknown_path, wellknown_url)) + + # notify challenge are met + code, result = send_signed(account_key, ca, challenge['uri'], header, { + "resource": "challenge", + "keyAuthorization": keyauthorization, + }) + if code != 202: + raise ValueError("Error triggering challenge: {0} {1}".format(code, result)) + + # wait for challenge to be verified + while True: + try: + resp = urlopen(challenge['uri']) + challenge_status = json.loads(resp.read().decode('utf8')) + except IOError as e: + raise ValueError("Error checking challenge: {0} {1}".format( + e.code, json.loads(e.read().decode('utf8')))) + if challenge_status['status'] == "pending": + time.sleep(2) + elif challenge_status['status'] == "valid": + print("{0} verified!".format(domain)) + os.remove(wellknown_path) + break + else: + raise ValueError("{0} challenge did not pass: {1}".format( + domain, challenge_status)) + + # get the new certificate + print("Signing certificate...") + csr_der = csr.public_bytes(serialization.Encoding.DER) + code, result = send_signed(account_key, ca, ca + "/acme/new-cert", header, { + "resource": "new-cert", + "csr": tools.to_json_base64(csr_der), + }) + if code != 201: + raise ValueError("Error signing certificate: {0} {1}".format(code, result)) + + # return signed certificate! + print("Certificate signed!") + cert = x509.load_der_x509_certificate(result, default_backend()) + return cert diff --git a/tools.py b/tools.py new file mode 100644 index 0000000..76fe01b --- /dev/null +++ b/tools.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# acertmgr - various support functions +# Copyright (c) Markus Hauschild & David Klaftenegger, 2016. +# Copyright (c) Rudolf Mayerhofer, 2019. +# available under the ISC license, see LICENSE + +import base64 +import binascii +import datetime +import os + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.x509.oid import NameOID + + +class InvalidCertificateError(Exception): + pass + + +# @brief retrieve notBefore and notAfter dates of a certificate file +# @param cert_file the path to the certificate +# @return the tuple of dates: (notBefore, notAfter) +def get_cert_valid_times(cert_file): + with open(cert_file, 'r') as f: + cert_data = f.read() + cert = x509.load_pem_x509_certificate(cert_data, default_backend()) + return cert.not_valid_before, cert.not_valid_after + + +# @brief check whether existing certificate is still valid or expiring soon +# @param crt_file string containing the path to the certificate file +# @param ttl_days the minimum amount of days for which the certificate must be valid +# @return True if certificate is still valid for at least ttl_days, False otherwise +def is_cert_valid(crt_file, ttl_days): + if not os.path.isfile(crt_file): + return False + else: + (valid_from, valid_to) = get_cert_valid_times(crt_file) + + now = datetime.datetime.now() + if valid_from > now: + raise InvalidCertificateError("Certificate seems to be from the future") + + expiry_limit = now + datetime.timedelta(days=ttl_days) + if valid_to < expiry_limit: + return False + + return True + + +# @brief create a certificate signing request +# @param names list of domain names the certificate should be valid for +# @param key the key to use with the certificate in pyopenssl format +# @return the CSR in pyopenssl format +def new_cert_request(names, key): + primary_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, names[0].decode('utf8'))]) + all_names = x509.SubjectAlternativeName([x509.DNSName(name.decode('utf8')) for name in names]) + req = x509.CertificateSigningRequestBuilder() + req = req.subject_name(primary_name) + req = req.add_extension(all_names, critical=False) + req = req.sign(key, hashes.SHA256(), default_backend()) + return req + + +# @brief convert certificate to PEM format +# @param cert certificate object in pyopenssl format +# @return the certificate in PEM format +def convert_cert_to_pem(cert): + return cert.public_bytes(serialization.Encoding.PEM).decode('utf8') + + +# @brief read a key from file +# @param path path to key file +# @return the key in pyopenssl format +def read_key(path): + with open(path, 'r') as f: + key_data = f.read() + return serialization.load_pem_private_key(key_data, None, default_backend()) + + +# @brief helper function to base64 encode for JSON objects +# @param b the string to encode +# @return the encoded string +def to_json_base64(b): + return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") + + +# @brief convert numbers to byte-string +# @param num number to convert +# @return byte-string containing the number +def byte_string_format(num): + n = format(num, 'x') + n = "0{0}".format(n) if len(n) % 2 else n + return binascii.unhexlify(n)