1
0
mirror of https://github.com/moepman/acertmgr.git synced 2025-01-01 04:21:51 +01:00

modes: unify and optimize challenge handler workflow

- Remove wait times returned by create_challenge
- Remove wait loops from authorities
- Add the wait for valid DNS TXT records in the abstract
  DNSChallengeHandler start_challenge function.
- Move challenge verification to start_challenge in general
This commit is contained in:
Kishi85 2019-03-29 12:59:56 +01:00
parent 54cb334600
commit 1aae651d98
8 changed files with 125 additions and 106 deletions

View File

@ -19,7 +19,7 @@ Optional packages (required to use specified features)
------------------------------------------------------ ------------------------------------------------------
* PyYAML: to parse YAML-formatted configuration files * PyYAML: to parse YAML-formatted configuration files
* dnspython: used by dns.nsupdate for RFC2136 dynamic updates to DNS * dnspython: used by dns.* challenge handlers
* idna: to allow automatic conversion of unicode domain names to their IDNA2008 counterparts * idna: to allow automatic conversion of unicode domain names to their IDNA2008 counterparts
Setup Setup
@ -81,12 +81,16 @@ By default the directory (work_dir) containing the working data (csr,certificate
| key_file | **d**,g | Path to store (and load) the private key file | {cert_dir}/{cert_id}.key | | key_file | **d**,g | Path to store (and load) the private key file | {cert_dir}/{cert_id}.key |
| mode | **d**,g | Mode of challenge handling used | standalone | | mode | **d**,g | Mode of challenge handling used | standalone |
| webdir | **d**,g | [webdir] Put acme challenges into this path | /var/www/acme-challenge/ | | webdir | **d**,g | [webdir] Put acme challenges into this path | /var/www/acme-challenge/ |
| webdir_verify | **d**,g | [webdir] Verify challenge after writing it | true | | http_verify | **d**,g | [webdir/standalone] Verify challenge before attempting authorization | true |
| bind_address | **d**,g | [standalone] Serve the challenge using a HTTP server on given IP | | | bind_address | **d**,g | [standalone] Serve the challenge using a HTTP server on given IP | |
| port | **d**,g | [standalone] Serve the challenge using a HTTP server on this port | 80 | | port | **d**,g | [standalone] Serve the challenge using a HTTP server on this port | 80 |
| dns_ttl | **d**,g | [dns.*] Write TXT records with this TTL (also determines the update wait time at twice this value | 60 | | dns_ttl | **d**,g | [dns.*] Write TXT records with this TTL (also determines the update wait time at twice this value | 60 |
| dns_updatedomain | **d**,g | [dns.*] Write the TXT records to this domain (you have to create the necessary CNAME on the real challenge domain manually) | | | dns_updatedomain | **d**,g | [dns.*] Write the TXT records to this domain (you have to create the necessary CNAME on the real challenge domain manually) | |
| dns_verify_interval | **d**,g | [dns.*] Do verification checks when starting the challenge every {dns_verify_interval} seconds | 10 |
| dns_verify_failtime | **d**,g | [dns.*] Fail challenge TXT record verification after {dns_verify_failtime} seconds | {dns_waittime} + 1 |
| dns_verify_waittime | **d**,g | [dns.*] Assume DNS challenges are valid after {dns_verify_waittime} | 2 * {dns_ttl} |
| nsupdate_server | **d**,g | [dns.nsupdate] DNS Server to delegate the update to | {determine from zone SOA} | | nsupdate_server | **d**,g | [dns.nsupdate] DNS Server to delegate the update to | {determine from zone SOA} |
| nsupdate_verify | **d**,g | [dns.*] Verify TXT record on the update server upon creation | true |
| nsupdate_keyfile | **d**,g | [dns.nsupdate] Bind-formatted TSIG key file to use for updates (may be used instead of nsupdate_key*) | | | nsupdate_keyfile | **d**,g | [dns.nsupdate] Bind-formatted TSIG key file to use for updates (may be used instead of nsupdate_key*) | |
| nsupdate_keyname | **d**,g | [dns.nsupdate] TSIG key name to use for updates | | | nsupdate_keyname | **d**,g | [dns.nsupdate] TSIG key name to use for updates | |
| nsupdate_keyvalue | **d**,g | [dns.nsupdate] TSIG key value to use for updates | | | nsupdate_keyvalue | **d**,g | [dns.nsupdate] TSIG key value to use for updates | |

View File

@ -7,7 +7,6 @@
# available under the ISC license, see LICENSE # available under the ISC license, see LICENSE
import copy import copy
import datetime
import json import json
import re import re
import time import time
@ -98,7 +97,6 @@ class ACMEAuthority(AbstractACMEAuthority):
challenges = dict() challenges = dict()
tokens = dict() tokens = dict()
valid_times = list()
# verify each domain # verify each domain
try: try:
for domain in domains: for domain in domains:
@ -120,16 +118,11 @@ class ACMEAuthority(AbstractACMEAuthority):
if domain not in challenge_handlers: if domain not in challenge_handlers:
raise ValueError("No challenge handler given for domain: {0}".format(domain)) raise ValueError("No challenge handler given for domain: {0}".format(domain))
valid_times.append( challenge_handlers[domain].create_challenge(domain, account_thumbprint, tokens[domain])
challenge_handlers[domain].create_challenge(domain, account_thumbprint, tokens[domain]))
print("Waiting until challenges are valid ({})".format(",".join([str(x) for x in valid_times])))
for valid_time in valid_times:
while datetime.datetime.now() < valid_time:
time.sleep(1)
# after all challenges are created, start processing authorizations
for domain in domains: for domain in domains:
challenge_handlers[domain].start_challenge() challenge_handlers[domain].start_challenge(domain, account_thumbprint, tokens[domain])
try: try:
print("Starting key authorization") print("Starting key authorization")
# notify challenge are met # notify challenge are met
@ -158,7 +151,7 @@ class ACMEAuthority(AbstractACMEAuthority):
raise ValueError("{0} challenge did not pass: {1}".format( raise ValueError("{0} challenge did not pass: {1}".format(
domain, challenge_status)) domain, challenge_status))
finally: finally:
challenge_handlers[domain].stop_challenge() challenge_handlers[domain].stop_challenge(domain, account_thumbprint, tokens[domain])
finally: finally:
# Destroy challenge handlers in reverse order to replay # Destroy challenge handlers in reverse order to replay
# any saved state information in the handlers correctly # any saved state information in the handlers correctly

View File

@ -6,7 +6,6 @@
# available under the ISC license, see LICENSE # available under the ISC license, see LICENSE
import copy import copy
import datetime
import json import json
import re import re
import time import time
@ -159,7 +158,6 @@ class ACMEAuthority(AbstractACMEAuthority):
authorizations = list() authorizations = list()
# verify each domain # verify each domain
try: try:
valid_times = list()
for authorizationUrl in order['authorizations']: for authorizationUrl in order['authorizations']:
# get new challenge # get new challenge
code, authorization, _ = self._request_url(authorizationUrl) code, authorization, _ = self._request_url(authorizationUrl)
@ -182,20 +180,17 @@ class ACMEAuthority(AbstractACMEAuthority):
if authorization['_domain'] not in challenge_handlers: if authorization['_domain'] not in challenge_handlers:
raise ValueError("No challenge handler given for domain: {0}".format(authorization['_domain'])) raise ValueError("No challenge handler given for domain: {0}".format(authorization['_domain']))
valid_times.append( challenge_handlers[authorization['_domain']].create_challenge(authorization['identifier']['value'],
challenge_handlers[authorization['_domain']].create_challenge(authorization['identifier']['value'], account_thumbprint,
account_thumbprint, authorization['_token'])
authorization['_token']))
authorizations.append(authorization) authorizations.append(authorization)
print("Waiting until challenges are valid ({})".format(",".join([str(x) for x in valid_times]))) # after all challenges are created, start processing authorizations
for valid_time in valid_times:
while datetime.datetime.now() < valid_time:
time.sleep(1)
for authorization in authorizations: for authorization in authorizations:
print("Starting verification of {}".format(authorization['_domain'])) print("Starting verification of {}".format(authorization['_domain']))
challenge_handlers[authorization['_domain']].start_challenge() challenge_handlers[authorization['_domain']].start_challenge(authorization['identifier']['value'],
account_thumbprint,
authorization['_token'])
try: try:
# notify challenge is met # notify challenge is met
code, challenge_status, _ = self._request_acme_url(authorization['_challenge']['url'], { code, challenge_status, _ = self._request_acme_url(authorization['_challenge']['url'], {
@ -212,7 +207,9 @@ class ACMEAuthority(AbstractACMEAuthority):
raise ValueError("{0} challenge did not pass: {1}".format( raise ValueError("{0} challenge did not pass: {1}".format(
authorization['_domain'], challenge_status)) authorization['_domain'], challenge_status))
finally: finally:
challenge_handlers[authorization['_domain']].stop_challenge() challenge_handlers[authorization['_domain']].stop_challenge(authorization['identifier']['value'],
account_thumbprint,
authorization['_token'])
finally: finally:
# Destroy challenge handlers in reverse order to replay # Destroy challenge handlers in reverse order to replay
# any saved state information in the handlers correctly # any saved state information in the handlers correctly

View File

@ -14,7 +14,6 @@ class AbstractChallengeHandler:
def get_challenge_type(): def get_challenge_type():
raise NotImplementedError raise NotImplementedError
# @return datetime after which the challenge is valid
def create_challenge(self, domain, thumbprint, token): def create_challenge(self, domain, thumbprint, token):
raise NotImplementedError raise NotImplementedError
@ -22,9 +21,9 @@ class AbstractChallengeHandler:
raise NotImplementedError raise NotImplementedError
# Optional: Indicate when a challenge request is imminent # Optional: Indicate when a challenge request is imminent
def start_challenge(self): def start_challenge(self, domain, thumbprint, token):
pass pass
# Optional: Indicate when a challenge response has been received # Optional: Indicate when a challenge response has been received
def stop_challenge(self): def stop_challenge(self, domain, thumbprint, token):
pass pass

View File

@ -4,6 +4,9 @@
# Copyright (c) Rudolf Mayerhofer, 2018-2019 # Copyright (c) Rudolf Mayerhofer, 2018-2019
# available under the ISC license, see LICENSE # available under the ISC license, see LICENSE
import time
from datetime import datetime, timedelta
import dns import dns
import dns.query import dns.query
import dns.resolver import dns.resolver
@ -15,6 +18,10 @@ from acertmgr.modes.abstract import AbstractChallengeHandler
class DNSChallengeHandler(AbstractChallengeHandler): class DNSChallengeHandler(AbstractChallengeHandler):
@staticmethod
def _determine_txtvalue(thumbprint, token):
return tools.bytes_to_base64url(tools.hash_of_str("{0}.{1}".format(token, thumbprint)))
@staticmethod @staticmethod
def get_challenge_type(): def get_challenge_type():
return "dns-01" return "dns-01"
@ -23,6 +30,11 @@ class DNSChallengeHandler(AbstractChallengeHandler):
AbstractChallengeHandler.__init__(self, config) AbstractChallengeHandler.__init__(self, config)
self.dns_updatedomain = config.get("dns_updatedomain") self.dns_updatedomain = config.get("dns_updatedomain")
self.dns_ttl = int(config.get("dns_ttl", 60)) self.dns_ttl = int(config.get("dns_ttl", 60))
self.dns_verify_waittime = int(config.get("dns_verify_waittime", 2 * self.dns_ttl))
self.dns_verify_failtime = int(config.get("dns_verify_failtime", self.dns_verify_waittime + 1))
self.dns_verify_interval = int(config.get("dns_verify_interval", 10))
self._valid_times = {}
def _determine_challenge_domain(self, domain): def _determine_challenge_domain(self, domain):
if self.dns_updatedomain: if self.dns_updatedomain:
@ -36,14 +48,11 @@ class DNSChallengeHandler(AbstractChallengeHandler):
return domain.to_text() return domain.to_text()
@staticmethod
def _determine_txtvalue(thumbprint, token):
return tools.bytes_to_base64url(tools.hash_of_str("{0}.{1}".format(token, thumbprint)))
def create_challenge(self, domain, thumbprint, token): def create_challenge(self, domain, thumbprint, token):
domain = self._determine_challenge_domain(domain) domain = self._determine_challenge_domain(domain)
txtvalue = self._determine_txtvalue(thumbprint, token) txtvalue = self._determine_txtvalue(thumbprint, token)
return self.add_dns_record(domain, txtvalue) self.add_dns_record(domain, txtvalue)
self._valid_times[domain] = datetime.now() + timedelta(seconds=self.dns_verify_waittime)
def add_dns_record(self, domain, txtvalue): def add_dns_record(self, domain, txtvalue):
raise NotImplementedError raise NotImplementedError
@ -51,7 +60,26 @@ class DNSChallengeHandler(AbstractChallengeHandler):
def destroy_challenge(self, domain, thumbprint, token): def destroy_challenge(self, domain, thumbprint, token):
domain = self._determine_challenge_domain(domain) domain = self._determine_challenge_domain(domain)
txtvalue = self._determine_txtvalue(thumbprint, token) txtvalue = self._determine_txtvalue(thumbprint, token)
return self.remove_dns_record(domain, txtvalue) self.remove_dns_record(domain, txtvalue)
def remove_dns_record(self, domain, txtvalue): def remove_dns_record(self, domain, txtvalue):
raise NotImplementedError raise NotImplementedError
def start_challenge(self, domain, thumbprint, token):
domain = self._determine_challenge_domain(domain)
txtvalue = self._determine_txtvalue(thumbprint, token)
failtime = datetime.now() + timedelta(seconds=self.dns_verify_failtime)
if self.verify_dns_record(domain, txtvalue):
return
else:
print("Waiting until TXT record '{}' is ready".format(domain))
while failtime > datetime.now():
time.sleep(self.dns_verify_interval)
if self.verify_dns_record(domain, txtvalue):
return
raise ValueError("DNS challenge is not ready after waiting {} seconds".format(self.dns_verify_waittime))
def verify_dns_record(self, domain, txtvalue):
if domain not in self._valid_times:
return False
return datetime.now() >= self._valid_times[domain]

View File

@ -4,7 +4,6 @@
# dns.nsupdate - rfc2136 based challenge handler # dns.nsupdate - rfc2136 based challenge handler
# Copyright (c) Rudolf Mayerhofer, 2019 # Copyright (c) Rudolf Mayerhofer, 2019
# available under the ISC license, see LICENSE # available under the ISC license, see LICENSE
import datetime
import io import io
import ipaddress import ipaddress
import re import re
@ -109,10 +108,6 @@ class ChallengeHandler(DNSChallengeHandler):
domain = domain.parent() domain = domain.parent()
raise Exception('Could not find Zone SOA for "{0}"'.format(domain)) raise Exception('Could not find Zone SOA for "{0}"'.format(domain))
@staticmethod
def get_challenge_type():
return "dns-01"
def __init__(self, config): def __init__(self, config):
DNSChallengeHandler.__init__(self, config) DNSChallengeHandler.__init__(self, config)
if 'nsupdate_keyfile' in config: if 'nsupdate_keyfile' in config:
@ -123,20 +118,35 @@ class ChallengeHandler(DNSChallengeHandler):
config.get("nsupdate_keyname"): config.get("nsupdate_keyvalue") config.get("nsupdate_keyname"): config.get("nsupdate_keyvalue")
}) })
self.keyalgorithm = config.get("nsupdate_keyalgorithm", DEFAULT_KEY_ALGORITHM) self.keyalgorithm = config.get("nsupdate_keyalgorithm", DEFAULT_KEY_ALGORITHM)
self.dns_server = config.get("nsupdate_server") self.nsupdate_server = config.get("nsupdate_server")
self.dns_verify = config.get("nsupdate_verify", "true") == "true" self.nsupdate_verify = config.get("nsupdate_verify", "true") == "true"
self.nsupdate_verified = False
def _determine_zone_and_nameserverip(self, domain): def _determine_zone_and_nameserverip(self, domain):
nameserver = self.dns_server nameserver = self.nsupdate_server
if nameserver: if nameserver:
nameserverip = self._lookup_dns_server(nameserver) nameserverip = self._lookup_dns_server(nameserver)
zone, _ = self._get_soa(domain, nameserverip) zone, _ = self._get_soa(domain, nameserverip)
else: else:
zone, nameserver = self._get_soa(domain) zone, nameserver = self._get_soa(domain)
nameserverip = self._lookup_dns_server(nameserver) nameserverip = self._lookup_dns_server(nameserver)
return zone, nameserverip return zone, nameserverip
def _check_txt_record_value(self, domain, txtvalue, nameserverip, use_tcp=False):
try:
request = dns.message.make_query(domain, dns.rdatatype.TXT)
if use_tcp:
response = dns.query.tcp(request, nameserverip)
else:
response = dns.query.udp(request, nameserverip)
for rrset in response.answer:
for answer in rrset:
if answer.to_text().strip('"') == txtvalue:
return True
except dns.exception.DNSException:
# Ignore DNS errors and return failure
return False
def add_dns_record(self, domain, txtvalue): def add_dns_record(self, domain, txtvalue):
zone, nameserverip = self._determine_zone_and_nameserverip(domain) zone, nameserverip = self._determine_zone_and_nameserverip(domain)
update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm) update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm)
@ -144,36 +154,21 @@ class ChallengeHandler(DNSChallengeHandler):
print('Adding \'{} {} IN TXT "{}"\' to {}'.format(domain, self.dns_ttl, txtvalue, nameserverip)) print('Adding \'{} {} IN TXT "{}"\' to {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
dns.query.tcp(update, nameserverip) dns.query.tcp(update, nameserverip)
verified = False
retry = 0
while self.dns_verify and not verified and retry < 5:
request = dns.message.make_query(domain, dns.rdatatype.TXT)
response = dns.query.tcp(request, nameserverip)
for rrset in response.answer:
for answer in rrset:
if answer.to_text().strip('"') == txtvalue:
verified = True
print('Verified \'{} {} IN TXT "{}"\' on {}'.format(domain,
self.dns_ttl,
txtvalue,
nameserverip))
break
if not verified:
time.sleep(1)
retry += 1
if not self.dns_verify or verified:
# Return a valid time at twice the given TTL (to allow DNS to propagate)
return datetime.datetime.now() + datetime.timedelta(seconds=2 * self.dns_ttl)
else:
raise ValueError('Failed to verify \'{} {} IN TXT "{}"\' on {}'.format(domain,
self.dns_ttl,
txtvalue,
nameserverip))
def remove_dns_record(self, domain, txtvalue): def remove_dns_record(self, domain, txtvalue):
zone, nameserverip = self._determine_zone_and_nameserverip(domain) zone, nameserverip = self._determine_zone_and_nameserverip(domain)
update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm) update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm)
update.delete(domain, dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.TXT, txtvalue)) update.delete(domain, dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.TXT, txtvalue))
print('Deleting \'{} 60 IN TXT "{}"\' from {}'.format(domain, txtvalue, nameserverip)) print('Deleting \'{} {} IN TXT "{}"\' from {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
dns.query.tcp(update, nameserverip) dns.query.tcp(update, nameserverip)
def verify_dns_record(self, domain, txtvalue):
if self.nsupdate_verify and not self.nsupdate_verified:
_, nameserverip = self._determine_zone_and_nameserverip(domain)
if self._check_txt_record_value(domain, txtvalue, nameserverip, use_tcp=True):
print('Verified \'{} {} IN TXT "{}"\' on {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
self.nsupdate_verified = True
else:
# Master DNS verification failed. Return immediately and try again.
return False
return DNSChallengeHandler.verify_dns_record(self, domain, txtvalue)

View File

@ -11,12 +11,11 @@ try:
except ImportError: except ImportError:
from http.server import HTTPServer, BaseHTTPRequestHandler from http.server import HTTPServer, BaseHTTPRequestHandler
import datetime
import re import re
import socket import socket
import threading import threading
from acertmgr.modes.abstract import AbstractChallengeHandler from acertmgr.modes.webdir import HTTPChallengeHandler
HTTPServer.allow_reuse_address = True HTTPServer.allow_reuse_address = True
@ -25,9 +24,9 @@ class HTTPServer6(HTTPServer):
address_family = socket.AF_INET6 address_family = socket.AF_INET6
class ChallengeHandler(AbstractChallengeHandler): class ChallengeHandler(HTTPChallengeHandler):
def __init__(self, config): def __init__(self, config):
AbstractChallengeHandler.__init__(self, config) HTTPChallengeHandler.__init__(self, config)
bind_address = config.get("bind_address", "") bind_address = config.get("bind_address", "")
port = int(config.get("port", 80)) port = int(config.get("port", 80))
@ -60,24 +59,20 @@ class ChallengeHandler(AbstractChallengeHandler):
except socket.gaierror: except socket.gaierror:
self.server = HTTPServer((bind_address, port), _HTTPRequestHandler) self.server = HTTPServer((bind_address, port), _HTTPRequestHandler)
@staticmethod
def get_challenge_type():
return "http-01"
def create_challenge(self, domain, thumbprint, token): def create_challenge(self, domain, thumbprint, token):
self.challenges[token] = "{0}.{1}".format(token, thumbprint) self.challenges[token] = "{0}.{1}".format(token, thumbprint)
return datetime.datetime.now()
def destroy_challenge(self, domain, thumbprint, token): def destroy_challenge(self, domain, thumbprint, token):
del self.challenges[token] del self.challenges[token]
def start_challenge(self): def start_challenge(self, domain, thumbprint, token):
def _(): def _():
self.server.serve_forever() self.server.serve_forever()
self.server_thread = threading.Thread(target=_) self.server_thread = threading.Thread(target=_)
self.server_thread.start() self.server_thread.start()
HTTPChallengeHandler.start_challenge(self, domain, thumbprint, token)
def stop_challenge(self): def stop_challenge(self, domain, thumbprint, token):
self.server.shutdown() self.server.shutdown()
self.server_thread.join() self.server_thread.join()

View File

@ -5,44 +5,52 @@
# Copyright (c) Rudolf Mayerhofer, 2019. # Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE # available under the ISC license, see LICENSE
import datetime
import os import os
from acertmgr import tools from acertmgr import tools
from acertmgr.modes.abstract import AbstractChallengeHandler from acertmgr.modes.abstract import AbstractChallengeHandler
class ChallengeHandler(AbstractChallengeHandler): class HTTPChallengeHandler(AbstractChallengeHandler):
def __init__(self, config):
AbstractChallengeHandler.__init__(self, config)
self._verify_challenge = str(config.get("webdir_verify", "true")).lower() == "true"
self.challenge_directory = config.get("webdir", "/var/www/acme-challenge/")
if not os.path.isdir(self.challenge_directory):
raise FileNotFoundError("Challenge directory (%s) does not exist!" % self.challenge_directory)
@staticmethod @staticmethod
def get_challenge_type(): def get_challenge_type():
return "http-01" return "http-01"
def __init__(self, config):
AbstractChallengeHandler.__init__(self, config)
self.http_verify = str(config.get("http_verify", "true")).lower() == "true"
def create_challenge(self, domain, thumbprint, token):
raise NotImplementedError
def destroy_challenge(self, domain, thumbprint, token):
raise NotImplementedError
def start_challenge(self, domain, thumbprint, token):
if self.http_verify:
keyauthorization = "{0}.{1}".format(token, thumbprint)
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
try:
resp = tools.get_url(wellknown_url)
resp_data = resp.read().decode('utf8').strip()
if resp_data != keyauthorization:
raise ValueError("keyauthorization and response data do NOT match")
except (IOError, ValueError):
raise ValueError("keyauthorization verification failed")
class ChallengeHandler(HTTPChallengeHandler):
def __init__(self, config):
HTTPChallengeHandler.__init__(self, config)
self.challenge_directory = config.get("webdir", "/var/www/acme-challenge/")
if not os.path.isdir(self.challenge_directory):
raise FileNotFoundError("Challenge directory (%s) does not exist!" % self.challenge_directory)
def create_challenge(self, domain, thumbprint, token): def create_challenge(self, domain, thumbprint, token):
keyauthorization = "{0}.{1}".format(token, thumbprint) keyauthorization = "{0}.{1}".format(token, thumbprint)
wellknown_path = os.path.join(self.challenge_directory, token) wellknown_path = os.path.join(self.challenge_directory, token)
with open(wellknown_path, "w") as wellknown_file: with open(wellknown_path, "w") as wellknown_file:
wellknown_file.write(keyauthorization) wellknown_file.write(keyauthorization)
# check that the file is in place
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
if self._verify_challenge:
try:
resp = tools.get_url(wellknown_url)
resp_data = resp.read().decode('utf8').strip()
if resp_data != keyauthorization:
raise ValueError("keyauthorization and response data do NOT match")
except (IOError, ValueError):
os.remove(wellknown_path)
raise ValueError("Wrote file to {0}, but couldn't download {1}".format(
wellknown_path, wellknown_url))
return datetime.datetime.now()
def destroy_challenge(self, domain, thumbprint, token): def destroy_challenge(self, domain, thumbprint, token):
os.remove(os.path.join(self.challenge_directory, token)) os.remove(os.path.join(self.challenge_directory, token))