#!/usr/bin/env python
# -*- coding: utf-8 -*-
# acertmgr - acme api v1 functions
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import copy
import json
import re
import time
from acertmgr import tools
from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority
from import log
class ACMEAuthority(AbstractACMEAuthority):
# @brief Init class with config
# @param config Configuration data
# @param key Account key data
def __init__(self, config, key):
log('You currently use ACMEv1 which is deprecated, consider using ACMEv2 (RFC8555) if at all possible.',
AbstractACMEAuthority.__init__(self, config, key)
self.registered_account = False = config['authority']
self.agreement = config['authority_tos_agreement']
# @brief create the header information for ACME communication
# @param key the account key
# @return the header for ACME
def _prepare_header(self):
alg, jwk = tools.get_key_alg_and_jwk(self.key)
header = {
"alg": alg,
"jwk": jwk,
return header
# @brief helper function to make signed requests
# @param url the request URL
# @param header the message header
# @param payload the message
# @return tuple of return code and request answer
def _send_signed(self, url, header, payload):
payload64 = tools.bytes_to_base64url(json.dumps(payload).encode('utf8'))
protected = copy.deepcopy(header)
protected["nonce"] = tools.get_url( + "/directory").headers['Replay-Nonce']
protected64 = tools.bytes_to_base64url(json.dumps(protected).encode('utf8'))
out = tools.signature_of_str(self.key, '.'.join([protected64, payload64]))
data = json.dumps({
"header": header, "protected": protected64,
"payload": payload64, "signature": tools.bytes_to_base64url(out),
resp = tools.get_url(url, data.encode('utf8'))
return resp.getcode(),
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
# @brief register an account over ACME
# @return True if new account was registered, False otherwise
def register_account(self):
if self.registered_account:
# We already have registered with this authority, just return
header = self._prepare_header()
code, result = self._send_signed( + "/acme/new-reg", header, {
"resource": "new-reg",
"agreement": self.agreement,
if code == 201:
self.registered_account = True
return True
elif code == 409:
log("Already registered!")
self.registered_account = True
return False
raise ValueError("Error registering: {0} {1}".format(code, result))
# @brief function to fetch certificate using ACME
# @param csr the certificate signing request in pyopenssl format
# @param domains list of domains in the certificate, first is CN
# @param challenge_handlers a dict containing challenge for all given domains
# @return the certificate and corresponding ca as a tuple
# @note algorithm and parts of the code are from acme-tiny
def get_crt_from_csr(self, csr, domains, challenge_handlers):
header = self._prepare_header()
account_thumbprint = tools.bytes_to_base64url(
tools.hash_of_str(json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))))
challenges = dict()
tokens = dict()
authdomains = list()
# verify each domain
for domain in domains:
log("Verifying {0}...".format(domain))
# get new challenge
code, result = self._send_signed( + "/acme/new-authz", header, {
"resource": "new-authz",
"identifier": {"type": "dns", "value": domain},
if code != 201:
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
# create the challenge
authz = json.loads(result.decode('utf8'))
if authz.get('status', 'no-status-found') == 'valid':
log("{} has already been verified".format(domain))
challenges[domain] = [c for c in authz['challenges'] if
c['type'] == challenge_handlers[domain].get_challenge_type()][0]
tokens[domain] = re.sub(r"[^A-Za-z0-9_\-]", "_", challenges[domain]['token'])
if domain not in challenge_handlers:
raise ValueError("No challenge handler given for domain: {0}".format(domain))
challenge_handlers[domain].create_challenge(domain, account_thumbprint, tokens[domain])
# after all challenges are created, start processing authorizations
for domain in authdomains:
challenge_handlers[domain].start_challenge(domain, account_thumbprint, tokens[domain])
# notify challenge are met
log("Starting key authorization")
keyauthorization = "{0}.{1}".format(tokens[domain], account_thumbprint)
code, result = self._send_signed(challenges[domain]['uri'], header, {
"resource": "challenge",
"keyAuthorization": keyauthorization,
if code != 202:
raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
# wait for challenge to be verified
while True:
resp = tools.get_url(challenges[domain]['uri'])
challenge_status = json.loads('utf8'))
except IOError as e:
raise ValueError("Error checking challenge: {0} {1}".format(
e.code, json.loads('utf8'))))
if challenge_status['status'] == "pending":
elif challenge_status['status'] == "valid":
log("{0} verified!".format(domain))
raise ValueError("{0} challenge did not pass: {1}".format(
domain, challenge_status))
challenge_handlers[domain].stop_challenge(domain, account_thumbprint, tokens[domain])
# Destroy challenge handlers in reverse order to replay
# any saved state information in the handlers correctly
for domain in reversed(domains):
challenge_handlers[domain].destroy_challenge(domain, account_thumbprint, tokens[domain])
except Exception as e:
log('Challenge destruction failed: {}'.format(e), error=True)
# get the new certificate
log("Signing certificate...")
code, result = self._send_signed( + "/acme/new-cert", header, {
"resource": "new-cert",
"csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)),
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
# return signed certificate!
log("Certificate signed!")
cert = tools.convert_der_bytes_to_cert(result)
return cert, tools.download_issuer_ca(cert)
# @brief function to revoke a certificate using ACME
# @param crt certificate to revoke
# @param reason (int) optional certificate revoke reason (see
def revoke_crt(self, crt, reason=None):
header = self._prepare_header()
payload = {"resource": "revoke-cert",
"certificate": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(crt))}
if reason:
payload['reason'] = int(reason)
code, result = self._send_signed( + "/acme/revoke-cert", header, payload)
if code < 400:
log("Revocation successful")
raise ValueError("Revocation failed: {}".format(result))