From 737578159b9904666a4a1ce351f5350d7d2688e5 Mon Sep 17 00:00:00 2001 From: Kishi85 Date: Wed, 27 Mar 2019 21:00:21 +0100 Subject: [PATCH] acertmgr: Add support for account.key based certificate revocation --- acertmgr/__init__.py | 66 +++++++++++++++++++++++--------------- acertmgr/authority/acme.py | 14 ++++---- acertmgr/authority/v1.py | 16 ++++++++- acertmgr/authority/v2.py | 16 ++++++++- acertmgr/configuration.py | 10 ++++++ acertmgr/tools.py | 17 ++++++++-- 6 files changed, 102 insertions(+), 37 deletions(-) diff --git a/acertmgr/__init__.py b/acertmgr/__init__.py index e7db7f0..01b7b1c 100755 --- a/acertmgr/__init__.py +++ b/acertmgr/__init__.py @@ -142,34 +142,48 @@ def cert_put(settings): return crt_action +def cert_revoke(cert, configs, reason=None): + domains = set(tools.get_cert_domains(cert)) + for config in configs: + if domains == set(config['domainlist']): + acme = create_authority(config) + acme.register_account() + acme.revoke_crt(cert, reason) + return + + def main(): # load config runtimeconfig, domainconfigs = configuration.load() + if runtimeconfig.get('mode') == 'revoke': + # Mode: revoke certificate + print("Revoking {}".format(runtimeconfig['revoke'])) + cert_revoke(tools.read_pem_file(runtimeconfig['revoke']), domainconfigs, runtimeconfig['revoke_reason']) + else: + # Mode: issue certificates (implicit) + # post-update actions (run only once) + actions = set() + # check certificate validity and obtain/renew certificates if needed + for config in domainconfigs: + cert = None + if os.path.isfile(config['cert_file']): + cert = tools.read_pem_file(config['cert_file']) + if not cert or not tools.is_cert_valid(cert, config['ttl_days']) or \ + ('force_renew' in runtimeconfig and re.search(r'(^| ){}( |$)'.format( + re.escape(runtimeconfig['force_renew'])), config['domains'])): + cert_get(config) - # post-update actions (run only once) - actions = set() + for cfg in config['actions']: + if not tools.target_is_current(cfg['path'], config['cert_file']): + print("Updating '{}' due to newer version".format(cfg['path'])) + actions.add(cert_put(cfg)) - # check certificate validity and obtain/renew certificates if needed - for config in domainconfigs: - cert = None - if os.path.isfile(config['cert_file']): - cert = tools.read_pem_file(config['cert_file']) - if not cert or not tools.is_cert_valid(cert, config['ttl_days']) or \ - ('force_renew' in runtimeconfig and re.search(r'(^| ){}( |$)'.format( - re.escape(runtimeconfig['force_renew'])), config['domains'])): - cert_get(config) - - for cfg in config['actions']: - if not tools.target_is_current(cfg['path'], config['cert_file']): - print("Updating '{}' due to newer version".format(cfg['path'])) - actions.add(cert_put(cfg)) - - # run post-update actions - for action in actions: - if action is not None: - try: - # Run actions in a shell environment (to allow shell syntax) as stated in the configuration - output = subprocess.check_output(action, shell=True, stderr=subprocess.STDOUT) - print("Executed '{}' successfully: {}".format(action, output)) - except subprocess.CalledProcessError as e: - print("Execution of '{}' failed with error '{}': {}".format(e.cmd, e.returncode, e.output)) + # run post-update actions + for action in actions: + if action is not None: + try: + # Run actions in a shell environment (to allow shell syntax) as stated in the configuration + output = subprocess.check_output(action, shell=True, stderr=subprocess.STDOUT) + print("Executed '{}' successfully: {}".format(action, output)) + except subprocess.CalledProcessError as e: + print("Execution of '{}' failed with error '{}': {}".format(e.cmd, e.returncode, e.output)) diff --git a/acertmgr/authority/acme.py b/acertmgr/authority/acme.py index 9446654..d0880e0 100644 --- a/acertmgr/authority/acme.py +++ b/acertmgr/authority/acme.py @@ -16,19 +16,19 @@ class ACMEAuthority: self.config = config # @brief register an account over ACME - # @param account_key the account key to register - # @param CA the certificate authority to register with - # @return True if new account was registered, False otherwise def register_account(self): raise NotImplementedError # @brief function to fetch certificate using ACME - # @param account_key the account key in pyopenssl format # @param csr the certificate signing request in pyopenssl format # @param domains list of domains in the certificate, first is CN # @param challenge_handlers a dict containing challenge for all given domains - # @param CA which signing CA to use - # @return the certificate in pyopenssl format - # @note algorithm and parts of the code are from acme-tiny + # @return the certificate def get_crt_from_csr(self, csr, domains, challenge_handlers): raise NotImplementedError + + # @brief function to revoke a certificate using ACME + # @param crt certificate to revoke + # @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1) + def revoke_crt(self, crt, reason=None): + raise NotImplementedError diff --git a/acertmgr/authority/v1.py b/acertmgr/authority/v1.py index f247e19..d4861f8 100644 --- a/acertmgr/authority/v1.py +++ b/acertmgr/authority/v1.py @@ -165,7 +165,7 @@ class ACMEAuthority(AbstractACMEAuthority): print("Signing certificate...") code, result = self._send_signed(self.ca + "/acme/new-cert", header, { "resource": "new-cert", - "csr": tools.bytes_to_base64url(tools.convert_csr_to_der_bytes(csr)), + "csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)), }) if code != 201: raise ValueError("Error signing certificate: {0} {1}".format(code, result)) @@ -174,3 +174,17 @@ class ACMEAuthority(AbstractACMEAuthority): print("Certificate signed!") cert = tools.convert_der_bytes_to_cert(result) return cert, tools.download_issuer_ca(cert) + + # @brief function to revoke a certificate using ACME + # @param crt certificate to revoke + # @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1) + def revoke_crt(self, crt, reason=None): + header = self._prepare_header() + payload = {'certificate': tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(crt))} + if reason: + payload['reason'] = int(reason) + code, result = self._send_signed(self.ca + "/acme/revoke-cert", header, payload) + if code < 400: + print("Revocation successful") + else: + raise ValueError("Revocation failed: {}".format(result)) diff --git a/acertmgr/authority/v2.py b/acertmgr/authority/v2.py index f32dae5..590e26d 100644 --- a/acertmgr/authority/v2.py +++ b/acertmgr/authority/v2.py @@ -40,6 +40,7 @@ class ACMEAuthority(AbstractACMEAuthority): "newAccount": "{}/acme/new-acct".format(self.ca), "newNonce": "{}/acme/new-nonce".format(self.ca), "newOrder": "{}/acme/new-order".format(self.ca), + "revokeCert": "{}/acme/revoke-cert".format(self.ca), } print("API directory retrieval failed ({}). Guessed necessary values: {}".format(code, self.directory)) self.nonce = None @@ -229,7 +230,7 @@ class ACMEAuthority(AbstractACMEAuthority): # get the new certificate print("Finalizing certificate") code, finalize, _ = self._request_acme_url(order['finalize'], { - "csr": tools.bytes_to_base64url(tools.convert_csr_to_der_bytes(csr)), + "csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)), }) while code < 400 and (finalize.get('status') == 'pending' or finalize.get('status') == 'processing'): time.sleep(5) @@ -253,3 +254,16 @@ class ACMEAuthority(AbstractACMEAuthority): ca = tools.convert_pem_str_to_cert(cert_dict['ca']) return cert, ca + + # @brief function to revoke a certificate using ACME + # @param crt certificate to revoke + # @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1) + def revoke_crt(self, crt, reason=None): + payload = {'certificate': tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(crt))} + if reason: + payload['reason'] = int(reason) + code, result, _ = self._request_acme_endpoint("revokeCert", payload) + if code < 400: + print("Revocation successful") + else: + raise ValueError("Revocation failed: {}".format(result)) diff --git a/acertmgr/configuration.py b/acertmgr/configuration.py index 9625f02..32fa463 100644 --- a/acertmgr/configuration.py +++ b/acertmgr/configuration.py @@ -203,6 +203,10 @@ def load(): help="Agree to the authorities Terms of Service (value required depends on authority)") parser.add_argument("--force-renew", "--renew-now", nargs="?", help="Renew all domain configurations matching the given value immediately") + parser.add_argument("--revoke", nargs="?", + help="Revoke a certificate file issued with the currently configured account key.") + parser.add_argument("--revoke-reason", nargs="?", type=int, + help="Provide a revoke reason, see https://tools.ietf.org/html/rfc5280#section-5.3.1") args = parser.parse_args() # Determine global configuration file @@ -253,6 +257,12 @@ def load(): else: runtimeconfig['force_renew'] = args.force_renew + # - revoke + if args.revoke: + runtimeconfig['mode'] = 'revoke' + runtimeconfig['revoke'] = args.revoke + runtimeconfig['revoke_reason'] = args.revoke_reason + # Global configuration: Load from file globalconfig = dict() if os.path.isfile(global_config_file): diff --git a/acertmgr/tools.py b/acertmgr/tools.py index 8566350..7ea78ba 100644 --- a/acertmgr/tools.py +++ b/acertmgr/tools.py @@ -150,6 +150,19 @@ def download_issuer_ca(cert): return x509.load_der_x509_certificate(resp.read(), default_backend()) +# @brief determine all san domains on a given certificate +def get_cert_domains(cert): + if cert is None: + print("WARN: None-certificate has no domains. You have found a bug. Congratulations!") + return [] + + san_cert = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) + if san_cert: + return [d.value for d in san_cert.value] + else: + return [cert.subject.rfc4514_string()[3:], ] # strip CN= from the result and return as 1 item list + + # @brief convert certificate to PEM format # @param cert certificate object in pyopenssl format # @return the certificate in PEM format @@ -162,8 +175,8 @@ def convert_pem_str_to_cert(certdata): return x509.load_pem_x509_certificate(certdata.encode('utf8'), default_backend()) -# @brief serialize CSR to DER bytes -def convert_csr_to_der_bytes(data): +# @brief serialize cert/csr to DER bytes +def convert_cert_to_der_bytes(data): return data.public_bytes(serialization.Encoding.DER)