From 4766102874b2df3a522bff4f3128f34d7f1b2350 Mon Sep 17 00:00:00 2001 From: David Klaftenegger Date: Sat, 5 Aug 2017 22:42:43 +0200 Subject: [PATCH] Switch from pyopenssl to cryptography The cryptography module is already a dependency of pyopenssl. This patch thus just drops the dependency on pyopenssl and somewhat simplifies the code. --- acertmgr_ssl.py | 75 ++++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/acertmgr_ssl.py b/acertmgr_ssl.py index 37bc1c6..ad2a379 100644 --- a/acertmgr_ssl.py +++ b/acertmgr_ssl.py @@ -5,15 +5,15 @@ # Copyright (c) Markus Hauschild & David Klaftenegger, 2016. # available under the ISC license, see LICENSE -from OpenSSL import crypto +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.x509.oid import NameOID import base64 import binascii import copy -import datetime -import hashlib import json -import subprocess -import textwrap import time import os import re @@ -28,56 +28,56 @@ except ImportError: def cert_valid_times(cert_file): with open(cert_file, 'r') as f: cert_data = f.read() - cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data) - asn1time = str('%Y%m%d%H%M%SZ'.encode('utf8')) - not_before = datetime.datetime.strptime(str(cert.get_notBefore()), asn1time) - not_after = datetime.datetime.strptime(str(cert.get_notAfter()), asn1time) - return (not_before, not_after) + cert = x509.load_pem_x509_certificate(cert_data, default_backend()) + return (cert.not_valid_before, cert.not_valid_after) # @brief create a certificate signing request # @param names list of domain names the certificate should be valid for -# @param key the key to use with the certificate in pyopenssl format -# @return the CSR in pyopenssl format +# @param key the key to use with the certificate in cryptography format +# @return the CSR in cryptography format def cert_request(names, key): - req = crypto.X509Req() - req.get_subject().commonName = names[0] - entries = ['DNS:'+name for name in names] - extensions = [crypto.X509Extension('subjectAltName'.encode('utf8'), False, ', '.join(entries).encode('utf8'))] - req.add_extensions(extensions) - req.set_pubkey(key) - req.sign(key, 'sha256') + primary_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, names[0].decode('utf8'))]) + all_names = x509.SubjectAlternativeName([x509.DNSName(name.decode('utf8')) for name in names]) + req = x509.CertificateSigningRequestBuilder() + req = req.subject_name(primary_name) + req = req.add_extension(all_names, critical=False) + req = req.sign(key, hashes.SHA256(), default_backend()) return req # @brief convert certificate to PEM format -# @param cert certificate object in pyopenssl format +# @param cert certificate object in cryptography format # @return the certificate in PEM format def cert_to_pem(cert): - return crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf8') + return cert.public_bytes(serialization.Encoding.PEM).decode('utf8') # @brief read a key from file # @param path path to key file -# @return the key in pyopenssl format +# @return the key in cryptography format def read_key(path): with open(path, 'r') as f: key_data = f.read() - return crypto.load_privatekey(crypto.FILETYPE_PEM, key_data) + return serialization.load_pem_private_key(key_data, None, default_backend()) + +# @brief convert numbers to byte-string +# @param num number to convert +# @return byte-string containing the number +# @todo better code welcome +def byte_string_format(num): + n = format(num, 'x') + n = "0{0}".format(n) if len(n) % 2 else n + return binascii.unhexlify(n) # @brief create the header information for ACME communication # @param key the account key # @return the header for ACME def acme_header(key): - txt = crypto.dump_privatekey(crypto.FILETYPE_TEXT, key) - pub_mod, pub_exp = re.search( - r"modulus:\n\s+00:([0-9a-f:\s]+)\npublicExponent: [0-9]+ \(0x([0-9A-F]+)\)", - txt.decode('utf8'), re.DOTALL).groups() - pub_mod = re.sub('[:\s]', '', pub_mod) - pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp + numbers = key.public_key().public_numbers() header = { "alg": "RS256", "jwk": { - "e": base64_enc(binascii.unhexlify(pub_exp.encode("utf-8"))), + "e": base64_enc(byte_string_format(numbers.e)), "kty": "RSA", - "n": base64_enc(binascii.unhexlify(pub_mod.encode("utf-8"))), + "n": base64_enc(byte_string_format(numbers.n)), }, } return header @@ -119,7 +119,10 @@ def send_signed(account_key, CA, url, header, payload): protected = copy.deepcopy(header) protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce'] protected64 = base64_enc(json.dumps(protected).encode('utf8')) - out = crypto.sign(account_key, '.'.join([protected64, payload64]), 'sha256') + # @todo check why this padding is not working + #pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH) + pad = padding.PKCS1v15() + out = account_key.sign('.'.join([protected64, payload64]).encode('utf8'), pad, hashes.SHA256()) data = json.dumps({ "header": header, "protected": protected64, "payload": payload64, "signature": base64_enc(out), @@ -141,7 +144,9 @@ def send_signed(account_key, CA, url, header, payload): def get_crt_from_csr(account_key, csr, domains, acme_dir, CA): header = acme_header(account_key) accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':')) - account_thumbprint = base64_enc(hashlib.sha256(accountkey_json.encode('utf8')).digest()) + account_hash = hashes.Hash(hashes.SHA256(), backend=default_backend()) + account_hash.update(accountkey_json.encode('utf8')) + account_thumbprint = base64_enc(account_hash.finalize()) # verify each domain for domain in domains: @@ -202,7 +207,7 @@ def get_crt_from_csr(account_key, csr, domains, acme_dir, CA): # get the new certificate print("Signing certificate...") - csr_der = crypto.dump_certificate_request(crypto.FILETYPE_ASN1, csr) + csr_der = csr.public_bytes(serialization.Encoding.DER) code, result = send_signed(account_key, CA, CA + "/acme/new-cert", header, { "resource": "new-cert", "csr": base64_enc(csr_der), @@ -212,6 +217,6 @@ def get_crt_from_csr(account_key, csr, domains, acme_dir, CA): # return signed certificate! print("Certificate signed!") - cert = crypto.load_certificate(crypto.FILETYPE_ASN1, result) + cert = x509.load_der_x509_certificate(result, default_backend()) return cert