mirror of
https://github.com/moepman/acertmgr.git
synced 2025-01-01 04:21:51 +01:00
dns.*: Use a static query timeout for any DNS queries using dnspython
parent
1a4272f11a
commit
9b10f10efd
@ -20,6 +20,7 @@ from acertmgr import tools
|
|||||||
from acertmgr.modes.abstract import AbstractChallengeHandler
|
from acertmgr.modes.abstract import AbstractChallengeHandler
|
||||||
from acertmgr.tools import log
|
from acertmgr.tools import log
|
||||||
|
|
||||||
|
QUERY_TIMEOUT = 60 # seconds are the maximum for any query (otherwise the DNS server will be considered dead)
|
||||||
REGEX_IP4 = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$'
|
REGEX_IP4 = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$'
|
||||||
REGEX_IP6 = r'^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}' \
|
REGEX_IP6 = r'^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}' \
|
||||||
r':|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}' \
|
r':|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}' \
|
||||||
@ -66,7 +67,7 @@ class DNSChallengeHandler(AbstractChallengeHandler):
|
|||||||
nameserver = DNSChallengeHandler._lookup_ip(zonemaster)
|
nameserver = DNSChallengeHandler._lookup_ip(zonemaster)
|
||||||
|
|
||||||
request = dns.message.make_query(zone, dns.rdatatype.NS)
|
request = dns.message.make_query(zone, dns.rdatatype.NS)
|
||||||
response = dns.query.udp(request, nameserver)
|
response = dns.query.udp(request, nameserver, timeout=QUERY_TIMEOUT)
|
||||||
retval = set()
|
retval = set()
|
||||||
if response.rcode() == dns.rcode.NOERROR:
|
if response.rcode() == dns.rcode.NOERROR:
|
||||||
for answer in response.answer:
|
for answer in response.answer:
|
||||||
@ -95,7 +96,7 @@ class DNSChallengeHandler(AbstractChallengeHandler):
|
|||||||
request = dns.message.make_query(domain, dns.rdatatype.SOA)
|
request = dns.message.make_query(domain, dns.rdatatype.SOA)
|
||||||
for nameserver in nameservers:
|
for nameserver in nameservers:
|
||||||
try:
|
try:
|
||||||
response = dns.query.udp(request, nameserver)
|
response = dns.query.udp(request, nameserver, timeout=QUERY_TIMEOUT)
|
||||||
if response.rcode() == dns.rcode.NOERROR:
|
if response.rcode() == dns.rcode.NOERROR:
|
||||||
for answer in response.answer:
|
for answer in response.answer:
|
||||||
for item in answer:
|
for item in answer:
|
||||||
@ -121,9 +122,9 @@ class DNSChallengeHandler(AbstractChallengeHandler):
|
|||||||
try:
|
try:
|
||||||
request = dns.message.make_query(domain, dns.rdatatype.TXT)
|
request = dns.message.make_query(domain, dns.rdatatype.TXT)
|
||||||
if use_tcp:
|
if use_tcp:
|
||||||
response = dns.query.tcp(request, nameserverip)
|
response = dns.query.tcp(request, nameserverip, timeout=QUERY_TIMEOUT)
|
||||||
else:
|
else:
|
||||||
response = dns.query.udp(request, nameserverip)
|
response = dns.query.udp(request, nameserverip, timeout=QUERY_TIMEOUT)
|
||||||
for rrset in response.answer:
|
for rrset in response.answer:
|
||||||
for answer in rrset:
|
for answer in rrset:
|
||||||
if answer.to_text().strip('"') == txtvalue:
|
if answer.to_text().strip('"') == txtvalue:
|
||||||
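Review bot: In the NS lookup hunk, `response = dns.query.udp(request, nameserver, timeout=QUERY_TIMEOUT)` has no `try` visible around it, so an unresponsive zone master now surfaces as `dns.exception.Timeout` after 60 seconds, whereas the SOA and TXT calls do sit inside a `try`. The enclosing function starts and ends outside the hunk, so a handler may exist further up; if it does not, catch the timeout there and fail the lookup with a clear message rather than aborting the whole run. Separately, `dns.query.udp` sends the query once, so a single lost datagram costs the full 60 seconds per nameserver; a shorter timeout with a bounded retry would keep issuance time predictable. The comment on the constant would read more clearly as `QUERY_TIMEOUT = 60  # maximum seconds to wait for any DNS query before the server is considered dead`.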
|
@ -12,7 +12,7 @@ import dns.query
|
|||||||
import dns.tsigkeyring
|
import dns.tsigkeyring
|
||||||
import dns.update
|
import dns.update
|
||||||
|
|
||||||
from acertmgr.modes.dns.abstract import DNSChallengeHandler
|
from acertmgr.modes.dns.abstract import DNSChallengeHandler, QUERY_TIMEOUT
|
||||||
from acertmgr.tools import log
|
from acertmgr.tools import log
|
||||||
|
|
||||||
DEFAULT_KEY_ALGORITHM = "HMAC-MD5.SIG-ALG.REG.INT"
|
DEFAULT_KEY_ALGORITHM = "HMAC-MD5.SIG-ALG.REG.INT"
|
||||||
@ -72,14 +72,14 @@ class ChallengeHandler(DNSChallengeHandler):
|
|||||||
update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm)
|
update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm)
|
||||||
update.add(domain, self.dns_ttl, dns.rdatatype.TXT, txtvalue)
|
update.add(domain, self.dns_ttl, dns.rdatatype.TXT, txtvalue)
|
||||||
log('Adding \'{} {} IN TXT "{}"\' to {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
|
log('Adding \'{} {} IN TXT "{}"\' to {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
|
||||||
dns.query.tcp(update, nameserverip)
|
dns.query.tcp(update, nameserverip, timeout=QUERY_TIMEOUT)
|
||||||
|
|
||||||
def remove_dns_record(self, domain, txtvalue):
|
def remove_dns_record(self, domain, txtvalue):
|
||||||
zone, nameserverip = self._determine_zone_and_nameserverip(domain)
|
zone, nameserverip = self._determine_zone_and_nameserverip(domain)
|
||||||
update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm)
|
update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm)
|
||||||
update.delete(domain, dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.TXT, txtvalue))
|
update.delete(domain, dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.TXT, txtvalue))
|
||||||
log('Deleting \'{} {} IN TXT "{}"\' from {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
|
log('Deleting \'{} {} IN TXT "{}"\' from {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
|
||||||
dns.query.tcp(update, nameserverip)
|
dns.query.tcp(update, nameserverip, timeout=QUERY_TIMEOUT)
|
||||||
|
|
||||||
def verify_dns_record(self, domain, txtvalue):
|
def verify_dns_record(self, domain, txtvalue):
|
||||||
if self.nsupdate_verify and not self.dns_verify_all_ns and not self.nsupdate_verified:
|
if self.nsupdate_verify and not self.dns_verify_all_ns and not self.nsupdate_verified:
|
||||||
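Review bot: Both `dns.query.tcp(update, nameserverip, timeout=QUERY_TIMEOUT)` calls (the one after the 'Adding' log line and the one in `remove_dns_record`) discard the response, so an update the server answers with a non-NOERROR rcode is not detected at this point. Suggest keeping the result, `response = dns.query.tcp(update, nameserverip, timeout=QUERY_TIMEOUT)`, and raising when `response.rcode() != dns.rcode.NOERROR`, with `dns.rcode.to_text(response.rcode())` in the message; `dns.rcode` needs importing if the file does not already import it. The function holding the first call starts outside the hunk. A comment beside `DEFAULT_KEY_ALGORITHM` would also help: HMAC-MD5 is a legacy TSIG algorithm, and deployments should configure an HMAC-SHA2 key where the server supports it.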
|