mirror of
https://github.com/moepman/acertmgr.git
synced 2025-01-01 03:21:49 +01:00
configuration: Split into separate module
This commit is contained in:
parent
39855323aa
commit
90b25e2f3b
163
acertmgr.py
163
acertmgr.py
@ -8,7 +8,6 @@
|
|||||||
|
|
||||||
|
|
||||||
import grp
|
import grp
|
||||||
import hashlib
|
|
||||||
import importlib
|
import importlib
|
||||||
import os
|
import os
|
||||||
import pwd
|
import pwd
|
||||||
@ -17,110 +16,89 @@ import stat
|
|||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
|
import configuration
|
||||||
import tools
|
import tools
|
||||||
|
|
||||||
ACME_DIR = "/etc/acme"
|
|
||||||
ACME_CONF = os.path.join(ACME_DIR, "acme.conf")
|
|
||||||
ACME_CONFD = os.path.join(ACME_DIR, "domains.d")
|
|
||||||
|
|
||||||
ACME_DEFAULT_SERVER_KEY = os.path.join(ACME_DIR, "server.key")
|
|
||||||
ACME_DEFAULT_ACCOUNT_KEY = os.path.join(ACME_DIR, "account.key")
|
|
||||||
|
|
||||||
|
|
||||||
# @brief check whether existing target file is still valid or source crt has been updated
|
# @brief check whether existing target file is still valid or source crt has been updated
|
||||||
# @param target string containing the path to the target file
|
# @param target string containing the path to the target file
|
||||||
# @param crt_file string containing the path to the certificate file
|
# @param file string containing the path to the certificate file
|
||||||
# @return True if target file is at least as new as the certificate, False otherwise
|
# @return True if target file is at least as new as the certificate, False otherwise
|
||||||
def target_is_current(target, crt_file):
|
def target_is_current(target, file):
|
||||||
if not os.path.isfile(target):
|
if not os.path.isfile(target):
|
||||||
return False
|
return False
|
||||||
target_date = os.path.getmtime(target)
|
target_date = os.path.getmtime(target)
|
||||||
crt_date = os.path.getmtime(crt_file)
|
crt_date = os.path.getmtime(file)
|
||||||
return target_date >= crt_date
|
return target_date >= crt_date
|
||||||
|
|
||||||
|
|
||||||
# @brief create a authority for the given configuration
|
# @brief create a authority for the given configuration
|
||||||
# @param config the authority configuration options
|
# @param settings the authority configuration options
|
||||||
def create_authority(config):
|
def create_authority(settings):
|
||||||
if "apiversion" in config:
|
if "api" in settings:
|
||||||
apiversion = config["apiversion"]
|
api = settings["api"]
|
||||||
else:
|
else:
|
||||||
apiversion = "v1"
|
api = "v1"
|
||||||
|
|
||||||
acc_file = config['account_key']
|
acc_file = settings['account_key']
|
||||||
if not os.path.isfile(acc_file):
|
if not os.path.isfile(acc_file):
|
||||||
print("Account key not found at '{0}'. Creating RSA key.".format(acc_file))
|
print("Account key not found at '{0}'. Creating RSA key.".format(acc_file))
|
||||||
tools.new_rsa_key(acc_file)
|
tools.new_rsa_key(acc_file)
|
||||||
acc_key = tools.read_key(acc_file)
|
acc_key = tools.read_key(acc_file)
|
||||||
|
|
||||||
authority_module = importlib.import_module("authority.{0}".format(apiversion))
|
authority_module = importlib.import_module("authority.{0}".format(api))
|
||||||
authority_class = getattr(authority_module, "ACMEAuthority")
|
authority_class = getattr(authority_module, "ACMEAuthority")
|
||||||
return authority_class(config.get('authority'),acc_key)
|
return authority_class(settings.get('authority'), acc_key)
|
||||||
|
|
||||||
|
|
||||||
# @brief create a challenge handler for the given configuration
|
# @brief create a challenge handler for the given configuration
|
||||||
# @param config the domain's configuration options
|
# @param settings the domain's configuration options
|
||||||
def create_challenge_handler(config):
|
def create_challenge_handler(settings):
|
||||||
if "mode" in config:
|
if "mode" in settings:
|
||||||
mode = config["mode"]
|
mode = settings["mode"]
|
||||||
else:
|
else:
|
||||||
mode = "standalone"
|
mode = "standalone"
|
||||||
|
|
||||||
handler_module = importlib.import_module("modes.{0}".format(mode))
|
handler_module = importlib.import_module("modes.{0}".format(mode))
|
||||||
handler_class = getattr(handler_module, "ChallengeHandler")
|
handler_class = getattr(handler_module, "ChallengeHandler")
|
||||||
return handler_class(config)
|
return handler_class(settings)
|
||||||
|
|
||||||
|
|
||||||
# @brief fetch new certificate from letsencrypt
|
# @brief fetch new certificate from letsencrypt
|
||||||
# @param domains string containing all domain names
|
# @param settings the domain's configuration options
|
||||||
# @param globalconfig the global configuration options
|
def cert_get(settings):
|
||||||
# @param handlerconfigs the domain's handler configuration options
|
print("Getting certificate for '%s'." % settings['domains'])
|
||||||
def cert_get(domains, globalconfig, handlerconfigs):
|
|
||||||
print("Getting certificate for '%s'." % domains)
|
|
||||||
|
|
||||||
key_file = globalconfig['server_key']
|
key_file = settings['key_file']
|
||||||
|
key_length = settings['key_length']
|
||||||
if not os.path.isfile(key_file):
|
if not os.path.isfile(key_file):
|
||||||
print("Server key not found at '{0}'. Creating RSA key.".format(key_file))
|
print("SSL key not found at '{0}'. Creating {1} bit RSA key.".format(key_file, key_length))
|
||||||
tools.new_rsa_key(key_file)
|
tools.new_rsa_key(key_file, key_length)
|
||||||
|
|
||||||
acme = create_authority(globalconfig)
|
acme = create_authority(settings)
|
||||||
|
|
||||||
filename = hashlib.md5(domains).hexdigest()
|
filename = settings['id']
|
||||||
_, csr_file = tempfile.mkstemp(".csr", "%s." % filename)
|
_, csr_file = tempfile.mkstemp(".csr", "%s." % filename)
|
||||||
_, crt_file = tempfile.mkstemp(".crt", "%s." % filename)
|
_, crt_file = tempfile.mkstemp(".crt", "%s." % filename)
|
||||||
|
|
||||||
# find challenge handlers for this certificate
|
# find challenge handlers for this certificate
|
||||||
challenge_handlers = dict()
|
challenge_handlers = dict()
|
||||||
domainlist = domains.split(' ')
|
for domain in config['domainlist']:
|
||||||
for domain in domainlist:
|
|
||||||
# Use global config as base handler config
|
|
||||||
cfg = globalconfig.deepcopy()
|
|
||||||
|
|
||||||
# Determine generic domain handler config values
|
|
||||||
genericfgs = [x for x in handlerconfigs if 'domain' not in x]
|
|
||||||
if len(genericfgs) > 0:
|
|
||||||
cfg = cfg.update(genericfgs[0])
|
|
||||||
|
|
||||||
# Update handler config with more specific values
|
|
||||||
specificcfgs = [x for x in handlerconfigs if ('domain' in x and x['domain'] == domain)]
|
|
||||||
if len(specificcfgs) > 0:
|
|
||||||
cfg = cfg.update(specificcfgs[0])
|
|
||||||
|
|
||||||
# Create the challenge handler
|
# Create the challenge handler
|
||||||
challenge_handlers[domain] = create_challenge_handler(cfg)
|
challenge_handlers[domain] = create_challenge_handler(settings['handlers'][domain])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
key = tools.read_key(key_file)
|
key = tools.read_key(key_file)
|
||||||
cr = tools.new_cert_request(domainlist, key)
|
cr = tools.new_cert_request(config['domainlist'], key)
|
||||||
print("Reading account key...")
|
print("Reading account key...")
|
||||||
acme.register_account()
|
acme.register_account()
|
||||||
crt = acme.get_crt_from_csr(cr, domainlist, challenge_handlers)
|
crt = acme.get_crt_from_csr(cr, config['domainlist'], challenge_handlers)
|
||||||
with open(crt_file, "w") as crt_fd:
|
with open(crt_file, "w") as crt_fd:
|
||||||
crt_fd.write(tools.convert_cert_to_pem(crt))
|
crt_fd.write(tools.convert_cert_to_pem(crt))
|
||||||
|
|
||||||
# if resulting certificate is valid: store in final location
|
# if resulting certificate is valid: store in final location
|
||||||
if tools.is_cert_valid(crt_file, 60):
|
if tools.is_cert_valid(crt_file, 60):
|
||||||
crt_final = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt"))
|
crt_final = settings['cert_file']
|
||||||
shutil.copy2(crt_file, crt_final)
|
shutil.copy2(crt_file, crt_final)
|
||||||
os.chmod(crt_final, stat.S_IREAD)
|
os.chmod(crt_final, stat.S_IREAD)
|
||||||
|
|
||||||
@ -134,7 +112,7 @@ def cert_get(domains, globalconfig, handlerconfigs):
|
|||||||
# @return the action to be executed after the certificate update
|
# @return the action to be executed after the certificate update
|
||||||
def cert_put(settings):
|
def cert_put(settings):
|
||||||
# TODO error handling
|
# TODO error handling
|
||||||
ca_file = settings.get("cafile", "")
|
ca_file = settings['ca_file']
|
||||||
crt_user = settings['user']
|
crt_user = settings['user']
|
||||||
crt_group = settings['group']
|
crt_group = settings['group']
|
||||||
crt_perm = settings['perm']
|
crt_perm = settings['perm']
|
||||||
@ -143,8 +121,8 @@ def cert_put(settings):
|
|||||||
crt_format = [str.strip(x) for x in crt_format]
|
crt_format = [str.strip(x) for x in crt_format]
|
||||||
crt_action = settings['action']
|
crt_action = settings['action']
|
||||||
|
|
||||||
key_file = settings['server_key']
|
key_file = settings['key_file']
|
||||||
crt_final = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt"))
|
crt_final = settings['cert_file']
|
||||||
|
|
||||||
with open(crt_path, "w+") as crt_fd:
|
with open(crt_path, "w+") as crt_fd:
|
||||||
for fmt in crt_format:
|
for fmt in crt_format:
|
||||||
@ -181,75 +159,22 @@ def cert_put(settings):
|
|||||||
return crt_action
|
return crt_action
|
||||||
|
|
||||||
|
|
||||||
# @brief augment configuration with defaults
|
|
||||||
# @param domainconfig the domain configuration
|
|
||||||
# @param defaults the default configuration
|
|
||||||
# @return the augmented configuration
|
|
||||||
def complete_config(domainconfig, globalconfig):
|
|
||||||
defaults = globalconfig['defaults']
|
|
||||||
domainconfig['server_key'] = globalconfig['server_key']
|
|
||||||
for name, value in defaults.items():
|
|
||||||
if name not in domainconfig:
|
|
||||||
domainconfig[name] = value
|
|
||||||
if 'action' not in domainconfig:
|
|
||||||
domainconfig['action'] = None
|
|
||||||
return domainconfig
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
config = dict()
|
# load config
|
||||||
# load global configuration
|
configs = configuration.load()
|
||||||
if os.path.isfile(ACME_CONF):
|
|
||||||
with open(ACME_CONF) as config_fd:
|
|
||||||
try:
|
|
||||||
import json
|
|
||||||
|
|
||||||
config = json.load(config_fd)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
config = yaml.load(config_fd)
|
|
||||||
if 'defaults' not in config:
|
|
||||||
config['defaults'] = {}
|
|
||||||
if 'server_key' not in config:
|
|
||||||
config['server_key'] = ACME_DEFAULT_SERVER_KEY
|
|
||||||
if 'account_key' not in config:
|
|
||||||
config['account_key'] = ACME_DEFAULT_ACCOUNT_KEY
|
|
||||||
|
|
||||||
config['domains'] = []
|
|
||||||
# load domain configuration
|
|
||||||
for config_file in os.listdir(ACME_CONFD):
|
|
||||||
if config_file.endswith(".conf"):
|
|
||||||
with open(os.path.join(ACME_CONFD, config_file)) as config_fd:
|
|
||||||
try:
|
|
||||||
import json
|
|
||||||
|
|
||||||
for entry in json.load(config_fd).items():
|
|
||||||
config['domains'].append(entry)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
for entry in yaml.load(config_fd).items():
|
|
||||||
config['domains'].append(entry)
|
|
||||||
|
|
||||||
# post-update actions (run only once)
|
# post-update actions (run only once)
|
||||||
actions = set()
|
actions = set()
|
||||||
|
|
||||||
# check certificate validity and obtain/renew certificates if needed
|
# check certificate validity and obtain/renew certificates if needed
|
||||||
for domains, domaincfgs in config['domains']:
|
for config in configs:
|
||||||
# skip domains without any output files
|
cert_file = config['cert_file']
|
||||||
if domaincfgs is None:
|
ttl_days = int(config.get('ttl_days', configuration.ACME_DEFAULT_TTL))
|
||||||
continue
|
if not tools.is_cert_valid(cert_file, ttl_days):
|
||||||
crt_file = os.path.join(ACME_DIR, (hashlib.md5(domains).hexdigest() + ".crt"))
|
cert_get(config)
|
||||||
ttl_days = int(config.get('ttl_days', 15))
|
for cfg in config['actions']:
|
||||||
if not tools.is_cert_valid(crt_file, ttl_days):
|
if not target_is_current(cfg['path'], cert_file):
|
||||||
# Get certificates using handler configs (contain element 'mode')
|
actions.add(cert_put(cfg))
|
||||||
cert_get(domains, config, [x for x in domaincfgs if 'mode' in x])
|
|
||||||
# Run actions from config (contain element 'path')
|
|
||||||
for actioncfg in [x for x in domaincfgs if 'path' in x]:
|
|
||||||
actioncfg = complete_config(actioncfg, config)
|
|
||||||
if not target_is_current(actioncfg['path'], crt_file):
|
|
||||||
actions.add(cert_put(actioncfg))
|
|
||||||
|
|
||||||
# run post-update actions
|
# run post-update actions
|
||||||
for action in actions:
|
for action in actions:
|
||||||
|
166
configuration.py
Normal file
166
configuration.py
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# config - acertmgr config parser
|
||||||
|
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
|
||||||
|
# Copyright (c) Rudolf Mayerhofer, 2019.
|
||||||
|
# available under the ISC license, see LICENSE
|
||||||
|
|
||||||
|
import copy
|
||||||
|
import io
|
||||||
|
import os
|
||||||
|
|
||||||
|
import tools
|
||||||
|
|
||||||
|
ACME_DIR = "/etc/acme"
|
||||||
|
ACME_CONF = os.path.join(ACME_DIR, "acme.conf")
|
||||||
|
ACME_CONFD = os.path.join(ACME_DIR, "domains.d")
|
||||||
|
|
||||||
|
ACME_DEFAULT_ACCOUNT_KEY = os.path.join(ACME_DIR, "account.key")
|
||||||
|
ACME_DEFAULT_KEY_LENGTH = 4096 # bits
|
||||||
|
ACME_DEFAULT_TTL = 15 # days
|
||||||
|
|
||||||
|
|
||||||
|
# @brief augment configuration with defaults
|
||||||
|
# @param domainconfig the domain configuration
|
||||||
|
# @param defaults the default configuration
|
||||||
|
# @return the augmented configuration
|
||||||
|
def complete_action_config(domainconfig, config):
|
||||||
|
defaults = config['defaults']
|
||||||
|
domainconfig['ca_file'] = config['ca_file']
|
||||||
|
domainconfig['cert_file'] = config['cert_file']
|
||||||
|
domainconfig['key_file'] = config['key_file']
|
||||||
|
for name, value in defaults.items():
|
||||||
|
if name not in domainconfig:
|
||||||
|
domainconfig[name] = value
|
||||||
|
if 'action' not in domainconfig:
|
||||||
|
domainconfig['action'] = None
|
||||||
|
return domainconfig
|
||||||
|
|
||||||
|
|
||||||
|
# @brief load the configuration from a file
|
||||||
|
def parse_config_entry(entry, globalconfig):
|
||||||
|
config = dict()
|
||||||
|
|
||||||
|
# Basic domain information
|
||||||
|
config['domains'], data = entry
|
||||||
|
config['domainlist'] = config['domains'].split(' ')
|
||||||
|
config['id'] = tools.to_unique_id(config['domains'])
|
||||||
|
|
||||||
|
# Defaults
|
||||||
|
config['defaults'] = globalconfig.get('defaults', {})
|
||||||
|
|
||||||
|
# API version
|
||||||
|
apis = [x for x in entry if 'api' in x]
|
||||||
|
if len(apis) > 0:
|
||||||
|
config['api'] = apis[0]
|
||||||
|
|
||||||
|
# Certificate authority
|
||||||
|
authorities = [x for x in entry if 'authority' in x]
|
||||||
|
if len(authorities) > 0:
|
||||||
|
config['authority'] = authorities[0]
|
||||||
|
else:
|
||||||
|
config['authority'] = globalconfig.get('authority')
|
||||||
|
|
||||||
|
# Account key
|
||||||
|
acc_keys = [x for x in entry if 'account_key' in x]
|
||||||
|
if len(acc_keys) > 0:
|
||||||
|
config['account_key'] = acc_keys[0]
|
||||||
|
else:
|
||||||
|
config['account_key'] = globalconfig.get('account_key', ACME_DEFAULT_ACCOUNT_KEY)
|
||||||
|
|
||||||
|
# Certificate directory
|
||||||
|
cert_dirs = [x for x in entry if 'cert_dir' in x]
|
||||||
|
if len(cert_dirs) > 0:
|
||||||
|
config['cert_dir'] = cert_dirs[0]
|
||||||
|
else:
|
||||||
|
config['cert_dir'] = globalconfig.get('cert_dir', ACME_DIR)
|
||||||
|
|
||||||
|
# SSL CA location
|
||||||
|
ca_files = [x for x in entry if 'ca_file' in x]
|
||||||
|
if len(ca_files) > 0:
|
||||||
|
config['ca_file'] = ca_files[0]
|
||||||
|
else:
|
||||||
|
config['ca_file'] = globalconfig.get('server_ca',
|
||||||
|
os.path.join(config['cert_dir'], "{}.ca".format(config['id'])))
|
||||||
|
|
||||||
|
# SSL cert location
|
||||||
|
cert_files = [x for x in entry if 'cert_file' in x]
|
||||||
|
if len(cert_files) > 0:
|
||||||
|
config['cert_file'] = cert_files[0]
|
||||||
|
else:
|
||||||
|
config['cert_file'] = globalconfig.get('server_cert',
|
||||||
|
os.path.join(config['cert_dir'], "{}.crt".format(config['id'])))
|
||||||
|
|
||||||
|
# SSL key location
|
||||||
|
key_files = [x for x in entry if 'key_file' in x]
|
||||||
|
if len(key_files) > 0:
|
||||||
|
config['key_file'] = key_files[0]
|
||||||
|
else:
|
||||||
|
config['key_file'] = globalconfig.get('server_key',
|
||||||
|
os.path.join(config['cert_dir'], "{}.key".format(config['id'])))
|
||||||
|
|
||||||
|
# SSL key length (if it has to be generated)
|
||||||
|
key_lengths = [x for x in entry if 'key_file' in x]
|
||||||
|
if len(key_lengths) > 0:
|
||||||
|
config['key_length'] = int(key_lengths[0])
|
||||||
|
else:
|
||||||
|
config['key_length'] = ACME_DEFAULT_KEY_LENGTH
|
||||||
|
|
||||||
|
# Domain action configuration
|
||||||
|
config['actions'] = list()
|
||||||
|
for actioncfg in [x for x in data if 'path' in x]:
|
||||||
|
config['actions'].append(complete_action_config(actioncfg, config))
|
||||||
|
|
||||||
|
# Domain challenge handler configuration
|
||||||
|
config['handlers'] = dict()
|
||||||
|
handlerconfigs = [x for x in data if 'mode' in x]
|
||||||
|
for domain in config['domainlist']:
|
||||||
|
# Use global config as base handler config
|
||||||
|
cfg = copy.deepcopy(globalconfig)
|
||||||
|
|
||||||
|
# Determine generic domain handler config values
|
||||||
|
genericfgs = [x for x in handlerconfigs if 'domain' not in x]
|
||||||
|
if len(genericfgs) > 0:
|
||||||
|
cfg.update(genericfgs[0])
|
||||||
|
|
||||||
|
# Update handler config with more specific values
|
||||||
|
specificcfgs = [x for x in handlerconfigs if ('domain' in x and x['domain'] == domain)]
|
||||||
|
if len(specificcfgs) > 0:
|
||||||
|
cfg.update(specificcfgs[0])
|
||||||
|
|
||||||
|
config['handlers'][domain] = cfg
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
|
|
||||||
|
# @brief load the configuration from a file
|
||||||
|
def load():
|
||||||
|
globalconfig = dict()
|
||||||
|
# load global configuration
|
||||||
|
if os.path.isfile(ACME_CONF):
|
||||||
|
with open(ACME_CONF) as config_fd:
|
||||||
|
try:
|
||||||
|
import json
|
||||||
|
globalconfig = json.load(config_fd)
|
||||||
|
except ValueError:
|
||||||
|
import yaml
|
||||||
|
config_fd.seek(0)
|
||||||
|
globalconfig = yaml.load(config_fd)
|
||||||
|
|
||||||
|
config = list()
|
||||||
|
# load domain configuration
|
||||||
|
for config_file in os.listdir(ACME_CONFD):
|
||||||
|
if config_file.endswith(".conf"):
|
||||||
|
with open(os.path.join(ACME_CONFD, config_file)) as config_fd:
|
||||||
|
try:
|
||||||
|
import json
|
||||||
|
for entry in json.load(config_fd).items():
|
||||||
|
config.append(parse_config_entry(entry, globalconfig))
|
||||||
|
except ValueError:
|
||||||
|
import yaml
|
||||||
|
config_fd.seek(0)
|
||||||
|
for entry in yaml.load(config_fd).items():
|
||||||
|
config.append(parse_config_entry(entry, globalconfig))
|
||||||
|
|
||||||
|
return config
|
8
tools.py
8
tools.py
@ -10,6 +10,7 @@ import base64
|
|||||||
import binascii
|
import binascii
|
||||||
import datetime
|
import datetime
|
||||||
import os
|
import os
|
||||||
|
import hashlib
|
||||||
|
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
@ -118,3 +119,10 @@ def byte_string_format(num):
|
|||||||
n = format(num, 'x')
|
n = format(num, 'x')
|
||||||
n = "0{0}".format(n) if len(n) % 2 else n
|
n = "0{0}".format(n) if len(n) % 2 else n
|
||||||
return binascii.unhexlify(n)
|
return binascii.unhexlify(n)
|
||||||
|
|
||||||
|
|
||||||
|
# @brief convert a string to an ID
|
||||||
|
# @param data data to convert to id
|
||||||
|
# @return unique id string
|
||||||
|
def to_unique_id(data):
|
||||||
|
return hashlib.md5(data).hexdigest()
|
Loading…
Reference in New Issue
Block a user