#!/usr/bin/env python
# -*- coding: utf-8 -*-

# standalone - standalone ACME challenge webserver
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE

try:
    # Python 2 module name
    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
except ImportError:
    # Python 3 module name
    from http.server import HTTPServer, BaseHTTPRequestHandler

import re
import socket
import threading

from acertmgr.modes.webdir import HTTPChallengeHandler
from acertmgr.tools import log

# Applies to every HTTPServer (and subclass) created in this process: lets the
# listening socket be bound again right after a previous server was closed.
HTTPServer.allow_reuse_address = True


class HTTPServer6(HTTPServer):
    """HTTPServer variant that opens its listening socket as AF_INET6."""

    address_family = socket.AF_INET6


class ChallengeHandler(HTTPChallengeHandler):
    """Serve ACME HTTP challenge responses from a built-in webserver.

    Challenge values are kept in memory (``self.challenges``) and served by
    a temporary HTTP server that runs in a background thread between
    ``start_challenge`` and ``stop_challenge``.
    """

    def __init__(self, config):
        """Read the listener settings from *config*.

        :param config: mapping-like object providing ``get``; the optional
            keys ``bind_address`` (default ``""``) and ``port`` (default
            ``80``, converted with ``int``) are read here. It is also passed
            on to ``HTTPChallengeHandler.__init__``.
        """
        HTTPChallengeHandler.__init__(self, config)
        self.bind_address = config.get("bind_address", "")
        self.port = int(config.get("port", 80))

        self.challenges = {}  # Maps token -> "<token>.<thumbprint>" response
        self.server_thread = None
        self.server = None

    def create_challenge(self, domain, thumbprint, token):
        """Register the response "<token>.<thumbprint>" for *token*.

        *domain* is not used by this implementation.
        """
        self.challenges[token] = "{0}.{1}".format(token, thumbprint)

    def destroy_challenge(self, domain, thumbprint, token):
        """Forget the response registered for *token*.

        :raises KeyError: if no challenge is registered for *token*.
        """
        del self.challenges[token]

    def start_challenge(self, domain, thumbprint, token):
        """Start the background webserver, then run the base class hook.

        The server is bound to ``(self.bind_address, self.port)``. An IPv6
        server is tried first; when that raises ``socket.gaierror`` a plain
        ``HTTPServer`` is created instead. Any other error raised while
        creating the server propagates to the caller.
        """
        _self = self

        # Custom HTTP request handler
        class _HTTPRequestHandler(BaseHTTPRequestHandler):
            def log_message(self, fmt, *args):
                # Route the server's request log through acertmgr's log()
                log("Request from '%s': %s" % (self.address_string(), fmt % args))

            def do_GET(self):
                # Match token on http://<domain>/.well-known/acme-challenge/<token>
                # (only the last path segment is inspected)
                match = re.match(r'.*/(?P<token>[^/]*)$', self.path)
                if match and match.group('token') in _self.challenges:
                    value = _self.challenges[match.group('token')].encode('utf-8')
                    rcode = 200
                else:
                    value = "404 - NOT FOUND".encode('utf-8')
                    rcode = 404
                self.send_response(rcode)
                self.send_header('Content-type', 'text/plain')
                self.send_header('Content-length', str(len(value)))
                self.end_headers()
                self.wfile.write(value)

        try:
            self.server = HTTPServer6((self.bind_address, self.port), _HTTPRequestHandler)
        except socket.gaierror:
            # Bind address could not be resolved for AF_INET6; use the default
            # HTTPServer address family instead.
            self.server = HTTPServer((self.bind_address, self.port), _HTTPRequestHandler)

        def _serve():
            self.server.serve_forever()

        self.server_thread = threading.Thread(target=_serve)
        self.server_thread.start()
        HTTPChallengeHandler.start_challenge(self, domain, thumbprint, token)

    def stop_challenge(self, domain, thumbprint, token):
        """Shut the background webserver down and release its socket.

        Safe to call when no server is running (never started or already
        stopped); in that case it only resets the bookkeeping attributes.
        """
        if self.server_thread is not None and self.server_thread.is_alive():
            # shutdown() blocks until serve_forever() has returned
            self.server.shutdown()
            self.server_thread.join()
        if self.server is not None:
            # Always release the listening socket, even if the serving thread
            # has already exited on its own.
            self.server.server_close()
        self.server = None
        self.server_thread = None