diff --git a/acertmgr.py b/acertmgr.py index 503fc28..98395a0 100755 --- a/acertmgr.py +++ b/acertmgr.py @@ -98,9 +98,12 @@ def cert_get(domains, settings): key = key_fd.read() key_fd.close() cr = acertmgr_ssl.cert_request(domains.split(), key) - crt = acertmgr_ssl.get_crt_from_csr(acc_file, cr, domains.split(), challenge_dir, settings['authority']) + print("Reading account key...") + acc_key = acertmgr_ssl.read_key(acc_file) + acertmgr_ssl.register_account(acc_key, settings['authority']) + crt = acertmgr_ssl.get_crt_from_csr(acc_key, cr, domains.split(), challenge_dir, settings['authority']) with open(crt_file, "w") as crt_fd: - crt_fd.write(crt) + crt_fd.write(acertmgr_ssl.cert_to_pem(crt)) # if resulting certificate is valid: store in final location if cert_isValid(crt_file, 60): diff --git a/acertmgr_ssl.py b/acertmgr_ssl.py index 5ff834f..0969862 100644 --- a/acertmgr_ssl.py +++ b/acertmgr_ssl.py @@ -50,32 +50,30 @@ def cert_request(names, key_data): #return crypto.dump_certificate_request(crypto.FILETYPE_PEM, req) return req - -# @brief helper function base64 encode for jose spec -# @param b the string to encode -# @return the encoded string -def base64_enc(b): - return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") - -# @brief function to fetch certificate using ACME -# @param account_key_file the path to the account key -# @param csr the certificate signing request in pyopenssl format -# @param domains list of domains in the certificate, first is CN -# @param acme_dir directory for ACME challanges -# @param CA which signing CA to use +# @brief convert certificate to PEM format +# @param cert certificate object in pyopenssl format # @return the certificate in PEM format -# @note algorithm and parts of the code are from acme-tiny -def get_crt_from_csr(account_key_file, csr, domains, acme_dir, CA): - print("Reading account key...") - with open(account_key_file) as f: - account_key_data = f.read() - account_key = crypto.load_privatekey(crypto.FILETYPE_PEM, account_key_data) +def cert_to_pem(cert): + return crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf8') + +# @brief read a key from file +# @param path path to key file +# @return the key in pyopenssl format +def read_key(path): + with open(path) as f: + key_data = f.read() + return crypto.load_privatekey(crypto.FILETYPE_PEM, key_data) + +# @brief create the header information for ACME communication +# @param key the account key +# @return the header for ACME +def acme_header(key): proc = subprocess.Popen(['openssl', 'rsa', '-modulus', '-noout', '-text'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out, err = proc.communicate(account_key_data.encode('utf8')) + out, err = proc.communicate(crypto.dump_privatekey(crypto.FILETYPE_PEM, key)) if proc.returncode != 0: raise IOError("OpenSSL Error: {0}".format(err)) - pub_exp, pub_hex = re.search( + pub_exp, pub_mod = re.search( r"publicExponent: [0-9]+ \(0x([0-9A-F]+)\).+Modulus=([0-9A-F]+)", out.decode('utf8'), re.DOTALL).groups() pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp @@ -84,46 +82,78 @@ def get_crt_from_csr(account_key_file, csr, domains, acme_dir, CA): "jwk": { "e": base64_enc(binascii.unhexlify(pub_exp.encode("utf-8"))), "kty": "RSA", - "n": base64_enc(binascii.unhexlify(pub_hex.encode("utf-8"))), + "n": base64_enc(binascii.unhexlify(pub_mod.encode("utf-8"))), }, } - accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':')) - thumbprint = base64_enc(hashlib.sha256(accountkey_json.encode('utf8')).digest()) + return header - # helper function make signed requests - def _send_signed_request(url, payload): - payload64 = base64_enc(json.dumps(payload).encode('utf8')) - protected = copy.deepcopy(header) - protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce'] - protected64 = base64_enc(json.dumps(protected).encode('utf8')) - out = crypto.sign(account_key, '.'.join([protected64, payload64]), 'sha256') - data = json.dumps({ - "header": header, "protected": protected64, - "payload": payload64, "signature": base64_enc(out), - }) - try: - resp = urlopen(url, data.encode('utf8')) - return resp.getcode(), resp.read() - except IOError as e: - return getattr(e, "code", None), getattr(e, "read", e.__str__)() - - code, result = _send_signed_request(CA + "/acme/new-reg", { +# @brief register an account over ACME +# @param account_key the account key to register +# @param CA the certificate authority to register with +# @return True if new account was registered, False otherwise +def register_account(account_key, CA): + header = acme_header(account_key) + code, result = send_signed(account_key, CA, CA + "/acme/new-reg", header, { "resource": "new-reg", "agreement": "https://letsencrypt.org/documents/LE-SA-v1.0.1-July-27-2015.pdf", }) if code == 201: print("Registered!") + return True elif code == 409: print("Already registered!") + return False else: raise ValueError("Error registering: {0} {1}".format(code, result)) +# @brief helper function to base64 encode for JSON objects +# @param b the string to encode +# @return the encoded string +def base64_enc(b): + return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") + + +# @brief helper function to make signed requests +# @param CA the certificate authority +# @param url the request URL +# @param header the message header +# @param payload the message +# @return tuple of return code and request answer +def send_signed(account_key, CA, url, header, payload): + payload64 = base64_enc(json.dumps(payload).encode('utf8')) + protected = copy.deepcopy(header) + protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce'] + protected64 = base64_enc(json.dumps(protected).encode('utf8')) + out = crypto.sign(account_key, '.'.join([protected64, payload64]), 'sha256') + data = json.dumps({ + "header": header, "protected": protected64, + "payload": payload64, "signature": base64_enc(out), + }) + try: + resp = urlopen(url, data.encode('utf8')) + return resp.getcode(), resp.read() + except IOError as e: + return getattr(e, "code", None), getattr(e, "read", e.__str__)() + +# @brief function to fetch certificate using ACME +# @param account_key the account key in pyopenssl format +# @param csr the certificate signing request in pyopenssl format +# @param domains list of domains in the certificate, first is CN +# @param acme_dir directory for ACME challanges +# @param CA which signing CA to use +# @return the certificate in pyopenssl format +# @note algorithm and parts of the code are from acme-tiny +def get_crt_from_csr(account_key, csr, domains, acme_dir, CA): + header = acme_header(account_key) + accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':')) + account_thumbprint = base64_enc(hashlib.sha256(accountkey_json.encode('utf8')).digest()) + # verify each domain for domain in domains: print("Verifying {0}...".format(domain)) # get new challenge - code, result = _send_signed_request(CA + "/acme/new-authz", { + code, result = send_signed(account_key, CA, CA + "/acme/new-authz", header, { "resource": "new-authz", "identifier": {"type": "dns", "value": domain}, }) @@ -133,7 +163,7 @@ def get_crt_from_csr(account_key_file, csr, domains, acme_dir, CA): # make the challenge file challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "http-01"][0] token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) - keyauthorization = "{0}.{1}".format(token, thumbprint) + keyauthorization = "{0}.{1}".format(token, account_thumbprint) wellknown_path = os.path.join(acme_dir, token) with open(wellknown_path, "w") as wellknown_file: wellknown_file.write(keyauthorization) @@ -150,7 +180,7 @@ def get_crt_from_csr(account_key_file, csr, domains, acme_dir, CA): wellknown_path, wellknown_url)) # notify challenge are met - code, result = _send_signed_request(challenge['uri'], { + code, result = send_signed(account_key, CA, challenge['uri'], header, { "resource": "challenge", "keyAuthorization": keyauthorization, }) @@ -178,7 +208,7 @@ def get_crt_from_csr(account_key_file, csr, domains, acme_dir, CA): # get the new certificate print("Signing certificate...") csr_der = crypto.dump_certificate_request(crypto.FILETYPE_ASN1, csr) - code, result = _send_signed_request(CA + "/acme/new-cert", { + code, result = send_signed(account_key, CA, CA + "/acme/new-cert", header, { "resource": "new-cert", "csr": base64_enc(csr_der), }) @@ -188,5 +218,5 @@ def get_crt_from_csr(account_key_file, csr, domains, acme_dir, CA): # return signed certificate! print("Certificate signed!") cert = crypto.load_certificate(crypto.FILETYPE_ASN1, result) - return crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf8') + return cert