acertmgr: rework how files are handled in general

- Remove unnecessary tempfiles and keep as much in memory as possible
- Unify the way PEM files are written and read
This commit is contained in:
Kishi85 2019-03-24 17:15:30 +01:00
parent 46efc1038c
commit 7ee34912c1
3 changed files with 81 additions and 88 deletions

View File

@ -11,34 +11,22 @@ import importlib
import io
import os
import pwd
import shutil
import stat
import subprocess
import tempfile
from acertmgr import configuration, tools
# @brief check whether existing target file is still valid or source crt has been updated
# @param target string containing the path to the target file
# @param file string containing the path to the certificate file
# @return True if target file is at least as new as the certificate, False otherwise
def target_is_current(target, file):
if not os.path.isfile(target):
return False
target_date = os.path.getmtime(target)
crt_date = os.path.getmtime(file)
return target_date >= crt_date
# @brief create a authority for the given configuration
# @param settings the authority configuration options
def create_authority(settings):
acc_file = settings['account_key']
if not os.path.isfile(acc_file):
print("Account key not found at '{0}'. Creating RSA key.".format(acc_file))
tools.new_account_key(acc_file)
acc_key = tools.read_pem_key(acc_file)
if os.path.isfile(acc_file):
print("Reading account key from {}".format(acc_file))
acc_key = tools.read_pem_file(acc_file, key=True)
else:
print("Account key not found at '{0}'. Creating key.".format(acc_file))
acc_key = tools.new_account_key(acc_file)
authority_module = importlib.import_module("acertmgr.authority.{0}".format(settings["api"]))
authority_class = getattr(authority_module, "ACMEAuthority")
@ -63,44 +51,35 @@ def create_challenge_handler(settings):
def cert_get(settings):
print("Getting certificate for '%s'." % settings['domains'])
key_file = settings['key_file']
key_length = settings['key_length']
if not os.path.isfile(key_file):
print("SSL key not found at '{0}'. Creating {1} bit RSA key.".format(key_file, key_length))
tools.new_ssl_key(key_file, key_length)
acme = create_authority(settings)
acme.register_account()
filename = settings['id']
_, csr_file = tempfile.mkstemp(".csr", "%s." % filename)
_, crt_file = tempfile.mkstemp(".crt", "%s." % filename)
# find challenge handlers for this certificate
# create challenge handlers for this certificate
challenge_handlers = dict()
for domain in settings['domainlist']:
# Create the challenge handler
challenge_handlers[domain] = create_challenge_handler(settings['handlers'][domain])
try:
key = tools.read_pem_key(key_file)
cr = tools.new_cert_request(settings['domainlist'], key)
print("Reading account key...")
acme.register_account()
crt, ca = acme.get_crt_from_csr(cr, settings['domainlist'], challenge_handlers)
with io.open(crt_file, "w") as crt_fd:
crt_fd.write(tools.convert_cert_to_pem_str(crt))
# create ssl key
key_file = settings['key_file']
key_length = settings['key_length']
if os.path.isfile(key_file):
key = tools.read_pem_file(key_file, key=True)
else:
print("SSL key not found at '{0}'. Creating {1} bit key.".format(key_file, key_length))
key = tools.new_ssl_key(key_file, key_length)
# if resulting certificate is valid: store in final location
if tools.is_cert_valid(crt_file, 60):
crt_final = settings['cert_file']
shutil.copy2(crt_file, crt_final)
os.chmod(crt_final, stat.S_IREAD)
if "static_ca" in settings and not settings['static_ca'] and ca is not None:
with io.open(settings['ca_file'], "w") as ca_fd:
ca_fd.write(tools.convert_cert_to_pem_str(ca))
finally:
os.remove(csr_file)
os.remove(crt_file)
# create ssl csr
cr = tools.new_cert_request(settings['domainlist'], key)
# request cert with csr
crt, ca = acme.get_crt_from_csr(cr, settings['domainlist'], challenge_handlers)
# if resulting certificate is valid: store in final location
if tools.is_cert_valid(crt, settings['ttl_days']):
tools.write_pem_file(crt, settings['cert_file'], stat.S_IREAD)
if "static_ca" in settings and not settings['static_ca'] and ca is not None:
tools.write_pem_file(ca, settings['ca_file'])
# @brief put new certificate in place
@ -165,8 +144,7 @@ def main():
# check certificate validity and obtain/renew certificates if needed
for config in configs:
cert_file = config['cert_file']
ttl_days = int(config['ttl_days'])
if not tools.is_cert_valid(cert_file, ttl_days):
if not os.path.isfile(cert_file) or not tools.is_cert_valid(cert_file, config['ttl_days']):
cert_get(config)
for cfg in config['actions']:
if not tools.target_is_current(cfg['path'], cert_file):

View File

@ -10,8 +10,8 @@ import argparse
import copy
import hashlib
import io
import os
import json
import os
# Backward compatiblity for older versions/installations of acertmgr
LEGACY_WORK_DIR = "/etc/acme"
@ -111,6 +111,7 @@ def parse_config_entry(entry, globalconfig, work_dir, authority_tos_agreement):
# TTL days
update_config_value(config, 'ttl_days', entry, globalconfig, DEFAULT_TTL)
config['ttl_days'] = int(config['ttl_days'])
# SSL cert location (with compatibility to older versions)
if 'server_cert' in globalconfig:

View File

@ -34,34 +34,20 @@ def get_url(url, data=None, headers=None):
return urlopen(Request(url, data=data, headers={} if headers is None else headers))
# @brief retrieve notBefore and notAfter dates of a certificate file
# @param cert_file the path to the certificate
# @return the tuple of dates: (notBefore, notAfter)
def get_cert_valid_times(cert_file):
with io.open(cert_file, 'r') as f:
cert = convert_pem_str_to_cert(f.read())
return cert.not_valid_before, cert.not_valid_after
# @brief check whether existing certificate is still valid or expiring soon
# @param crt_file string containing the path to the certificate file
# @param ttl_days the minimum amount of days for which the certificate must be valid
# @return True if certificate is still valid for at least ttl_days, False otherwise
def is_cert_valid(crt_file, ttl_days):
if not os.path.isfile(crt_file):
def is_cert_valid(cert, ttl_days):
now = datetime.datetime.now()
if cert.not_valid_before > now:
raise InvalidCertificateError("Certificate seems to be from the future")
expiry_limit = now + datetime.timedelta(days=ttl_days)
if cert.not_valid_after < expiry_limit:
return False
else:
(valid_from, valid_to) = get_cert_valid_times(crt_file)
now = datetime.datetime.now()
if valid_from > now:
raise InvalidCertificateError("Certificate seems to be from the future")
expiry_limit = now + datetime.timedelta(days=ttl_days)
if valid_to < expiry_limit:
return False
return True
return True
# @brief create a certificate signing request
@ -84,15 +70,15 @@ def new_cert_request(names, key):
return req
# @brief generate a new ssl key
# @param path path where the new key file should be written
def new_account_key(path, key_size=4096):
# @brief generate a new account key
# @param path path where the new key file should be written in PEM format (optional)
def new_account_key(path=None, key_size=4096):
return new_ssl_key(path, key_size)
# @brief generate a new ssl key
# @param path path where the new key file should be written
def new_ssl_key(path, key_size=4096):
# @param path path where the new key file should be written in PEM format (optional)
def new_ssl_key(path=None, key_size=4096):
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
@ -103,21 +89,37 @@ def new_ssl_key(path, key_size=4096):
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
with io.open(path, 'wb') as pem_out:
pem_out.write(pem)
try:
os.chmod(path, int("0400", 8))
except OSError:
print('Warning: Could not set file permissions on {0}!'.format(path))
if path is not None:
with io.open(path, 'wb') as pem_out:
pem_out.write(pem)
try:
os.chmod(path, int("0400", 8))
except OSError:
print('Warning: Could not set file permissions on {0}!'.format(path))
return private_key
# @brief read a key from file
# @param path path to key file
# @param path path to file
# @param key indicate whether we are loading a key
# @return the key in pyopenssl format
def read_pem_key(path):
def read_pem_file(path, key=False):
with io.open(path, 'r') as f:
key_data = f.read().encode('utf-8')
return serialization.load_pem_private_key(key_data, None, default_backend())
if key:
return serialization.load_pem_private_key(f.read().encode('utf-8'), None, default_backend())
else:
return convert_pem_str_to_cert(f.read())
# @brief write cert data to PEM formatted file
def write_pem_file(crt, path, perms=None):
with io.open(path, "w") as f:
f.write(convert_cert_to_pem_str(crt))
if perms:
try:
os.chmod(path, perms)
except OSError:
print('Warning: Could not set file permissions ({0}) on {1}!'.format(perms, path))
# @brief download the issuer ca for a given certificate
@ -196,3 +198,15 @@ def number_to_byte_format(num):
n = format(num, 'x')
n = "0{0}".format(n) if len(n) % 2 else n
return binascii.unhexlify(n)
# @brief check whether existing target file is still valid or source crt has been updated
# @param target string containing the path to the target file
# @param file string containing the path to the certificate file
# @return True if target file is at least as new as the certificate, False otherwise
def target_is_current(target, file):
if not os.path.isfile(target):
return False
target_date = os.path.getmtime(target)
crt_date = os.path.getmtime(file)
return target_date >= crt_date