1
0
mirror of https://github.com/moepman/acertmgr.git synced 2024-12-29 10:31:49 +01:00

dns: Add additional TXT record verifications to reduce wait time

This may also be used to guarantee a correct TXT record lookup by setting
dns_verify_all_ns=true, a dns_verify_failtime < dns_verify_waittime and
a high enough value of dns_verify_failtime (like 300 seconds)
This commit is contained in:
Kishi85 2019-03-29 14:46:57 +01:00
parent 1aae651d98
commit 47e3312aad
3 changed files with 152 additions and 83 deletions

View File

@ -89,6 +89,8 @@ By default the directory (work_dir) containing the working data (csr,certificate
| dns_verify_interval | **d**,g | [dns.*] Do verification checks when starting the challenge every {dns_verify_interval} seconds | 10 |
| dns_verify_failtime | **d**,g | [dns.*] Fail challenge TXT record verification after {dns_verify_failtime} seconds | {dns_waittime} + 1 |
| dns_verify_waittime | **d**,g | [dns.*] Assume DNS challenges are valid after {dns_verify_waittime} | 2 * {dns_ttl} |
| dns_verify_all_ns | **d**,g | [dns.*] Verify DNS challenges by querying all known zone NS servers (resolved by zone master from SOA or dns_verify_server) | false |
| dns_verify_server | **d**,g | [dns.*] Verify DNS challenges by querying this DNS server unless 'dns_verify_all_ns' is enabled, then use to determine zone NS | |
| nsupdate_server | **d**,g | [dns.nsupdate] DNS Server to delegate the update to | {determine from zone SOA} |
| nsupdate_verify | **d**,g | [dns.*] Verify TXT record on the update server upon creation | true |
| nsupdate_keyfile | **d**,g | [dns.nsupdate] Bind-formatted TSIG key file to use for updates (may be used instead of nsupdate_key*) | |

View File

@ -4,6 +4,9 @@
# Copyright (c) Rudolf Mayerhofer, 2018-2019
# available under the ISC license, see LICENSE
import ipaddress
import re
import socket
import time
from datetime import datetime, timedelta
@ -16,8 +19,118 @@ import dns.update
from acertmgr import tools
from acertmgr.modes.abstract import AbstractChallengeHandler
REGEX_IP4 = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$'
REGEX_IP6 = r'^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}' \
r':|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}' \
r'(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}' \
r'|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}' \
r'(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})' \
r'|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}' \
r'|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}' \
r'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}' \
r':((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$'
_lookup_ip_cache = {}
_lookup_ns_ip_cache = {}
_lookup_zone_cache = {}
class DNSChallengeHandler(AbstractChallengeHandler):
@staticmethod
def _lookup_ip(domain_or_ip):
if domain_or_ip in _lookup_ip_cache:
return _lookup_ip_cache[domain_or_ip]
try:
if re.search(REGEX_IP4, domain_or_ip.strip()) or re.search(REGEX_IP6, domain_or_ip.strip()):
return str(ipaddress.ip_address(domain_or_ip))
except ValueError:
pass
# No valid ip found so far, try to resolve using system resolver
result = socket.getaddrinfo(domain_or_ip, 53)
if len(result) > 0:
retval = result[0][4][0]
_lookup_ip_cache[domain_or_ip] = retval
return retval
else:
raise ValueError("Could not lookup dns ip for {}".format(domain_or_ip))
@staticmethod
def _lookup_ns_ip(domain, nameserver=None):
zone, zonemaster = DNSChallengeHandler._lookup_zone(domain, nameserver)
cache_key = "{}${}".format(zone, zonemaster)
if cache_key in _lookup_ns_ip_cache:
return _lookup_ns_ip_cache[cache_key]
if not nameserver:
nameserver = DNSChallengeHandler._lookup_ip(zonemaster)
request = dns.message.make_query(zone, dns.rdatatype.NS)
response = dns.query.udp(request, nameserver)
retval = set()
if response.rcode() == dns.rcode.NOERROR:
for answer in response.answer:
for item in answer:
if item.rdtype == dns.rdatatype.NS:
retval.add(DNSChallengeHandler._lookup_ip(item.to_text()))
_lookup_ns_ip_cache[cache_key] = retval
return retval
@staticmethod
def _lookup_zone(domain, nameserver=None):
cache_key = "{}${}".format(domain, nameserver)
if cache_key in _lookup_zone_cache:
return _lookup_zone_cache[cache_key]
if nameserver:
nameservers = [nameserver]
else:
nameservers = dns.resolver.get_default_resolver().nameservers
domain = dns.name.from_text(domain)
if not domain.is_absolute():
domain = domain.concatenate(dns.name.root)
while domain.parent() != dns.name.root:
request = dns.message.make_query(domain, dns.rdatatype.SOA)
for nameserver in nameservers:
try:
response = dns.query.udp(request, nameserver)
if response.rcode() == dns.rcode.NOERROR:
for answer in response.answer:
for item in answer:
if item.rdtype == dns.rdatatype.SOA:
zone = domain.to_text()
authoritative_ns = item.mname.to_text().split(' ')[0]
retval = zone, authoritative_ns
_lookup_zone_cache[cache_key] = retval
return retval
else:
break
except dns.exception.Timeout:
# Go to next nameserver on timeout
continue
except dns.exception.DNSException:
# Break loop on any other error
break
domain = domain.parent()
raise ValueError('No zone SOA for "{0}"'.format(domain))
@staticmethod
def _check_txt_record_value(domain, txtvalue, nameserverip, use_tcp=False):
try:
request = dns.message.make_query(domain, dns.rdatatype.TXT)
if use_tcp:
response = dns.query.tcp(request, nameserverip)
else:
response = dns.query.udp(request, nameserverip)
for rrset in response.answer:
for answer in rrset:
if answer.to_text().strip('"') == txtvalue:
return True
except dns.exception.DNSException:
# Ignore DNS errors and return failure
return False
@staticmethod
def _determine_txtvalue(thumbprint, token):
return tools.bytes_to_base64url(tools.hash_of_str("{0}.{1}".format(token, thumbprint)))
@ -33,6 +146,8 @@ class DNSChallengeHandler(AbstractChallengeHandler):
self.dns_verify_waittime = int(config.get("dns_verify_waittime", 2 * self.dns_ttl))
self.dns_verify_failtime = int(config.get("dns_verify_failtime", self.dns_verify_waittime + 1))
self.dns_verify_interval = int(config.get("dns_verify_interval", 10))
self.dns_verify_all_ns = str(config.get("dns_verify_all_ns")).lower() == "true"
self.dns_verify_server = config.get("dns_verify_server")
self._valid_times = {}
@ -80,6 +195,35 @@ class DNSChallengeHandler(AbstractChallengeHandler):
raise ValueError("DNS challenge is not ready after waiting {} seconds".format(self.dns_verify_waittime))
def verify_dns_record(self, domain, txtvalue):
if self.dns_verify_all_ns:
try:
nameserverip = None
if self.dns_verify_server:
# Use the specific dns server to determine NS for domain, will otherwise default to SOA master
nameserverip = self._lookup_ip(self.dns_verify_server)
ns_ip = self._lookup_ns_ip(domain, nameserverip)
if len(ns_ip) > 0 and all(self._check_txt_record_value(domain, txtvalue, ip) for ip in ns_ip):
# All NS servers have the necessary TXT record. Succeed immediately!
print("All NS ({}) for '{}' have the correct TXT record".format(','.join(ns_ip), domain))
return True
except (ValueError, dns.exception.DNSException):
# Fall back to next verification
pass
if self.dns_verify_server and not self.dns_verify_all_ns:
try:
# Verify using specific dns server
nameserverip = self._lookup_ip(self.dns_verify_server)
if self._check_txt_record_value(domain, txtvalue, nameserverip):
# Verify server confirms the necessary TXT record. Succeed immediately!
print("DNS server '{}' found correct TXT record for '{}'".format(self.dns_verify_server, domain))
return True
except (ValueError, dns.exception.DNSException):
# Fall back to next verification
pass
if domain not in self._valid_times:
# No valid wait time for domain. Verification fails!
return False
# Verification fails or succeeds based on valid wait time set by add_dns_record
return datetime.now() >= self._valid_times[domain]

View File

@ -5,29 +5,15 @@
# Copyright (c) Rudolf Mayerhofer, 2019
# available under the ISC license, see LICENSE
import io
import ipaddress
import re
import socket
import time
import dns
import dns.query
import dns.resolver
import dns.tsigkeyring
import dns.update
from acertmgr.modes.dns.abstract import DNSChallengeHandler
REGEX_IP4 = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$'
REGEX_IP6 = r'^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}' \
r':|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}' \
r'(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}' \
r'|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}' \
r'(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})' \
r'|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}' \
r'|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}' \
r'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}' \
r':((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$'
DEFAULT_KEY_ALGORITHM = "HMAC-MD5.SIG-ALG.REG.INT"
@ -59,55 +45,6 @@ class ChallengeHandler(DNSChallengeHandler):
return keyring, algorithm
@staticmethod
def _lookup_dns_server(domain_or_ip):
try:
if re.search(REGEX_IP4, domain_or_ip.strip()) or re.search(REGEX_IP6, domain_or_ip.strip()):
return str(ipaddress.ip_address(domain_or_ip))
except ValueError:
pass
# No valid ip found so far, try to resolve
result = socket.getaddrinfo(domain_or_ip, 53)
if len(result) > 0:
return result[0][4][0]
else:
raise ValueError("Could not lookup dns ip for {}".format(domain_or_ip))
@staticmethod
def _get_soa(domain, nameserver=None):
if nameserver:
nameservers = [nameserver]
else:
nameservers = dns.resolver.get_default_resolver().nameservers
domain = dns.name.from_text(domain)
if not domain.is_absolute():
domain = domain.concatenate(dns.name.root)
while domain.parent() != dns.name.root:
request = dns.message.make_query(domain, dns.rdatatype.SOA)
for nameserver in nameservers:
try:
response = dns.query.udp(request, nameserver)
if response.rcode() == dns.rcode.NOERROR:
for answer in response.answer:
for item in answer:
if item.rdtype == dns.rdatatype.SOA:
zone = domain.to_text()
authoritative_ns = item.mname.to_text().split(' ')[0]
return zone, authoritative_ns
else:
break
except dns.exception.Timeout:
# Go to next nameserver on timeout
continue
except dns.exception.DNSException:
# Break loop on any other error
break
domain = domain.parent()
raise Exception('Could not find Zone SOA for "{0}"'.format(domain))
def __init__(self, config):
DNSChallengeHandler.__init__(self, config)
if 'nsupdate_keyfile' in config:
@ -125,28 +62,13 @@ class ChallengeHandler(DNSChallengeHandler):
def _determine_zone_and_nameserverip(self, domain):
nameserver = self.nsupdate_server
if nameserver:
nameserverip = self._lookup_dns_server(nameserver)
zone, _ = self._get_soa(domain, nameserverip)
nameserverip = self._lookup_ip(nameserver)
zone, _ = self._lookup_zone(domain, nameserverip)
else:
zone, nameserver = self._get_soa(domain)
nameserverip = self._lookup_dns_server(nameserver)
zone, nameserver = self._lookup_zone(domain)
nameserverip = self._lookup_ip(nameserver)
return zone, nameserverip
def _check_txt_record_value(self, domain, txtvalue, nameserverip, use_tcp=False):
try:
request = dns.message.make_query(domain, dns.rdatatype.TXT)
if use_tcp:
response = dns.query.tcp(request, nameserverip)
else:
response = dns.query.udp(request, nameserverip)
for rrset in response.answer:
for answer in rrset:
if answer.to_text().strip('"') == txtvalue:
return True
except dns.exception.DNSException:
# Ignore DNS errors and return failure
return False
def add_dns_record(self, domain, txtvalue):
zone, nameserverip = self._determine_zone_and_nameserverip(domain)
update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm)
@ -162,7 +84,8 @@ class ChallengeHandler(DNSChallengeHandler):
dns.query.tcp(update, nameserverip)
def verify_dns_record(self, domain, txtvalue):
if self.nsupdate_verify and not self.nsupdate_verified:
if self.nsupdate_verify and not self.dns_verify_all_ns and not self.nsupdate_verified:
# Verify master DNS only if we don't do a full NS check and it has not yet been verified
_, nameserverip = self._determine_zone_and_nameserverip(domain)
if self._check_txt_record_value(domain, txtvalue, nameserverip, use_tcp=True):
print('Verified \'{} {} IN TXT "{}"\' on {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))