acertmgr/acertmgr/authority/v1.py

197 lines
8.7 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# acertmgr - acme api v1 functions
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import copy
import json
import re
import time
from acertmgr import tools
from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority
from acertmgr.tools import log
class ACMEAuthority(AbstractACMEAuthority):
# @brief Init class with config
# @param config Configuration data
# @param key Account key data
def __init__(self, config, key):
log('You currently use ACMEv1 which is deprecated, consider using ACMEv2 (RFC8555) if at all possible.',
warning=True)
AbstractACMEAuthority.__init__(self, config, key)
self.registered_account = False
self.ca = config['authority']
self.agreement = config['authority_tos_agreement']
# @brief create the header information for ACME communication
# @param key the account key
# @return the header for ACME
def _prepare_header(self):
alg, jwk = tools.get_key_alg_and_jwk(self.key)
header = {
"alg": alg,
"jwk": jwk,
}
return header
# @brief helper function to make signed requests
# @param url the request URL
# @param header the message header
# @param payload the message
# @return tuple of return code and request answer
def _send_signed(self, url, header, payload):
payload64 = tools.bytes_to_base64url(json.dumps(payload).encode('utf8'))
protected = copy.deepcopy(header)
protected["nonce"] = tools.get_url(self.ca + "/directory").headers['Replay-Nonce']
protected64 = tools.bytes_to_base64url(json.dumps(protected).encode('utf8'))
out = tools.signature_of_str(self.key, '.'.join([protected64, payload64]))
data = json.dumps({
"header": header, "protected": protected64,
"payload": payload64, "signature": tools.bytes_to_base64url(out),
})
try:
resp = tools.get_url(url, data.encode('utf8'))
return resp.getcode(), resp.read()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
# @brief register an account over ACME
# @return True if new account was registered, False otherwise
def register_account(self):
if self.registered_account:
# We already have registered with this authority, just return
return
header = self._prepare_header()
code, result = self._send_signed(self.ca + "/acme/new-reg", header, {
"resource": "new-reg",
"agreement": self.agreement,
})
if code == 201:
log("Registered!")
self.registered_account = True
return True
elif code == 409:
log("Already registered!")
self.registered_account = True
return False
else:
raise ValueError("Error registering: {0} {1}".format(code, result))
# @brief function to fetch certificate using ACME
# @param csr the certificate signing request in pyopenssl format
# @param domains list of domains in the certificate, first is CN
# @param challenge_handlers a dict containing challenge for all given domains
# @return the certificate and corresponding ca as a tuple
# @note algorithm and parts of the code are from acme-tiny
def get_crt_from_csr(self, csr, domains, challenge_handlers):
header = self._prepare_header()
account_thumbprint = tools.bytes_to_base64url(
tools.hash_of_str(json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))))
challenges = dict()
tokens = dict()
authdomains = list()
# verify each domain
try:
for domain in domains:
log("Verifying {0}...".format(domain))
# get new challenge
code, result = self._send_signed(self.ca + "/acme/new-authz", header, {
"resource": "new-authz",
"identifier": {"type": "dns", "value": domain},
})
if code != 201:
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
# create the challenge
authz = json.loads(result.decode('utf8'))
if authz.get('status', 'no-status-found') == 'valid':
log("{} has already been verified".format(domain))
continue
challenges[domain] = [c for c in authz['challenges'] if
c['type'] == challenge_handlers[domain].get_challenge_type()][0]
tokens[domain] = re.sub(r"[^A-Za-z0-9_\-]", "_", challenges[domain]['token'])
if domain not in challenge_handlers:
raise ValueError("No challenge handler given for domain: {0}".format(domain))
challenge_handlers[domain].create_challenge(domain, account_thumbprint, tokens[domain])
authdomains.append(domain)
# after all challenges are created, start processing authorizations
for domain in authdomains:
try:
challenge_handlers[domain].start_challenge(domain, account_thumbprint, tokens[domain])
# notify challenge are met
log("Starting key authorization")
keyauthorization = "{0}.{1}".format(tokens[domain], account_thumbprint)
code, result = self._send_signed(challenges[domain]['uri'], header, {
"resource": "challenge",
"keyAuthorization": keyauthorization,
})
if code != 202:
raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
# wait for challenge to be verified
while True:
try:
resp = tools.get_url(challenges[domain]['uri'])
challenge_status = json.loads(resp.read().decode('utf8'))
except IOError as e:
raise ValueError("Error checking challenge: {0} {1}".format(
e.code, json.loads(e.read().decode('utf8'))))
if challenge_status['status'] == "pending":
time.sleep(2)
elif challenge_status['status'] == "valid":
log("{0} verified!".format(domain))
break
else:
raise ValueError("{0} challenge did not pass: {1}".format(
domain, challenge_status))
finally:
challenge_handlers[domain].stop_challenge(domain, account_thumbprint, tokens[domain])
finally:
# Destroy challenge handlers in reverse order to replay
# any saved state information in the handlers correctly
for domain in reversed(domains):
try:
challenge_handlers[domain].destroy_challenge(domain, account_thumbprint, tokens[domain])
except Exception as e:
log('Challenge destruction failed: {}'.format(e), error=True)
# get the new certificate
log("Signing certificate...")
code, result = self._send_signed(self.ca + "/acme/new-cert", header, {
"resource": "new-cert",
"csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
# return signed certificate!
log("Certificate signed!")
cert = tools.convert_der_bytes_to_cert(result)
return cert, tools.download_issuer_ca(cert)
# @brief function to revoke a certificate using ACME
# @param crt certificate to revoke
# @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1)
def revoke_crt(self, crt, reason=None):
header = self._prepare_header()
payload = {"resource": "revoke-cert",
"certificate": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(crt))}
if reason:
payload['reason'] = int(reason)
code, result = self._send_signed(self.ca + "/acme/revoke-cert", header, payload)
if code < 400:
log("Revocation successful")
else:
raise ValueError("Revocation failed: {}".format(result))