From b63a0bc4242b2f13bc01973351b84445a51f1113 Mon Sep 17 00:00:00 2001 From: Kishi85 Date: Thu, 4 Apr 2019 13:15:34 +0200 Subject: [PATCH] tools: add log function, update log messages mentioning certificates This simple implementation writes log messages to stdout/err and flushes the buffers immediately after the message has been written. Also update log messages with the certificate CN to a better readable format Introduce functions for get_cert_cn and get_cert_valid_until to encapsulate all cryptographic functions consistently in tools. --- acertmgr/__init__.py | 39 +++++++++++++----------- acertmgr/authority/__init__.py | 5 ++-- acertmgr/authority/v1.py | 19 ++++++------ acertmgr/authority/v2.py | 24 ++++++++------- acertmgr/configuration.py | 21 ++++++++----- acertmgr/modes/dns/abstract.py | 7 +++-- acertmgr/modes/dns/nsupdate.py | 14 ++++----- acertmgr/modes/standalone.py | 3 +- acertmgr/tools.py | 55 +++++++++++++++++++++++++++++----- 9 files changed, 120 insertions(+), 67 deletions(-) diff --git a/acertmgr/__init__.py b/acertmgr/__init__.py index e9350ed..367c7af 100755 --- a/acertmgr/__init__.py +++ b/acertmgr/__init__.py @@ -16,12 +16,13 @@ import subprocess from acertmgr import configuration, tools from acertmgr.authority import authority from acertmgr.modes import challenge_handler +from acertmgr.tools import log # @brief fetch new certificate from letsencrypt # @param settings the domain's configuration options def cert_get(settings): - print("Getting certificate for %s" % settings['domainlist']) + log("Getting certificate for %s" % settings['domainlist']) acme = authority(settings['authority']) acme.register_account() @@ -38,16 +39,16 @@ def cert_get(settings): if os.path.isfile(key_file): key = tools.read_pem_file(key_file, key=True) else: - print("SSL key not found at '{0}'. Creating {1} bit key.".format(key_file, key_length)) + log("SSL key not found at '{0}'. Creating {1} bit key.".format(key_file, key_length)) key = tools.new_ssl_key(key_file, key_length) # create ssl csr csr_file = settings['csr_file'] if os.path.isfile(csr_file) and str(settings['csr_static']).lower() == 'true': - print('Loading CSR from {}'.format(csr_file)) + log('Loading CSR from {}'.format(csr_file)) cr = tools.read_pem_file(csr_file, csr=True) else: - print('Generating CSR for {}'.format(settings['domainlist'])) + log('Generating CSR for {}'.format(settings['domainlist'])) must_staple = str(settings.get('cert_must_staple')).lower() == "true" cr = tools.new_cert_request(settings['domainlist'], key, must_staple) tools.write_pem_file(cr, csr_file) @@ -57,7 +58,8 @@ def cert_get(settings): # if resulting certificate is valid: store in final location if tools.is_cert_valid(crt, settings['ttl_days']): - print("Certificate '{}' renewed and valid until {}".format(crt, crt.not_valid_after)) + log("Certificate '{}' renewed and valid until {}".format(tools.get_cert_cn(crt), + tools.get_cert_valid_until(crt))) tools.write_pem_file(crt, settings['cert_file'], stat.S_IREAD) if (not str(settings.get('ca_static')).lower() == 'true' or not os.path.exists(settings['ca_file'])) \ and ca is not None: @@ -107,11 +109,11 @@ def cert_put(settings): try: os.chown(crt_path, uid, gid) except OSError: - print('Warning: Could not set certificate file ownership!') + log('Could not set certificate file ownership!', warning=True) try: os.chmod(crt_path, int(crt_perm, 8)) except OSError: - print('Warning: Could not set certificate file permissions!') + log('Could not set certificate file permissions!', warning=True) return crt_action @@ -131,7 +133,7 @@ def main(): runtimeconfig, domainconfigs = configuration.load() if runtimeconfig.get('mode') == 'revoke': # Mode: revoke certificate - print("Revoking {}".format(runtimeconfig['revoke'])) + log("Revoking {}".format(runtimeconfig['revoke'])) cert_revoke(tools.read_pem_file(runtimeconfig['revoke']), domainconfigs, runtimeconfig['revoke_reason']) else: # Mode: issue certificates (implicit) @@ -152,7 +154,7 @@ def main(): if str(config.get('cert_revoke_superseded')).lower() == 'true' and cert: superseded.add(cert) except Exception as e: - print("Certificate issue/renew failed: {}".format(e)) + log("Certificate issue/renew failed", e, error=True) exceptions.append(e) # deploy new certificates after all are renewed @@ -161,10 +163,10 @@ def main(): try: for cfg in config['actions']: if not tools.target_is_current(cfg['path'], config['cert_file']): - print("Updating '{}' due to newer version".format(cfg['path'])) + log("Updating '{}' due to newer version".format(cfg['path'])) actions.add(cert_put(cfg)) except Exception as e: - print("Certificate deployment failed: {}".format(e)) + log("Certificate deployment failed", e, error=True) exceptions.append(e) deployment_success = False @@ -174,9 +176,10 @@ def main(): try: # Run actions in a shell environment (to allow shell syntax) as stated in the configuration output = subprocess.check_output(action, shell=True, stderr=subprocess.STDOUT) - print("Executed '{}' successfully: {}".format(action, output)) + log("Executed '{}' successfully: {}".format(action, output)) except subprocess.CalledProcessError as e: - print("Execution of '{}' failed with error '{}': {}".format(e.cmd, e.returncode, e.output)) + log("Execution of '{}' failed with error '{}': {}".format(e.cmd, e.returncode, e.output), e, + error=True) exceptions.append(e) deployment_success = False @@ -184,14 +187,14 @@ def main(): if deployment_success: for superseded_cert in superseded: try: - print("Revoking previous certificate '{}' valid until {} as superseded".format( - superseded_cert, - superseded_cert.not_valid_after)) + log("Revoking '{}' valid until {} as superseded".format( + tools.get_cert_cn(superseded_cert), + tools.get_cert_valid_until(superseded_cert))) cert_revoke(superseded_cert, domainconfigs, reason=4) # reason=4 is superseded except Exception as e: - print("Certificate supersede revoke failed: {}".format(e)) + log("Certificate supersede revoke failed", e, error=True) exceptions.append(e) # throw a RuntimeError with all exceptions caught while working if there were any if len(exceptions) > 0: - raise RuntimeError("{} exception(s) occurred during runtime: {}".format(len(exceptions), exceptions)) + raise RuntimeError("{} exception(s) occurred during processing".format(len(exceptions))) diff --git a/acertmgr/authority/__init__.py b/acertmgr/authority/__init__.py index 5207d99..60c4b5d 100644 --- a/acertmgr/authority/__init__.py +++ b/acertmgr/authority/__init__.py @@ -10,6 +10,7 @@ import json import os from acertmgr import tools +from acertmgr.tools import log authorities = dict() @@ -23,10 +24,10 @@ def authority(settings): else: acc_file = settings['account_key'] if os.path.isfile(acc_file): - print("Reading account key from {}".format(acc_file)) + log("Reading account key from {}".format(acc_file)) acc_key = tools.read_pem_file(acc_file, key=True) else: - print("Account key not found at '{0}'. Creating key.".format(acc_file)) + log("Account key not found at '{0}'. Creating key.".format(acc_file)) acc_key = tools.new_account_key(acc_file) authority_module = importlib.import_module("acertmgr.authority.{0}".format(settings["api"])) diff --git a/acertmgr/authority/v1.py b/acertmgr/authority/v1.py index 9dbba2a..91e318e 100644 --- a/acertmgr/authority/v1.py +++ b/acertmgr/authority/v1.py @@ -13,6 +13,7 @@ import time from acertmgr import tools from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority +from acertmgr.tools import log class ACMEAuthority(AbstractACMEAuthority): @@ -70,11 +71,11 @@ class ACMEAuthority(AbstractACMEAuthority): "agreement": self.agreement, }) if code == 201: - print("Registered!") + log("Registered!") self.registered_account = True return True elif code == 409: - print("Already registered!") + log("Already registered!") self.registered_account = True return False else: @@ -96,7 +97,7 @@ class ACMEAuthority(AbstractACMEAuthority): # verify each domain try: for domain in domains: - print("Verifying {0}...".format(domain)) + log("Verifying {0}...".format(domain)) # get new challenge code, result = self._send_signed(self.ca + "/acme/new-authz", header, { @@ -120,7 +121,7 @@ class ACMEAuthority(AbstractACMEAuthority): for domain in domains: challenge_handlers[domain].start_challenge(domain, account_thumbprint, tokens[domain]) try: - print("Starting key authorization") + log("Starting key authorization") # notify challenge are met keyauthorization = "{0}.{1}".format(tokens[domain], account_thumbprint) code, result = self._send_signed(challenges[domain]['uri'], header, { @@ -141,7 +142,7 @@ class ACMEAuthority(AbstractACMEAuthority): if challenge_status['status'] == "pending": time.sleep(2) elif challenge_status['status'] == "valid": - print("{0} verified!".format(domain)) + log("{0} verified!".format(domain)) break else: raise ValueError("{0} challenge did not pass: {1}".format( @@ -155,10 +156,10 @@ class ACMEAuthority(AbstractACMEAuthority): try: challenge_handlers[domain].destroy_challenge(domain, account_thumbprint, tokens[domain]) except Exception as e: - print('Challenge destruction failed: {}'.format(e)) + log('Challenge destruction failed: {}'.format(e), error=True) # get the new certificate - print("Signing certificate...") + log("Signing certificate...") code, result = self._send_signed(self.ca + "/acme/new-cert", header, { "resource": "new-cert", "csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)), @@ -167,7 +168,7 @@ class ACMEAuthority(AbstractACMEAuthority): raise ValueError("Error signing certificate: {0} {1}".format(code, result)) # return signed certificate! - print("Certificate signed!") + log("Certificate signed!") cert = tools.convert_der_bytes_to_cert(result) return cert, tools.download_issuer_ca(cert) @@ -181,6 +182,6 @@ class ACMEAuthority(AbstractACMEAuthority): payload['reason'] = int(reason) code, result = self._send_signed(self.ca + "/acme/revoke-cert", header, payload) if code < 400: - print("Revocation successful") + log("Revocation successful") else: raise ValueError("Revocation failed: {}".format(result)) diff --git a/acertmgr/authority/v2.py b/acertmgr/authority/v2.py index f19e1ed..4c5c06b 100644 --- a/acertmgr/authority/v2.py +++ b/acertmgr/authority/v2.py @@ -12,6 +12,7 @@ import time from acertmgr import tools from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority +from acertmgr.tools import log class ACMEAuthority(AbstractACMEAuthority): @@ -41,7 +42,8 @@ class ACMEAuthority(AbstractACMEAuthority): "newOrder": "{}/acme/new-order".format(self.ca), "revokeCert": "{}/acme/revoke-cert".format(self.ca), } - print("API directory retrieval failed ({}). Guessed necessary values: {}".format(code, self.directory)) + log("API directory retrieval failed ({}). Guessed necessary values: {}".format(code, self.directory), + warning=True) self.nonce = None self.algorithm, jwk = tools.get_key_alg_and_jwk(key) @@ -127,8 +129,8 @@ class ACMEAuthority(AbstractACMEAuthority): if code < 400 and result['status'] == 'valid': self.account_id = headers['Location'] if 'meta' in self.directory and 'termsOfService' in self.directory['meta']: - print("ToS at {} have been accepted.".format(self.directory['meta']['termsOfService'])) - print("Account registered and valid.".format()) + log("ToS at {} have been accepted.".format(self.directory['meta']['termsOfService'])) + log("Account registered and valid on {}.".format(self.ca)) else: raise ValueError("Error registering account: {0} {1}".format(code, result)) @@ -142,7 +144,7 @@ class ACMEAuthority(AbstractACMEAuthority): account_thumbprint = tools.bytes_to_base64url( tools.hash_of_str(json.dumps(self.account_protected['jwk'], sort_keys=True, separators=(',', ':')))) - print("Ordering certificate for {}".format(domains)) + log("Ordering certificate for {}".format(domains)) identifiers = [{'type': 'dns', 'value': domain} for domain in domains] code, order, headers = self._request_acme_endpoint('newOrder', {'identifiers': identifiers}) if code >= 400: @@ -160,7 +162,7 @@ class ACMEAuthority(AbstractACMEAuthority): authorization['_domain'] = "*.{}".format(authorization['identifier']['value']) if \ 'wildcard' in authorization and authorization['wildcard'] else authorization['identifier']['value'] - print("Authorizing {0}".format(authorization['_domain'])) + log("Authorizing {0}".format(authorization['_domain'])) # create the challenge matching_challenges = [c for c in authorization['challenges'] if @@ -181,7 +183,7 @@ class ACMEAuthority(AbstractACMEAuthority): # after all challenges are created, start processing authorizations for authorization in authorizations: - print("Starting verification of {}".format(authorization['_domain'])) + log("Starting verification of {}".format(authorization['_domain'])) challenge_handlers[authorization['_domain']].start_challenge(authorization['identifier']['value'], account_thumbprint, authorization['_token']) @@ -196,7 +198,7 @@ class ACMEAuthority(AbstractACMEAuthority): code, challenge_status, _ = self._request_url(authorization['_challenge']['url']) if challenge_status.get('status') == "valid": - print("{0} verified".format(authorization['_domain'])) + log("{0} verified".format(authorization['_domain'])) else: raise ValueError("{0} challenge did not pass: {1}".format( authorization['_domain'], challenge_status)) @@ -212,7 +214,7 @@ class ACMEAuthority(AbstractACMEAuthority): challenge_handlers[authorization['_domain']].destroy_challenge( authorization['identifier']['value'], account_thumbprint, authorization['_token']) except Exception as e: - print('Challenge destruction failed: {}'.format(e)) + log('Challenge destruction failed: {}'.format(e), error=True) # check order status and retry once code, order, _ = self._request_url(order_url) @@ -223,7 +225,7 @@ class ACMEAuthority(AbstractACMEAuthority): raise ValueError("Order is still not ready to be finalized: {0} {1}".format(code, order)) # get the new certificate - print("Finalizing certificate") + log("Finalizing certificate") code, finalize, _ = self._request_acme_url(order['finalize'], { "csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)), }) @@ -232,7 +234,7 @@ class ACMEAuthority(AbstractACMEAuthority): code, finalize, _ = self._request_url(order_url) if code >= 400: raise ValueError("Error finalizing certificate: {0} {1}".format(code, finalize)) - print("Certificate ready!") + log("Certificate ready!") # return certificate code, certificate, _ = self._request_url(finalize['certificate'], raw_result=True) @@ -259,6 +261,6 @@ class ACMEAuthority(AbstractACMEAuthority): payload['reason'] = int(reason) code, result, _ = self._request_acme_endpoint("revokeCert", payload) if code < 400: - print("Revocation successful") + log("Revocation successful") else: raise ValueError("Revocation failed: {}".format(result)) diff --git a/acertmgr/configuration.py b/acertmgr/configuration.py index 04ebd30..7df0474 100644 --- a/acertmgr/configuration.py +++ b/acertmgr/configuration.py @@ -14,6 +14,8 @@ import json import os import sys +from acertmgr.tools import log + try: import idna except ImportError: @@ -80,7 +82,7 @@ def idna_convert(domainlist): return domaintranslation else: if 'idna' not in sys.modules: - print("Unicode domain found but IDNA names could not be translated due to missing idna module") + log("Unicode domain(s) found but IDNA names could not be translated due to missing idna module", error=True) return list() @@ -143,14 +145,14 @@ def parse_config_entry(entry, globalconfig, runtimeconfig): # SSL cert location (with compatibility to older versions) if 'server_cert' in globalconfig: - print("WARNING: Legacy configuration directive 'server_cert' used. Support will be removed in 1.0") + log("Legacy configuration directive 'server_cert' used. Support will be removed in 1.0", warning=True) update_config_value(config, 'cert_file', localconfig, globalconfig, globalconfig.get('server_cert', os.path.join(config['cert_dir'], "{}.crt".format(config['id'])))) # SSL key location (with compatibility to older versions) if 'server_key' in globalconfig: - print("WARNING: Legacy configuration directive 'server_key' used. Support will be removed in 1.0") + log("Legacy configuration directive 'server_key' used. Support will be removed in 1.0", warning=True) update_config_value(config, 'key_file', localconfig, globalconfig, globalconfig.get('server_key', os.path.join(config['cert_dir'], "{}.key".format(config['id'])))) @@ -162,11 +164,13 @@ def parse_config_entry(entry, globalconfig, runtimeconfig): # SSL CA location / use static update_config_value(config, 'ca_file', localconfig, globalconfig, globalconfig.get('server_ca', config['defaults'].get('server_ca', - os.path.join(config['cert_dir'], "{}.ca".format(config['id']))))) + os.path.join(config['cert_dir'], + "{}.ca".format( + config['id']))))) update_config_value(config, 'ca_static', localconfig, globalconfig, "false") if 'server_ca' in globalconfig or 'server_ca' in config['defaults']: config['ca_static'] = "true" - print("WARNING: Legacy configuration directive 'server_ca' used. Support will be removed in 1.0") + log("Legacy configuration directive 'server_ca' used. Support removed in 1.0", warning=True) # Domain action configuration config['actions'] = list() @@ -221,7 +225,8 @@ def load(): if args.config_file: global_config_file = args.config_file elif os.path.isfile(LEGACY_CONF_FILE): - print("WARNING: Legacy config file '{}' used. Move to '{}' for 1.0".format(LEGACY_CONF_FILE, DEFAULT_CONF_FILE)) + log("Legacy config file '{}' used. Move to '{}' for 1.0".format(LEGACY_CONF_FILE, DEFAULT_CONF_FILE), + warning=True) global_config_file = LEGACY_CONF_FILE else: global_config_file = DEFAULT_CONF_FILE @@ -230,7 +235,7 @@ def load(): if args.config_dir: domain_config_dir = args.config_dir elif os.path.isdir(LEGACY_CONF_DIR): - print("WARNING: Legacy config dir '{}' used. Move to '{}' for 1.0".format(LEGACY_CONF_DIR, DEFAULT_CONF_DIR)) + log("Legacy config dir '{}' used. Move to '{}' for 1.0".format(LEGACY_CONF_DIR, DEFAULT_CONF_DIR), warning=True) domain_config_dir = LEGACY_CONF_DIR else: domain_config_dir = DEFAULT_CONF_DIR @@ -240,7 +245,7 @@ def load(): if args.work_dir: runtimeconfig['work_dir'] = args.work_dir elif os.path.isdir(LEGACY_WORK_DIR) and domain_config_dir == LEGACY_CONF_DIR: - print("WARNING: Legacy work dir '{}' used. Move to config-dir for 1.0".format(LEGACY_WORK_DIR)) + log("Legacy work dir '{}' used. Move to config-dir for 1.0".format(LEGACY_WORK_DIR), warning=True) runtimeconfig['work_dir'] = LEGACY_WORK_DIR else: runtimeconfig['work_dir'] = domain_config_dir diff --git a/acertmgr/modes/dns/abstract.py b/acertmgr/modes/dns/abstract.py index b521322..3239b41 100644 --- a/acertmgr/modes/dns/abstract.py +++ b/acertmgr/modes/dns/abstract.py @@ -18,6 +18,7 @@ import dns.update from acertmgr import tools from acertmgr.modes.abstract import AbstractChallengeHandler +from acertmgr.tools import log REGEX_IP4 = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$' REGEX_IP6 = r'^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}' \ @@ -187,7 +188,7 @@ class DNSChallengeHandler(AbstractChallengeHandler): if self.verify_dns_record(domain, txtvalue): return else: - print("Waiting until TXT record '{}' is ready".format(domain)) + log("Waiting until TXT record '{}' is ready".format(domain)) while failtime > datetime.now(): time.sleep(self.dns_verify_interval) if self.verify_dns_record(domain, txtvalue): @@ -204,7 +205,7 @@ class DNSChallengeHandler(AbstractChallengeHandler): ns_ip = self._lookup_ns_ip(domain, nameserverip) if len(ns_ip) > 0 and all(self._check_txt_record_value(domain, txtvalue, ip) for ip in ns_ip): # All NS servers have the necessary TXT record. Succeed immediately! - print("All NS ({}) for '{}' have the correct TXT record".format(','.join(ns_ip), domain)) + log("All NS ({}) for '{}' have the correct TXT record".format(','.join(ns_ip), domain)) return True except (ValueError, dns.exception.DNSException): # Fall back to next verification @@ -216,7 +217,7 @@ class DNSChallengeHandler(AbstractChallengeHandler): nameserverip = self._lookup_ip(self.dns_verify_server) if self._check_txt_record_value(domain, txtvalue, nameserverip): # Verify server confirms the necessary TXT record. Succeed immediately! - print("DNS server '{}' found correct TXT record for '{}'".format(self.dns_verify_server, domain)) + log("DNS server '{}' found correct TXT record for '{}'".format(self.dns_verify_server, domain)) return True except (ValueError, dns.exception.DNSException): # Fall back to next verification diff --git a/acertmgr/modes/dns/nsupdate.py b/acertmgr/modes/dns/nsupdate.py index 0ad3859..28eca84 100644 --- a/acertmgr/modes/dns/nsupdate.py +++ b/acertmgr/modes/dns/nsupdate.py @@ -13,6 +13,7 @@ import dns.tsigkeyring import dns.update from acertmgr.modes.dns.abstract import DNSChallengeHandler +from acertmgr.tools import log DEFAULT_KEY_ALGORITHM = "HMAC-MD5.SIG-ALG.REG.INT" @@ -29,12 +30,9 @@ class ChallengeHandler(DNSChallengeHandler): algorithm = re.search(r"algorithm ([a-zA-Z0-9_-]+?);", key_data, re.DOTALL).group(1) tsig_secret = re.search(r"secret \"(.*?)\"", key_data, re.DOTALL).group(1) except IOError as exc: - print(exc) - raise Exception( - "A problem was encountered opening your keyfile, %s." % tsig_key_file) + raise ValueError("A problem was encountered opening your keyfile '{}': {}".format(tsig_key_file, exc)) except AttributeError as exc: - print(exc) - raise Exception("Unable to decipher the keyname and secret from your keyfile.") + raise ValueError("Unable to decipher data from your keyfile: {}".format(exc)) keyring = dns.tsigkeyring.from_text({ key_name: tsig_secret @@ -73,14 +71,14 @@ class ChallengeHandler(DNSChallengeHandler): zone, nameserverip = self._determine_zone_and_nameserverip(domain) update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm) update.add(domain, self.dns_ttl, dns.rdatatype.TXT, txtvalue) - print('Adding \'{} {} IN TXT "{}"\' to {}'.format(domain, self.dns_ttl, txtvalue, nameserverip)) + log('Adding \'{} {} IN TXT "{}"\' to {}'.format(domain, self.dns_ttl, txtvalue, nameserverip)) dns.query.tcp(update, nameserverip) def remove_dns_record(self, domain, txtvalue): zone, nameserverip = self._determine_zone_and_nameserverip(domain) update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm) update.delete(domain, dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.TXT, txtvalue)) - print('Deleting \'{} {} IN TXT "{}"\' from {}'.format(domain, self.dns_ttl, txtvalue, nameserverip)) + log('Deleting \'{} {} IN TXT "{}"\' from {}'.format(domain, self.dns_ttl, txtvalue, nameserverip)) dns.query.tcp(update, nameserverip) def verify_dns_record(self, domain, txtvalue): @@ -88,7 +86,7 @@ class ChallengeHandler(DNSChallengeHandler): # Verify master DNS only if we don't do a full NS check and it has not yet been verified _, nameserverip = self._determine_zone_and_nameserverip(domain) if self._check_txt_record_value(domain, txtvalue, nameserverip, use_tcp=True): - print('Verified \'{} {} IN TXT "{}"\' on {}'.format(domain, self.dns_ttl, txtvalue, nameserverip)) + log('Verified \'{} {} IN TXT "{}"\' on {}'.format(domain, self.dns_ttl, txtvalue, nameserverip)) self.nsupdate_verified = True else: # Master DNS verification failed. Return immediately and try again. diff --git a/acertmgr/modes/standalone.py b/acertmgr/modes/standalone.py index 6dc1d4f..5353548 100644 --- a/acertmgr/modes/standalone.py +++ b/acertmgr/modes/standalone.py @@ -16,6 +16,7 @@ import socket import threading from acertmgr.modes.webdir import HTTPChallengeHandler +from acertmgr.tools import log HTTPServer.allow_reuse_address = True @@ -36,7 +37,7 @@ class ChallengeHandler(HTTPChallengeHandler): # Custom HTTP request handler class _HTTPRequestHandler(BaseHTTPRequestHandler): def log_message(self, fmt, *args): - print("Request from '%s': %s" % (self.address_string(), fmt % args)) + log("Request from '%s': %s" % (self.address_string(), fmt % args)) def do_GET(self): # Match token on http:///.well-known/acme-challenge/ diff --git a/acertmgr/tools.py b/acertmgr/tools.py index a2db6fb..5e8da69 100644 --- a/acertmgr/tools.py +++ b/acertmgr/tools.py @@ -11,6 +11,8 @@ import binascii import datetime import io import os +import sys +import traceback import six from cryptography import x509 @@ -29,6 +31,35 @@ class InvalidCertificateError(Exception): pass +# @brief wrapper for log output +def log(msg, exc=None, error=False, warning=False): + if error: + prefix = "Error: " + elif warning: + prefix = "Warning: " + else: + prefix = "" + + output = prefix + msg + if exc: + _, exc_value, _ = sys.exc_info() + if not getattr(exc, '__traceback__', None) and exc == exc_value: + # Traceback handling on Python 2 is ugly, so we only output it if the exception is the current sys one + formatted_exc = traceback.format_exc() + else: + formatted_exc = traceback.format_exception(type(exc), exc, getattr(exc, '__traceback__', None)) + exc_string = ''.join(formatted_exc) if isinstance(formatted_exc, list) else str(formatted_exc) + indent = ' ' * len(prefix) + output += os.linesep + os.linesep.join(indent + line for line in exc_string.splitlines()) + + if error or warning: + sys.stderr.write(output + os.linesep) + sys.stderr.flush() # force flush buffers after message was written for immediate display + else: + sys.stdout.write(output + os.linesep) + sys.stdout.flush() # force flush buffers after message was written for immediate display + + # @brief wrapper for downloading an url def get_url(url, data=None, headers=None): return urlopen(Request(url, data=data, headers={} if headers is None else headers)) @@ -71,7 +102,7 @@ def new_cert_request(names, key, must_staple=False): if getattr(x509, 'TLSFeature', None): req = req.add_extension(x509.TLSFeature(features=[x509.TLSFeatureType.status_request]), critical=False) else: - print('OCSP must-staple ignored as current version of cryptography does not support the flag.') + log('OCSP must-staple ignored as current version of cryptography does not support the flag.', warning=True) req = req.sign(key, hashes.SHA256(), default_backend()) return req @@ -101,7 +132,7 @@ def new_ssl_key(path=None, key_size=4096): try: os.chmod(path, int("0400", 8)) except OSError: - print('Warning: Could not set file permissions on {0}!'.format(path)) + log('Could not set file permissions on {0}!'.format(path), warning=True) return private_key @@ -128,7 +159,7 @@ def write_pem_file(crt, path, perms=None): try: os.chmod(path, perms) except OSError: - print('Warning: Could not set file permissions ({0}) on {1}!'.format(perms, path)) + log('Could not set file permissions ({0}) on {1}!'.format(perms, path), warning=True) # @brief download the issuer ca for a given certificate @@ -143,14 +174,14 @@ def download_issuer_ca(cert): break if not ca_issuers: - print("Could not determine issuer CA for given certificate: {}".format(cert)) + log("Could not determine issuer CA for given certificate: {}".format(cert), error=True) return None - print("Downloading CA certificate from {}".format(ca_issuers)) + log("Downloading CA certificate from {}".format(ca_issuers)) resp = get_url(ca_issuers) code = resp.getcode() if code >= 400: - print("Could not download issuer CA (error {}) for given certificate: {}".format(code, cert)) + log("Could not download issuer CA (error {}) for given certificate: {}".format(code, cert), error=True) return None return x509.load_der_x509_certificate(resp.read(), default_backend()) @@ -159,7 +190,7 @@ def download_issuer_ca(cert): # @brief determine all san domains on a given certificate def get_cert_domains(cert): if cert is None: - print("WARN: None-certificate has no domains. You have found a bug. Congratulations!") + log("None-certificate has no domains. You have found a bug. Congratulations!", warning=True) return [] san_cert = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) @@ -169,6 +200,16 @@ def get_cert_domains(cert): return [cert.subject.rfc4514_string()[3:], ] # strip CN= from the result and return as 1 item list +# @brief determine certificate cn +def get_cert_cn(cert): + return "CN={}".format(cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value) + + +# @brief determine certificate end of validity +def get_cert_valid_until(cert): + return cert.not_valid_after + + # @brief convert certificate to PEM format # @param cert certificate object in pyopenssl format # @return the certificate in PEM format