1
0
mirror of https://github.com/moepman/acertmgr.git synced 2024-06-02 05:12:35 +02:00
acertmgr/acertmgr/__init__.py

222 lines
9.0 KiB
Python
Raw Normal View History

#!/usr/bin/env python
2016-01-10 15:00:43 +01:00
# -*- coding: utf-8 -*-
# Automated Certificate Manager using ACME
2016-04-04 01:17:58 +02:00
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
2019-01-08 08:12:20 +01:00
# Copyright (c) Rudolf Mayerhofer, 2019.
2016-04-04 01:17:58 +02:00
# available under the ISC license, see LICENSE
2016-01-10 15:00:43 +01:00
2016-01-21 16:43:49 +01:00
import grp
2019-02-18 20:45:28 +01:00
import io
2016-01-10 15:00:43 +01:00
import os
2016-01-21 16:43:49 +01:00
import pwd
import stat
2019-01-08 08:12:20 +01:00
import subprocess
from acertmgr import configuration, tools
from acertmgr.authority import authority
from acertmgr.modes import challenge_handler
from acertmgr.tools import log
# @brief obtain a certificate from the configured ACME authority and store it
# @param settings the domain's configuration options
def cert_get(settings):
    domains = settings['domainlist']
    log("Getting certificate for %s" % domains)

    acme = authority(settings['authority'])
    acme.register_account()

    # one challenge handler per domain named on the certificate
    challenge_handlers = {name: challenge_handler(settings['handlers'][name]) for name in domains}

    # load the private key, creating it first if it does not exist yet
    key_file = settings['key_file']
    key_length = settings['key_length']
    if not os.path.isfile(key_file):
        log("SSL key not found at '{0}'. Creating {1} bit key.".format(key_file, key_length))
        key = tools.new_ssl_key(key_file, key_length)
    else:
        key = tools.read_pem_file(key_file, key=True)

    # reuse an existing CSR only if it is configured as static, otherwise build (and store) a new one
    csr_file = settings['csr_file']
    reuse_csr = os.path.isfile(csr_file) and str(settings['csr_static']).lower() == 'true'
    if reuse_csr:
        log('Loading CSR from {}'.format(csr_file))
        cr = tools.read_pem_file(csr_file, csr=True)
    else:
        log('Generating CSR for {}'.format(domains))
        must_staple = str(settings.get('cert_must_staple')).lower() == "true"
        cr = tools.new_cert_request(domains, key, must_staple)
        tools.write_pem_file(cr, csr_file)

    # request the certificate (and CA certificate, which may be None) for the CSR
    crt, ca = acme.get_crt_from_csr(cr, domains, challenge_handlers)

    # nothing is stored unless the resulting certificate passes the validity check
    if not tools.is_cert_valid(crt, settings['ttl_days']):
        return

    log("Certificate '{}' renewed and valid until {}".format(tools.get_cert_cn(crt),
                                                             tools.get_cert_valid_until(crt)))
    tools.write_pem_file(crt, settings['cert_file'], stat.S_IREAD)

    # a static CA file is only written when it is missing; a non-static one is always refreshed
    ca_static = str(settings.get('ca_static')).lower() == 'true'
    keep_existing_ca = ca_static and os.path.exists(settings['ca_file'])
    if not keep_existing_ca and ca is not None:
        tools.write_pem_file(ca, settings['ca_file'])
# @brief put new certificate in place
# @param settings the deployment options; the keys read are 'ca_file', 'user', 'group', 'perm',
#                 'path', 'format' (comma separated list of crt/key/ca), 'action', 'key_file'
#                 and 'cert_file'
# @return the action to be executed after the certificate update
def cert_put(settings):
    ca_file = settings['ca_file']
    crt_user = settings['user']
    crt_group = settings['group']
    crt_perm = settings['perm']
    crt_path = settings['path']
    crt_format = [fmt.strip() for fmt in settings['format'].split(",")]
    crt_action = settings['action']
    key_file = settings['key_file']
    crt_final = settings['cert_file']

    # source file for every supported element of the 'format' option
    sources = {"crt": crt_final, "key": key_file, "ca": ca_file}

    # Read all requested parts BEFORE opening the target: opening it truncates the file, so a
    # missing or unreadable source must be detected first to keep the deployed file intact.
    parts = []
    for fmt in crt_format:
        if fmt not in sources:
            log("Ignoring unknown certificate format element '{}'".format(fmt), warning=True)
            continue
        if fmt == "ca" and not os.path.isfile(ca_file):
            raise FileNotFoundError("The CA certificate file (%s) is missing!" % ca_file)
        # context manager guarantees the source is closed even if reading fails
        with io.open(sources[fmt], "r") as src_fd:
            parts.append(src_fd.read())

    with io.open(crt_path, "w+") as crt_fd:
        crt_fd.writelines(parts)

    # set owner and permissions (failures are reported but do not abort the deployment)
    uid = pwd.getpwnam(crt_user).pw_uid
    gid = grp.getgrnam(crt_group).gr_gid
    try:
        os.chown(crt_path, uid, gid)
    except OSError:
        log('Could not set certificate file ownership!', warning=True)

    try:
        # 'perm' is an octal string such as '600'
        os.chmod(crt_path, int(crt_perm, 8))
    except OSError:
        log('Could not set certificate file permissions!', warning=True)

    return crt_action
2016-01-10 15:00:43 +01:00
# @brief revoke a certificate using the authority of the configuration matching its domains
# @param cert the certificate to revoke
# @param configs the domain configurations searched for an identical domain list
# @param fallback_authority the authority configuration used if no configuration matches
# @param reason the revocation reason handed to the authority (optional)
def cert_revoke(cert, configs, fallback_authority, reason=None):
    cert_domains = set(tools.get_cert_domains(cert))

    # first configuration whose domain list is exactly the certificate's set of domains
    acmeconfig = next((candidate['authority'] for candidate in configs
                       if set(candidate['domainlist']) == cert_domains), None)

    if not acmeconfig:
        acmeconfig = fallback_authority
        log("No matching authority found to revoke {}: {}, using globalconfig/defaults".format(
            tools.get_cert_cn(cert), tools.get_cert_domains(cert)), warning=True)

    acme = authority(acmeconfig)
    acme.register_account()
    acme.revoke_crt(cert, reason)
# @brief turn the captured output of a post-update action into text for logging
# @param output the output captured by subprocess (bytes, or already a str)
# @return the output as text; bytes that are not valid UTF-8 are replaced instead of raising,
#         so odd action output can never abort the run
def _action_output_text(output):
    if getattr(output, 'decode', None):
        # Decode function available? Use it to get a proper str
        return output.decode('utf-8', 'replace')
    return output


# @brief program entry point: revoke a certificate or issue/renew, deploy and post-process
# Loads the configuration and then either
#  - revokes the certificate named in the runtime configuration (mode 'revoke'), or
#  - renews certificates where needed, deploys them, runs the configured post-update
#    actions once each and finally revokes superseded certificates (if deployment succeeded).
# Raises a RuntimeError after all work is done if any step failed in issue mode.
def main():
    # load config
    runtimeconfig, domainconfigs = configuration.load()
    if runtimeconfig.get('mode') == 'revoke':
        # Mode: revoke certificate
        log("Revoking {}".format(runtimeconfig['revoke']))
        cert_revoke(tools.read_pem_file(runtimeconfig['revoke']),
                    domainconfigs,
                    runtimeconfig['fallback_authority'],
                    runtimeconfig['revoke_reason'])
    else:
        # Mode: issue certificates (implicit)
        # post-update actions (run only once)
        actions = set()
        superseded = set()
        exceptions = []
        # check certificate validity and obtain/renew certificates if needed
        for config in domainconfigs:
            try:
                cert = None
                if os.path.isfile(config['cert_file']):
                    cert = tools.read_pem_file(config['cert_file'])
                if not cert or not tools.is_cert_valid(cert, config['ttl_days']) or (
                        'force_renew' in runtimeconfig and
                        all(d in config['domainlist'] for d in runtimeconfig['force_renew'])):
                    cert_get(config)
                    # remember the replaced certificate so it can be revoked after deployment
                    if str(config.get('cert_revoke_superseded')).lower() == 'true' and cert:
                        superseded.add(cert)
            except Exception as e:
                # keep going: one failing domain must not block the others
                log("Certificate issue/renew failed", e, error=True)
                exceptions.append(e)

        # deploy new certificates after all are renewed
        deployment_success = True
        for config in domainconfigs:
            try:
                for cfg in config['actions']:
                    if not tools.target_is_current(cfg['path'], config['cert_file']):
                        log("Updating '{}' due to newer version".format(cfg['path']))
                        actions.add(cert_put(cfg))
            except Exception as e:
                log("Certificate deployment failed", e, error=True)
                exceptions.append(e)
                deployment_success = False

        # run post-update actions
        for action in actions:
            if action is None:
                continue
            try:
                # Run actions in a shell environment (to allow shell syntax) as stated in the configuration
                output = subprocess.check_output(action, shell=True, stderr=subprocess.STDOUT)
                logmsg = "Action succeeded: {}".format(action)
                if output:
                    logmsg += os.linesep + tools.indent(_action_output_text(output), 18)  # 18 = len("Action succeeded: ")
                log(logmsg)
            except subprocess.CalledProcessError as e:
                output = e.output
                logmsg = "Action failed: ({}) {}".format(e.returncode, e.cmd)
                if output:
                    logmsg += os.linesep + tools.indent(_action_output_text(output), 15)  # 15 = len("Action failed: ")
                log(logmsg, error=True)
                exceptions.append(e)
                deployment_success = False

        # revoke old certificates as superseded (only if the new ones are fully in place)
        if deployment_success:
            for superseded_cert in superseded:
                try:
                    log("Revoking '{}' valid until {} as superseded".format(
                        tools.get_cert_cn(superseded_cert),
                        tools.get_cert_valid_until(superseded_cert)))
                    # reason=4 is the value this code passes for "superseded"
                    cert_revoke(superseded_cert, domainconfigs, runtimeconfig['fallback_authority'], reason=4)
                except Exception as e:
                    log("Certificate supersede revoke failed", e, error=True)
                    exceptions.append(e)

        # throw a RuntimeError with all exceptions caught while working if there were any
        if exceptions:
            raise RuntimeError("{} exception(s) occurred during processing".format(len(exceptions)))