From 22db50c62d1f0f92eebd7ce7edb0443118265165 Mon Sep 17 00:00:00 2001 From: David Klaftenegger Date: Mon, 4 Apr 2016 01:12:50 +0200 Subject: [PATCH] replace acme-tiny using a pyopenssl implementation of the same functionality instead --- README.md | 2 +- acertmgr.py | 3 +- acertmgr_ssl.py | 161 +++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 162 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c685261..82c561a 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Requirements * Python (2.7+ and 3.4+ should work) * python-dateutil * PyYAML - * acme\_tiny (`acme_tiny.py` in $PYTHONHOME or $PYTHONPATH or placed next to `acertmgr.py`) + * pyopenssl Initial Setup ------------- diff --git a/acertmgr.py b/acertmgr.py index 44417c5..579b993 100755 --- a/acertmgr.py +++ b/acertmgr.py @@ -5,7 +5,6 @@ # Copyright (c) Markus Hauschild, 2016. -import acme_tiny import acertmgr_ssl import acertmgr_web import datetime @@ -95,7 +94,7 @@ def cert_get(domain, settings): key = key_fd.read() key_fd.close() cr = acertmgr_ssl.cert_request(domains.split(), key) - crt = acme_tiny.get_crt(acc_file, csr_file, challenge_dir) + crt = acertmgr_ssl.get_crt_from_csr(acc_file, cr, domains.split(), challenge_dir) with open(crt_file, "w") as crt_fd: crt_fd.write(crt) diff --git a/acertmgr_ssl.py b/acertmgr_ssl.py index 1a6b825..c91b23b 100644 --- a/acertmgr_ssl.py +++ b/acertmgr_ssl.py @@ -6,6 +6,24 @@ # available under the ISC license, see LICENSE from OpenSSL import crypto +import base64 +import binascii +import copy +import datetime +import hashlib +import json +import subprocess +import textwrap +import time +import os +import re +try: + from urllib.request import urlopen # Python 3 +except ImportError: + from urllib2 import urlopen # Python 2 + +DEFAULT_CA = "https://acme-staging.api.letsencrypt.org" +#DEFAULT_CA = "https://acme-v01.api.letsencrypt.org" # @brief retrieve notBefore and notAfter dates of a certificate file # @param cert_file the path to the certificate @@ -32,5 +50,146 @@ def cert_request(names, key_data): key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_data) req.set_pubkey(key) req.sign(key, 'sha256') - return crypto.dump_certificate_request(crypto.FILETYPE_PEM, req) + #return crypto.dump_certificate_request(crypto.FILETYPE_PEM, req) + return req + + +# @brief helper function base64 encode for jose spec +# @param b the string to encode +# @return the encoded string +def base64_enc(b): + return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") + +# @brief function to fetch certificate using ACME +# @param account_key_file the path to the account key +# @param csr the certificate signing request in pyopenssl format +# @param domains list of domains in the certificate, first is CN +# @param acme_dir directory for ACME challanges +# @param CA which signing CA to use +# @return the certificate in PEM format +# @note algorithm and parts of the code are from acme-tiny +def get_crt_from_csr(account_key_file, csr, domains, acme_dir, CA=DEFAULT_CA): + print("Reading account key...") + with open(account_key_file) as f: + account_key_data = f.read() + account_key = crypto.load_privatekey(crypto.FILETYPE_PEM, account_key_data) + proc = subprocess.Popen(['openssl', 'rsa', '-modulus', '-noout', '-text'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = proc.communicate(account_key_data) + if proc.returncode != 0: + raise IOError("OpenSSL Error: {0}".format(err)) + pub_exp, pub_hex = re.search( + r"publicExponent: [0-9]+ \(0x([0-9A-F]+)\).+Modulus=([0-9A-F]+)", + out.decode('utf8'), re.DOTALL).groups() + pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp + header = { + "alg": "RS256", + "jwk": { + "e": base64_enc(binascii.unhexlify(pub_exp.encode("utf-8"))), + "kty": "RSA", + "n": base64_enc(binascii.unhexlify(pub_hex.encode("utf-8"))), + }, + } + accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':')) + thumbprint = base64_enc(hashlib.sha256(accountkey_json.encode('utf8')).digest()) + + # helper function make signed requests + def _send_signed_request(url, payload): + payload64 = base64_enc(json.dumps(payload).encode('utf8')) + protected = copy.deepcopy(header) + protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce'] + protected64 = base64_enc(json.dumps(protected).encode('utf8')) + out = crypto.sign(account_key, '.'.join([protected64, payload64]), 'sha256') + data = json.dumps({ + "header": header, "protected": protected64, + "payload": payload64, "signature": base64_enc(out), + }) + try: + resp = urlopen(url, data.encode('utf8')) + return resp.getcode(), resp.read() + except IOError as e: + return getattr(e, "code", None), getattr(e, "read", e.__str__)() + + code, result = _send_signed_request(CA + "/acme/new-reg", { + "resource": "new-reg", + "agreement": "https://letsencrypt.org/documents/LE-SA-v1.0.1-July-27-2015.pdf", + }) + if code == 201: + print("Registered!") + elif code == 409: + print("Already registered!") + else: + raise ValueError("Error registering: {0} {1}".format(code, result)) + + # verify each domain + for domain in domains: + print("Verifying {0}...".format(domain)) + + # get new challenge + code, result = _send_signed_request(CA + "/acme/new-authz", { + "resource": "new-authz", + "identifier": {"type": "dns", "value": domain}, + }) + if code != 201: + raise ValueError("Error requesting challenges: {0} {1}".format(code, result)) + + # make the challenge file + challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "http-01"][0] + token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) + keyauthorization = "{0}.{1}".format(token, thumbprint) + wellknown_path = os.path.join(acme_dir, token) + with open(wellknown_path, "w") as wellknown_file: + wellknown_file.write(keyauthorization) + + # check that the file is in place + wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token) + try: + resp = urlopen(wellknown_url) + resp_data = resp.read().decode('utf8').strip() + assert resp_data == keyauthorization + except (IOError, AssertionError): + os.remove(wellknown_path) + raise ValueError("Wrote file to {0}, but couldn't download {1}".format( + wellknown_path, wellknown_url)) + + # notify challenge are met + code, result = _send_signed_request(challenge['uri'], { + "resource": "challenge", + "keyAuthorization": keyauthorization, + }) + if code != 202: + raise ValueError("Error triggering challenge: {0} {1}".format(code, result)) + + # wait for challenge to be verified + while True: + try: + resp = urlopen(challenge['uri']) + challenge_status = json.loads(resp.read().decode('utf8')) + except IOError as e: + raise ValueError("Error checking challenge: {0} {1}".format( + e.code, json.loads(e.read().decode('utf8')))) + if challenge_status['status'] == "pending": + time.sleep(2) + elif challenge_status['status'] == "valid": + print("{0} verified!".format(domain)) + os.remove(wellknown_path) + break + else: + raise ValueError("{0} challenge did not pass: {1}".format( + domain, challenge_status)) + + # get the new certificate + print("Signing certificate...") + csr_der = crypto.dump_certificate_request(crypto.FILETYPE_ASN1, csr) + code, result = _send_signed_request(CA + "/acme/new-cert", { + "resource": "new-cert", + "csr": base64_enc(csr_der), + }) + if code != 201: + raise ValueError("Error signing certificate: {0} {1}".format(code, result)) + + # return signed certificate! + print("Certificate signed!") + cert = crypto.load_certificate(crypto.FILETYPE_ASN1, result) + return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)