acertmgr: Add support for account.key based certificate revocation

This commit is contained in:
Kishi85 2019-03-27 21:00:21 +01:00
parent bd27db4ebd
commit 737578159b
6 changed files with 102 additions and 37 deletions

View File

@ -142,34 +142,48 @@ def cert_put(settings):
return crt_action
def cert_revoke(cert, configs, reason=None):
domains = set(tools.get_cert_domains(cert))
for config in configs:
if domains == set(config['domainlist']):
acme = create_authority(config)
acme.register_account()
acme.revoke_crt(cert, reason)
return
def main():
# load config
runtimeconfig, domainconfigs = configuration.load()
if runtimeconfig.get('mode') == 'revoke':
# Mode: revoke certificate
print("Revoking {}".format(runtimeconfig['revoke']))
cert_revoke(tools.read_pem_file(runtimeconfig['revoke']), domainconfigs, runtimeconfig['revoke_reason'])
else:
# Mode: issue certificates (implicit)
# post-update actions (run only once)
actions = set()
# check certificate validity and obtain/renew certificates if needed
for config in domainconfigs:
cert = None
if os.path.isfile(config['cert_file']):
cert = tools.read_pem_file(config['cert_file'])
if not cert or not tools.is_cert_valid(cert, config['ttl_days']) or \
('force_renew' in runtimeconfig and re.search(r'(^| ){}( |$)'.format(
re.escape(runtimeconfig['force_renew'])), config['domains'])):
cert_get(config)
# post-update actions (run only once)
actions = set()
for cfg in config['actions']:
if not tools.target_is_current(cfg['path'], config['cert_file']):
print("Updating '{}' due to newer version".format(cfg['path']))
actions.add(cert_put(cfg))
# check certificate validity and obtain/renew certificates if needed
for config in domainconfigs:
cert = None
if os.path.isfile(config['cert_file']):
cert = tools.read_pem_file(config['cert_file'])
if not cert or not tools.is_cert_valid(cert, config['ttl_days']) or \
('force_renew' in runtimeconfig and re.search(r'(^| ){}( |$)'.format(
re.escape(runtimeconfig['force_renew'])), config['domains'])):
cert_get(config)
for cfg in config['actions']:
if not tools.target_is_current(cfg['path'], config['cert_file']):
print("Updating '{}' due to newer version".format(cfg['path']))
actions.add(cert_put(cfg))
# run post-update actions
for action in actions:
if action is not None:
try:
# Run actions in a shell environment (to allow shell syntax) as stated in the configuration
output = subprocess.check_output(action, shell=True, stderr=subprocess.STDOUT)
print("Executed '{}' successfully: {}".format(action, output))
except subprocess.CalledProcessError as e:
print("Execution of '{}' failed with error '{}': {}".format(e.cmd, e.returncode, e.output))
# run post-update actions
for action in actions:
if action is not None:
try:
# Run actions in a shell environment (to allow shell syntax) as stated in the configuration
output = subprocess.check_output(action, shell=True, stderr=subprocess.STDOUT)
print("Executed '{}' successfully: {}".format(action, output))
except subprocess.CalledProcessError as e:
print("Execution of '{}' failed with error '{}': {}".format(e.cmd, e.returncode, e.output))

View File

@ -16,19 +16,19 @@ class ACMEAuthority:
self.config = config
# @brief register an account over ACME
# @param account_key the account key to register
# @param CA the certificate authority to register with
# @return True if new account was registered, False otherwise
def register_account(self):
raise NotImplementedError
# @brief function to fetch certificate using ACME
# @param account_key the account key in pyopenssl format
# @param csr the certificate signing request in pyopenssl format
# @param domains list of domains in the certificate, first is CN
# @param challenge_handlers a dict containing challenge for all given domains
# @param CA which signing CA to use
# @return the certificate in pyopenssl format
# @note algorithm and parts of the code are from acme-tiny
# @return the certificate
def get_crt_from_csr(self, csr, domains, challenge_handlers):
raise NotImplementedError
# @brief function to revoke a certificate using ACME
# @param crt certificate to revoke
# @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1)
def revoke_crt(self, crt, reason=None):
raise NotImplementedError

View File

@ -165,7 +165,7 @@ class ACMEAuthority(AbstractACMEAuthority):
print("Signing certificate...")
code, result = self._send_signed(self.ca + "/acme/new-cert", header, {
"resource": "new-cert",
"csr": tools.bytes_to_base64url(tools.convert_csr_to_der_bytes(csr)),
"csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
@ -174,3 +174,17 @@ class ACMEAuthority(AbstractACMEAuthority):
print("Certificate signed!")
cert = tools.convert_der_bytes_to_cert(result)
return cert, tools.download_issuer_ca(cert)
# @brief function to revoke a certificate using ACME
# @param crt certificate to revoke
# @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1)
def revoke_crt(self, crt, reason=None):
header = self._prepare_header()
payload = {'certificate': tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(crt))}
if reason:
payload['reason'] = int(reason)
code, result = self._send_signed(self.ca + "/acme/revoke-cert", header, payload)
if code < 400:
print("Revocation successful")
else:
raise ValueError("Revocation failed: {}".format(result))

View File

@ -40,6 +40,7 @@ class ACMEAuthority(AbstractACMEAuthority):
"newAccount": "{}/acme/new-acct".format(self.ca),
"newNonce": "{}/acme/new-nonce".format(self.ca),
"newOrder": "{}/acme/new-order".format(self.ca),
"revokeCert": "{}/acme/revoke-cert".format(self.ca),
}
print("API directory retrieval failed ({}). Guessed necessary values: {}".format(code, self.directory))
self.nonce = None
@ -229,7 +230,7 @@ class ACMEAuthority(AbstractACMEAuthority):
# get the new certificate
print("Finalizing certificate")
code, finalize, _ = self._request_acme_url(order['finalize'], {
"csr": tools.bytes_to_base64url(tools.convert_csr_to_der_bytes(csr)),
"csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)),
})
while code < 400 and (finalize.get('status') == 'pending' or finalize.get('status') == 'processing'):
time.sleep(5)
@ -253,3 +254,16 @@ class ACMEAuthority(AbstractACMEAuthority):
ca = tools.convert_pem_str_to_cert(cert_dict['ca'])
return cert, ca
# @brief function to revoke a certificate using ACME
# @param crt certificate to revoke
# @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1)
def revoke_crt(self, crt, reason=None):
payload = {'certificate': tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(crt))}
if reason:
payload['reason'] = int(reason)
code, result, _ = self._request_acme_endpoint("revokeCert", payload)
if code < 400:
print("Revocation successful")
else:
raise ValueError("Revocation failed: {}".format(result))

View File

@ -203,6 +203,10 @@ def load():
help="Agree to the authorities Terms of Service (value required depends on authority)")
parser.add_argument("--force-renew", "--renew-now", nargs="?",
help="Renew all domain configurations matching the given value immediately")
parser.add_argument("--revoke", nargs="?",
help="Revoke a certificate file issued with the currently configured account key.")
parser.add_argument("--revoke-reason", nargs="?", type=int,
help="Provide a revoke reason, see https://tools.ietf.org/html/rfc5280#section-5.3.1")
args = parser.parse_args()
# Determine global configuration file
@ -253,6 +257,12 @@ def load():
else:
runtimeconfig['force_renew'] = args.force_renew
# - revoke
if args.revoke:
runtimeconfig['mode'] = 'revoke'
runtimeconfig['revoke'] = args.revoke
runtimeconfig['revoke_reason'] = args.revoke_reason
# Global configuration: Load from file
globalconfig = dict()
if os.path.isfile(global_config_file):

View File

@ -150,6 +150,19 @@ def download_issuer_ca(cert):
return x509.load_der_x509_certificate(resp.read(), default_backend())
# @brief determine all san domains on a given certificate
def get_cert_domains(cert):
if cert is None:
print("WARN: None-certificate has no domains. You have found a bug. Congratulations!")
return []
san_cert = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
if san_cert:
return [d.value for d in san_cert.value]
else:
return [cert.subject.rfc4514_string()[3:], ] # strip CN= from the result and return as 1 item list
# @brief convert certificate to PEM format
# @param cert certificate object in pyopenssl format
# @return the certificate in PEM format
@ -162,8 +175,8 @@ def convert_pem_str_to_cert(certdata):
return x509.load_pem_x509_certificate(certdata.encode('utf8'), default_backend())
# @brief serialize CSR to DER bytes
def convert_csr_to_der_bytes(data):
# @brief serialize cert/csr to DER bytes
def convert_cert_to_der_bytes(data):
return data.public_bytes(serialization.Encoding.DER)