diff --git a/acertmgr/__init__.py b/acertmgr/__init__.py index 2778219..6f5f3b9 100755 --- a/acertmgr/__init__.py +++ b/acertmgr/__init__.py @@ -11,34 +11,22 @@ import importlib import io import os import pwd -import shutil import stat import subprocess -import tempfile from acertmgr import configuration, tools -# @brief check whether existing target file is still valid or source crt has been updated -# @param target string containing the path to the target file -# @param file string containing the path to the certificate file -# @return True if target file is at least as new as the certificate, False otherwise -def target_is_current(target, file): - if not os.path.isfile(target): - return False - target_date = os.path.getmtime(target) - crt_date = os.path.getmtime(file) - return target_date >= crt_date - - # @brief create a authority for the given configuration # @param settings the authority configuration options def create_authority(settings): acc_file = settings['account_key'] - if not os.path.isfile(acc_file): - print("Account key not found at '{0}'. Creating RSA key.".format(acc_file)) - tools.new_account_key(acc_file) - acc_key = tools.read_pem_key(acc_file) + if os.path.isfile(acc_file): + print("Reading account key from {}".format(acc_file)) + acc_key = tools.read_pem_file(acc_file, key=True) + else: + print("Account key not found at '{0}'. Creating key.".format(acc_file)) + acc_key = tools.new_account_key(acc_file) authority_module = importlib.import_module("acertmgr.authority.{0}".format(settings["api"])) authority_class = getattr(authority_module, "ACMEAuthority") @@ -63,44 +51,35 @@ def create_challenge_handler(settings): def cert_get(settings): print("Getting certificate for '%s'." % settings['domains']) - key_file = settings['key_file'] - key_length = settings['key_length'] - if not os.path.isfile(key_file): - print("SSL key not found at '{0}'. Creating {1} bit RSA key.".format(key_file, key_length)) - tools.new_ssl_key(key_file, key_length) - acme = create_authority(settings) + acme.register_account() - filename = settings['id'] - _, csr_file = tempfile.mkstemp(".csr", "%s." % filename) - _, crt_file = tempfile.mkstemp(".crt", "%s." % filename) - - # find challenge handlers for this certificate + # create challenge handlers for this certificate challenge_handlers = dict() for domain in settings['domainlist']: # Create the challenge handler challenge_handlers[domain] = create_challenge_handler(settings['handlers'][domain]) - try: - key = tools.read_pem_key(key_file) - cr = tools.new_cert_request(settings['domainlist'], key) - print("Reading account key...") - acme.register_account() - crt, ca = acme.get_crt_from_csr(cr, settings['domainlist'], challenge_handlers) - with io.open(crt_file, "w") as crt_fd: - crt_fd.write(tools.convert_cert_to_pem_str(crt)) + # create ssl key + key_file = settings['key_file'] + key_length = settings['key_length'] + if os.path.isfile(key_file): + key = tools.read_pem_file(key_file, key=True) + else: + print("SSL key not found at '{0}'. Creating {1} bit key.".format(key_file, key_length)) + key = tools.new_ssl_key(key_file, key_length) - # if resulting certificate is valid: store in final location - if tools.is_cert_valid(crt_file, 60): - crt_final = settings['cert_file'] - shutil.copy2(crt_file, crt_final) - os.chmod(crt_final, stat.S_IREAD) - if "static_ca" in settings and not settings['static_ca'] and ca is not None: - with io.open(settings['ca_file'], "w") as ca_fd: - ca_fd.write(tools.convert_cert_to_pem_str(ca)) - finally: - os.remove(csr_file) - os.remove(crt_file) + # create ssl csr + cr = tools.new_cert_request(settings['domainlist'], key) + + # request cert with csr + crt, ca = acme.get_crt_from_csr(cr, settings['domainlist'], challenge_handlers) + + # if resulting certificate is valid: store in final location + if tools.is_cert_valid(crt, settings['ttl_days']): + tools.write_pem_file(crt, settings['cert_file'], stat.S_IREAD) + if "static_ca" in settings and not settings['static_ca'] and ca is not None: + tools.write_pem_file(ca, settings['ca_file']) # @brief put new certificate in place @@ -165,8 +144,7 @@ def main(): # check certificate validity and obtain/renew certificates if needed for config in configs: cert_file = config['cert_file'] - ttl_days = int(config['ttl_days']) - if not tools.is_cert_valid(cert_file, ttl_days): + if not os.path.isfile(cert_file) or not tools.is_cert_valid(cert_file, config['ttl_days']): cert_get(config) for cfg in config['actions']: if not tools.target_is_current(cfg['path'], cert_file): diff --git a/acertmgr/configuration.py b/acertmgr/configuration.py index b32a943..c72bdfd 100644 --- a/acertmgr/configuration.py +++ b/acertmgr/configuration.py @@ -10,8 +10,8 @@ import argparse import copy import hashlib import io -import os import json +import os # Backward compatiblity for older versions/installations of acertmgr LEGACY_WORK_DIR = "/etc/acme" @@ -111,6 +111,7 @@ def parse_config_entry(entry, globalconfig, work_dir, authority_tos_agreement): # TTL days update_config_value(config, 'ttl_days', entry, globalconfig, DEFAULT_TTL) + config['ttl_days'] = int(config['ttl_days']) # SSL cert location (with compatibility to older versions) if 'server_cert' in globalconfig: diff --git a/acertmgr/tools.py b/acertmgr/tools.py index 59b453c..dac33e4 100644 --- a/acertmgr/tools.py +++ b/acertmgr/tools.py @@ -34,34 +34,20 @@ def get_url(url, data=None, headers=None): return urlopen(Request(url, data=data, headers={} if headers is None else headers)) -# @brief retrieve notBefore and notAfter dates of a certificate file -# @param cert_file the path to the certificate -# @return the tuple of dates: (notBefore, notAfter) -def get_cert_valid_times(cert_file): - with io.open(cert_file, 'r') as f: - cert = convert_pem_str_to_cert(f.read()) - return cert.not_valid_before, cert.not_valid_after - - # @brief check whether existing certificate is still valid or expiring soon # @param crt_file string containing the path to the certificate file # @param ttl_days the minimum amount of days for which the certificate must be valid # @return True if certificate is still valid for at least ttl_days, False otherwise -def is_cert_valid(crt_file, ttl_days): - if not os.path.isfile(crt_file): +def is_cert_valid(cert, ttl_days): + now = datetime.datetime.now() + if cert.not_valid_before > now: + raise InvalidCertificateError("Certificate seems to be from the future") + + expiry_limit = now + datetime.timedelta(days=ttl_days) + if cert.not_valid_after < expiry_limit: return False - else: - (valid_from, valid_to) = get_cert_valid_times(crt_file) - now = datetime.datetime.now() - if valid_from > now: - raise InvalidCertificateError("Certificate seems to be from the future") - - expiry_limit = now + datetime.timedelta(days=ttl_days) - if valid_to < expiry_limit: - return False - - return True + return True # @brief create a certificate signing request @@ -84,15 +70,15 @@ def new_cert_request(names, key): return req -# @brief generate a new ssl key -# @param path path where the new key file should be written -def new_account_key(path, key_size=4096): +# @brief generate a new account key +# @param path path where the new key file should be written in PEM format (optional) +def new_account_key(path=None, key_size=4096): return new_ssl_key(path, key_size) # @brief generate a new ssl key -# @param path path where the new key file should be written -def new_ssl_key(path, key_size=4096): +# @param path path where the new key file should be written in PEM format (optional) +def new_ssl_key(path=None, key_size=4096): private_key = rsa.generate_private_key( public_exponent=65537, key_size=key_size, @@ -103,21 +89,37 @@ def new_ssl_key(path, key_size=4096): format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption() ) - with io.open(path, 'wb') as pem_out: - pem_out.write(pem) - try: - os.chmod(path, int("0400", 8)) - except OSError: - print('Warning: Could not set file permissions on {0}!'.format(path)) + if path is not None: + with io.open(path, 'wb') as pem_out: + pem_out.write(pem) + try: + os.chmod(path, int("0400", 8)) + except OSError: + print('Warning: Could not set file permissions on {0}!'.format(path)) + return private_key # @brief read a key from file -# @param path path to key file +# @param path path to file +# @param key indicate whether we are loading a key # @return the key in pyopenssl format -def read_pem_key(path): +def read_pem_file(path, key=False): with io.open(path, 'r') as f: - key_data = f.read().encode('utf-8') - return serialization.load_pem_private_key(key_data, None, default_backend()) + if key: + return serialization.load_pem_private_key(f.read().encode('utf-8'), None, default_backend()) + else: + return convert_pem_str_to_cert(f.read()) + + +# @brief write cert data to PEM formatted file +def write_pem_file(crt, path, perms=None): + with io.open(path, "w") as f: + f.write(convert_cert_to_pem_str(crt)) + if perms: + try: + os.chmod(path, perms) + except OSError: + print('Warning: Could not set file permissions ({0}) on {1}!'.format(perms, path)) # @brief download the issuer ca for a given certificate @@ -196,3 +198,15 @@ def number_to_byte_format(num): n = format(num, 'x') n = "0{0}".format(n) if len(n) % 2 else n return binascii.unhexlify(n) + + +# @brief check whether existing target file is still valid or source crt has been updated +# @param target string containing the path to the target file +# @param file string containing the path to the certificate file +# @return True if target file is at least as new as the certificate, False otherwise +def target_is_current(target, file): + if not os.path.isfile(target): + return False + target_date = os.path.getmtime(target) + crt_date = os.path.getmtime(file) + return target_date >= crt_date