1
0
mirror of https://github.com/moepman/acertmgr.git synced 2024-06-15 00:32:34 +02:00

acertmgr: cleanup code (PEP-8 + replace assertions)

This commit is contained in:
Kishi85 2019-03-22 12:33:40 +01:00
parent d62afac9d6
commit 83f31bf91d
5 changed files with 16 additions and 21 deletions

View File

@ -175,11 +175,8 @@ class ACMEAuthority(AbstractACMEAuthority):
for domain in reversed(domains): for domain in reversed(domains):
try: try:
challenge_handlers[domain].destroy_challenge(domain, account_thumbprint, tokens[domain]) challenge_handlers[domain].destroy_challenge(domain, account_thumbprint, tokens[domain])
except (KeyboardInterrupt, SystemError, SystemExit): except Exception as e:
# Re-raise runtime/system exceptions print('Challenge destruction failed: {}'.format(e))
raise
except:
pass
# get the new certificate # get the new certificate
print("Signing certificate...") print("Signing certificate...")

View File

@ -179,10 +179,8 @@ class ACMEAuthority(AbstractACMEAuthority):
if code >= 400: if code >= 400:
raise ValueError("Error requesting authorization: {0} {1}".format(code, authorization)) raise ValueError("Error requesting authorization: {0} {1}".format(code, authorization))
authorization['_domain'] = \ authorization['_domain'] = "*.{}".format(authorization['identifier']['value']) if \
"*.{}".format(authorization['identifier']['value']) \ 'wildcard' in authorization and authorization['wildcard'] else authorization['identifier']['value']
if 'wildcard' in authorization and authorization['wildcard'] \
else authorization['identifier']['value']
print("Authorizing {0}".format(authorization['_domain'])) print("Authorizing {0}".format(authorization['_domain']))
# create the challenge # create the challenge
@ -235,11 +233,8 @@ class ACMEAuthority(AbstractACMEAuthority):
try: try:
challenge_handlers[authorization['_domain']].destroy_challenge( challenge_handlers[authorization['_domain']].destroy_challenge(
authorization['identifier']['value'], account_thumbprint, authorization['_token']) authorization['identifier']['value'], account_thumbprint, authorization['_token'])
except (KeyboardInterrupt, SystemError, SystemExit): except Exception as e:
# Re-raise runtime/system exceptions print('Challenge destruction failed: {}'.format(e))
raise
except:
pass
# check order status and retry once # check order status and retry once
code, order, _ = self._request_url(order_url) code, order, _ = self._request_url(order_url)
@ -267,8 +262,8 @@ class ACMEAuthority(AbstractACMEAuthority):
if code >= 400: if code >= 400:
raise ValueError("Error downloading certificate chain: {0} {1}".format(code, certificate)) raise ValueError("Error downloading certificate chain: {0} {1}".format(code, certificate))
cert_dict = re.match(("(?P<cert>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)\n\n" cert_dict = re.match((r'(?P<cert>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)\n\n'
"(?P<ca>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)?"), r'(?P<ca>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)?'),
certificate.decode('utf-8'), re.DOTALL).groupdict() certificate.decode('utf-8'), re.DOTALL).groupdict()
cert = x509.load_pem_x509_certificate(cert_dict['cert'].encode('utf-8'), default_backend()) cert = x509.load_pem_x509_certificate(cert_dict['cert'].encode('utf-8'), default_backend())
if cert_dict['ca'] is None: if cert_dict['ca'] is None:

View File

@ -24,7 +24,7 @@ class DNSChallengeHandler(AbstractChallengeHandler):
def __init__(self, config): def __init__(self, config):
AbstractChallengeHandler.__init__(self, config) AbstractChallengeHandler.__init__(self, config)
self.dns_updatedomain = config.get("dns_updatedomain") self.dns_updatedomain = config.get("dns_updatedomain")
self.dns_ttl = int(config.get("dns_ttl",60)) self.dns_ttl = int(config.get("dns_ttl", 60))
def _determine_challenge_domain(self, domain): def _determine_challenge_domain(self, domain):
if self.dns_updatedomain: if self.dns_updatedomain:

View File

@ -36,13 +36,15 @@ class ACMERequestHandler(SimpleHTTPRequestHandler):
# webservers. # webservers.
def translate_path(self, path): def translate_path(self, path):
spath = path.split('/') spath = path.split('/')
assert (spath[0] == '') if spath[0] != '':
raise ValueError("spath should be '' is {}".format(spath[0]))
spath = spath[1:] spath = spath[1:]
if spath[0] == '.well-known': if spath[0] == '.well-known':
spath = spath[1:] spath = spath[1:]
if spath[0] == 'acme-challenge': if spath[0] == 'acme-challenge':
spath = spath[1:] spath = spath[1:]
assert (len(spath) == 1) if len(spath) != 1:
raise ValueError("spath length {} != 1".format(len(spath)))
spath.insert(0, '') spath.insert(0, '')
path = '/'.join(spath) path = '/'.join(spath)
return SimpleHTTPRequestHandler.translate_path(self, path) return SimpleHTTPRequestHandler.translate_path(self, path)

View File

@ -38,8 +38,9 @@ class ChallengeHandler(AbstractChallengeHandler):
try: try:
resp = urlopen(wellknown_url) resp = urlopen(wellknown_url)
resp_data = resp.read().decode('utf8').strip() resp_data = resp.read().decode('utf8').strip()
assert resp_data == keyauthorization if resp_data != keyauthorization:
except (IOError, AssertionError): raise ValueError("keyauthorization and response data do NOT match")
except (IOError, ValueError):
os.remove(wellknown_path) os.remove(wellknown_path)
raise ValueError("Wrote file to {0}, but couldn't download {1}".format( raise ValueError("Wrote file to {0}, but couldn't download {1}".format(
wellknown_path, wellknown_url)) wellknown_path, wellknown_url))