1
0
mirror of https://github.com/moepman/acertmgr.git synced 2025-01-01 03:21:49 +01:00

tools: cleanup function names and add crypto wrappers

This commit is contained in:
Kishi85 2019-03-23 09:46:36 +01:00
parent 710c42c805
commit fd2134753a
5 changed files with 80 additions and 70 deletions

View File

@ -37,8 +37,8 @@ def create_authority(settings):
acc_file = settings['account_key'] acc_file = settings['account_key']
if not os.path.isfile(acc_file): if not os.path.isfile(acc_file):
print("Account key not found at '{0}'. Creating RSA key.".format(acc_file)) print("Account key not found at '{0}'. Creating RSA key.".format(acc_file))
tools.new_rsa_key(acc_file) tools.new_account_key(acc_file)
acc_key = tools.read_key(acc_file) acc_key = tools.read_pem_key(acc_file)
authority_module = importlib.import_module("acertmgr.authority.{0}".format(settings["api"])) authority_module = importlib.import_module("acertmgr.authority.{0}".format(settings["api"]))
authority_class = getattr(authority_module, "ACMEAuthority") authority_class = getattr(authority_module, "ACMEAuthority")
@ -67,7 +67,7 @@ def cert_get(settings):
key_length = settings['key_length'] key_length = settings['key_length']
if not os.path.isfile(key_file): if not os.path.isfile(key_file):
print("SSL key not found at '{0}'. Creating {1} bit RSA key.".format(key_file, key_length)) print("SSL key not found at '{0}'. Creating {1} bit RSA key.".format(key_file, key_length))
tools.new_rsa_key(key_file, key_length) tools.new_ssl_key(key_file, key_length)
acme = create_authority(settings) acme = create_authority(settings)
@ -82,13 +82,13 @@ def cert_get(settings):
challenge_handlers[domain] = create_challenge_handler(settings['handlers'][domain]) challenge_handlers[domain] = create_challenge_handler(settings['handlers'][domain])
try: try:
key = tools.read_key(key_file) key = tools.read_pem_key(key_file)
cr = tools.new_cert_request(settings['domainlist'], key) cr = tools.new_cert_request(settings['domainlist'], key)
print("Reading account key...") print("Reading account key...")
acme.register_account() acme.register_account()
crt, ca = acme.get_crt_from_csr(cr, settings['domainlist'], challenge_handlers) crt, ca = acme.get_crt_from_csr(cr, settings['domainlist'], challenge_handlers)
with io.open(crt_file, "w") as crt_fd: with io.open(crt_file, "w") as crt_fd:
crt_fd.write(tools.convert_cert_to_pem(crt)) crt_fd.write(tools.convert_cert_to_pem_str(crt))
# if resulting certificate is valid: store in final location # if resulting certificate is valid: store in final location
if tools.is_cert_valid(crt_file, 60): if tools.is_cert_valid(crt_file, 60):
@ -97,7 +97,7 @@ def cert_get(settings):
os.chmod(crt_final, stat.S_IREAD) os.chmod(crt_final, stat.S_IREAD)
if "static_ca" in settings and not settings['static_ca'] and ca is not None: if "static_ca" in settings and not settings['static_ca'] and ca is not None:
with io.open(settings['ca_file'], "w") as ca_fd: with io.open(settings['ca_file'], "w") as ca_fd:
ca_fd.write(tools.convert_cert_to_pem(ca)) ca_fd.write(tools.convert_cert_to_pem_str(ca))
finally: finally:
os.remove(csr_file) os.remove(csr_file)
os.remove(crt_file) os.remove(crt_file)

View File

@ -12,11 +12,6 @@ import json
import re import re
import time import time
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from acertmgr import tools from acertmgr import tools
from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority
@ -38,9 +33,9 @@ class ACMEAuthority(AbstractACMEAuthority):
header = { header = {
"alg": "RS256", "alg": "RS256",
"jwk": { "jwk": {
"e": tools.to_json_base64(tools.byte_string_format(numbers.e)), "e": tools.bytes_to_base64url(tools.number_to_byte_format(numbers.e)),
"kty": "RSA", "kty": "RSA",
"n": tools.to_json_base64(tools.byte_string_format(numbers.n)), "n": tools.bytes_to_base64url(tools.number_to_byte_format(numbers.n)),
}, },
} }
return header return header
@ -51,17 +46,14 @@ class ACMEAuthority(AbstractACMEAuthority):
# @param payload the message # @param payload the message
# @return tuple of return code and request answer # @return tuple of return code and request answer
def _send_signed(self, url, header, payload): def _send_signed(self, url, header, payload):
payload64 = tools.to_json_base64(json.dumps(payload).encode('utf8')) payload64 = tools.bytes_to_base64url(json.dumps(payload).encode('utf8'))
protected = copy.deepcopy(header) protected = copy.deepcopy(header)
protected["nonce"] = tools.get_url(self.ca + "/directory").headers['Replay-Nonce'] protected["nonce"] = tools.get_url(self.ca + "/directory").headers['Replay-Nonce']
protected64 = tools.to_json_base64(json.dumps(protected).encode('utf8')) protected64 = tools.bytes_to_base64url(json.dumps(protected).encode('utf8'))
# @todo check why this padding is not working out = tools.signature_of_str(self.key, '.'.join([protected64, payload64]))
# pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)
pad = padding.PKCS1v15()
out = self.key.sign('.'.join([protected64, payload64]).encode('utf8'), pad, hashes.SHA256())
data = json.dumps({ data = json.dumps({
"header": header, "protected": protected64, "header": header, "protected": protected64,
"payload": payload64, "signature": tools.to_json_base64(out), "payload": payload64, "signature": tools.bytes_to_base64url(out),
}) })
try: try:
resp = tools.get_url(url, data.encode('utf8')) resp = tools.get_url(url, data.encode('utf8'))
@ -94,10 +86,8 @@ class ACMEAuthority(AbstractACMEAuthority):
# @note algorithm and parts of the code are from acme-tiny # @note algorithm and parts of the code are from acme-tiny
def get_crt_from_csr(self, csr, domains, challenge_handlers): def get_crt_from_csr(self, csr, domains, challenge_handlers):
header = self._prepare_header() header = self._prepare_header()
accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':')) account_thumbprint = tools.bytes_to_base64url(
account_hash = hashes.Hash(hashes.SHA256(), backend=default_backend()) tools.hash_of_str(json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))))
account_hash.update(accountkey_json.encode('utf8'))
account_thumbprint = tools.to_json_base64(account_hash.finalize())
challenges = dict() challenges = dict()
tokens = dict() tokens = dict()
@ -173,15 +163,14 @@ class ACMEAuthority(AbstractACMEAuthority):
# get the new certificate # get the new certificate
print("Signing certificate...") print("Signing certificate...")
csr_der = csr.public_bytes(serialization.Encoding.DER)
code, result = self._send_signed(self.ca + "/acme/new-cert", header, { code, result = self._send_signed(self.ca + "/acme/new-cert", header, {
"resource": "new-cert", "resource": "new-cert",
"csr": tools.to_json_base64(csr_der), "csr": tools.bytes_to_base64url(tools.convert_csr_to_der_bytes(csr)),
}) })
if code != 201: if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result)) raise ValueError("Error signing certificate: {0} {1}".format(code, result))
# return signed certificate! # return signed certificate!
print("Certificate signed!") print("Certificate signed!")
cert = x509.load_der_x509_certificate(result, default_backend()) cert = tools.convert_der_bytes_to_cert(result)
return cert, tools.download_issuer_ca(cert) return cert, tools.download_issuer_ca(cert)

View File

@ -11,11 +11,6 @@ import json
import re import re
import time import time
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from acertmgr import tools from acertmgr import tools
from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority
@ -56,8 +51,8 @@ class ACMEAuthority(AbstractACMEAuthority):
"alg": self.algorithm, "alg": self.algorithm,
"jwk": { "jwk": {
"kty": "RSA", "kty": "RSA",
"e": tools.to_json_base64(tools.byte_string_format(numbers.e)), "e": tools.bytes_to_base64url(tools.number_to_byte_format(numbers.e)),
"n": tools.to_json_base64(tools.byte_string_format(numbers.n)), "n": tools.bytes_to_base64url(tools.number_to_byte_format(numbers.n)),
}, },
} }
self.account_id = None # will be updated to correct value during account registration self.account_id = None # will be updated to correct value during account registration
@ -92,7 +87,7 @@ class ACMEAuthority(AbstractACMEAuthority):
payload = {} payload = {}
if not protected: if not protected:
protected = {} protected = {}
payload64 = tools.to_json_base64(json.dumps(payload).encode('utf8')) payload64 = tools.bytes_to_base64url(json.dumps(payload).encode('utf8'))
# Request a new nonce if there is none in cache # Request a new nonce if there is none in cache
if not self.nonce: if not self.nonce:
@ -104,13 +99,12 @@ class ACMEAuthority(AbstractACMEAuthority):
protected["alg"] = self.algorithm protected["alg"] = self.algorithm
if self.account_id: if self.account_id:
protected["kid"] = self.account_id protected["kid"] = self.account_id
protected64 = tools.to_json_base64(json.dumps(protected).encode('utf8')) protected64 = tools.bytes_to_base64url(json.dumps(protected).encode('utf8'))
pad = padding.PKCS1v15() out = tools.signature_of_str(self.key, '.'.join([protected64, payload64]))
out = self.key.sign('.'.join([protected64, payload64]).encode('utf8'), pad, hashes.SHA256())
data = json.dumps({ data = json.dumps({
"protected": protected64, "protected": protected64,
"payload": payload64, "payload": payload64,
"signature": tools.to_json_base64(out), "signature": tools.bytes_to_base64url(out),
}) })
try: try:
return self._request_url(url, data, raw_result) return self._request_url(url, data, raw_result)
@ -153,10 +147,8 @@ class ACMEAuthority(AbstractACMEAuthority):
# @return the certificate and corresponding ca as a tuple # @return the certificate and corresponding ca as a tuple
# @note algorithm and parts of the code are from acme-tiny # @note algorithm and parts of the code are from acme-tiny
def get_crt_from_csr(self, csr, domains, challenge_handlers): def get_crt_from_csr(self, csr, domains, challenge_handlers):
accountkey_json = json.dumps(self.account_protected['jwk'], sort_keys=True, separators=(',', ':')) account_thumbprint = tools.bytes_to_base64url(
account_hash = hashes.Hash(hashes.SHA256(), backend=default_backend()) tools.hash_of_str(json.dumps(self.account_protected['jwk'], sort_keys=True, separators=(',', ':'))))
account_hash.update(accountkey_json.encode('utf8'))
account_thumbprint = tools.to_json_base64(account_hash.finalize())
print("Ordering certificate for {}".format(domains)) print("Ordering certificate for {}".format(domains))
identifiers = [{'type': 'dns', 'value': domain} for domain in domains] identifiers = [{'type': 'dns', 'value': domain} for domain in domains]
@ -242,9 +234,8 @@ class ACMEAuthority(AbstractACMEAuthority):
# get the new certificate # get the new certificate
print("Finalizing certificate") print("Finalizing certificate")
csr_der = csr.public_bytes(serialization.Encoding.DER)
code, finalize, _ = self._request_acme_url(order['finalize'], { code, finalize, _ = self._request_acme_url(order['finalize'], {
"csr": tools.to_json_base64(csr_der), "csr": tools.bytes_to_base64url(tools.convert_csr_to_der_bytes(csr)),
}) })
while code < 400 and (finalize.get('status') == 'pending' or finalize.get('status') == 'processing'): while code < 400 and (finalize.get('status') == 'pending' or finalize.get('status') == 'processing'):
time.sleep(5) time.sleep(5)
@ -261,10 +252,10 @@ class ACMEAuthority(AbstractACMEAuthority):
cert_dict = re.match((r'(?P<cert>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)\n\n' cert_dict = re.match((r'(?P<cert>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)\n\n'
r'(?P<ca>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)?'), r'(?P<ca>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)?'),
certificate.decode('utf-8'), re.DOTALL).groupdict() certificate.decode('utf-8'), re.DOTALL).groupdict()
cert = x509.load_pem_x509_certificate(cert_dict['cert'].encode('utf-8'), default_backend()) cert = tools.convert_pem_str_to_cert(cert_dict['cert'])
if cert_dict['ca'] is None: if cert_dict['ca'] is None:
ca = tools.download_issuer_ca(cert) ca = tools.download_issuer_ca(cert)
else: else:
ca = x509.load_pem_x509_certificate(cert_dict['ca'].encode('utf-8'), default_backend()) ca = tools.convert_pem_str_to_cert(cert_dict['ca'])
return cert, ca return cert, ca

View File

@ -9,8 +9,6 @@ import dns.query
import dns.resolver import dns.resolver
import dns.tsigkeyring import dns.tsigkeyring
import dns.update import dns.update
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from acertmgr import tools from acertmgr import tools
from acertmgr.modes.abstract import AbstractChallengeHandler from acertmgr.modes.abstract import AbstractChallengeHandler
@ -40,10 +38,7 @@ class DNSChallengeHandler(AbstractChallengeHandler):
@staticmethod @staticmethod
def _determine_txtvalue(thumbprint, token): def _determine_txtvalue(thumbprint, token):
keyauthorization = "{0}.{1}".format(token, thumbprint) return tools.bytes_to_base64url(tools.hash_of_str("{0}.{1}".format(token, thumbprint)))
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(keyauthorization.encode('utf8'))
return tools.to_json_base64(digest.finalize())
def create_challenge(self, domain, thumbprint, token): def create_challenge(self, domain, thumbprint, token):
domain = self._determine_challenge_domain(domain) domain = self._determine_challenge_domain(domain)

View File

@ -16,7 +16,7 @@ import six
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.x509.oid import NameOID, ExtensionOID from cryptography.x509.oid import NameOID, ExtensionOID
try: try:
@ -39,8 +39,7 @@ def get_url(url, data=None, headers=None):
# @return the tuple of dates: (notBefore, notAfter) # @return the tuple of dates: (notBefore, notAfter)
def get_cert_valid_times(cert_file): def get_cert_valid_times(cert_file):
with io.open(cert_file, 'r') as f: with io.open(cert_file, 'r') as f:
cert_data = f.read().encode('utf-8') cert = convert_pem_str_to_cert(f.read())
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
return cert.not_valid_before, cert.not_valid_after return cert.not_valid_before, cert.not_valid_after
@ -85,9 +84,15 @@ def new_cert_request(names, key):
return req return req
# @brief generate a new rsa key # @brief generate a new ssl key
# @param path path where the new key file should be written # @param path path where the new key file should be written
def new_rsa_key(path, key_size=4096): def new_account_key(path, key_size=4096):
return new_ssl_key(path, key_size)
# @brief generate a new ssl key
# @param path path where the new key file should be written
def new_ssl_key(path, key_size=4096):
private_key = rsa.generate_private_key( private_key = rsa.generate_private_key(
public_exponent=65537, public_exponent=65537,
key_size=key_size, key_size=key_size,
@ -106,6 +111,15 @@ def new_rsa_key(path, key_size=4096):
print('Warning: Could not set file permissions on {0}!'.format(path)) print('Warning: Could not set file permissions on {0}!'.format(path))
# @brief read a key from file
# @param path path to key file
# @return the key in pyopenssl format
def read_pem_key(path):
with io.open(path, 'r') as f:
key_data = f.read().encode('utf-8')
return serialization.load_pem_private_key(key_data, None, default_backend())
# @brief download the issuer ca for a given certificate # @brief download the issuer ca for a given certificate
# @param cert certificate data # @param cert certificate data
# @returns ca certificate data # @returns ca certificate data
@ -124,7 +138,7 @@ def download_issuer_ca(cert):
print("Downloading CA certificate from {}".format(ca_issuers)) print("Downloading CA certificate from {}".format(ca_issuers))
resp = get_url(ca_issuers) resp = get_url(ca_issuers)
code = resp.getcode() code = resp.getcode()
if code >= 400: if code >= 400:
print("Could not download issuer CA (error {}) for given certificate: {}".format(code, cert)) print("Could not download issuer CA (error {}) for given certificate: {}".format(code, cert))
return None return None
@ -134,30 +148,51 @@ def download_issuer_ca(cert):
# @brief convert certificate to PEM format # @brief convert certificate to PEM format
# @param cert certificate object in pyopenssl format # @param cert certificate object in pyopenssl format
# @return the certificate in PEM format # @return the certificate in PEM format
def convert_cert_to_pem(cert): def convert_cert_to_pem_str(cert):
return cert.public_bytes(serialization.Encoding.PEM).decode('utf8') return cert.public_bytes(serialization.Encoding.PEM).decode('utf8')
# @brief read a key from file # @brief load a PEM certificate from str
# @param path path to key file def convert_pem_str_to_cert(certdata):
# @return the key in pyopenssl format return x509.load_pem_x509_certificate(certdata.encode('utf8'), default_backend())
def read_key(path):
with io.open(path, 'r') as f:
key_data = f.read().encode('utf-8') # @brief serialize CSR to DER bytes
return serialization.load_pem_private_key(key_data, None, default_backend()) def convert_csr_to_der_bytes(data):
return data.public_bytes(serialization.Encoding.DER)
# @brief load a DER certificate from str
def convert_der_bytes_to_cert(data):
return x509.load_der_x509_certificate(data, default_backend())
# @brief sign string with key
def signature_of_str(key, string):
# @todo check why this padding is not working
# pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)
pad = padding.PKCS1v15()
return key.sign(string.encode('utf8'), pad, hashes.SHA256())
# @brief hash a string
def hash_of_str(string):
account_hash = hashes.Hash(hashes.SHA256(), backend=default_backend())
account_hash.update(string.encode('utf8'))
return account_hash.finalize()
# @brief helper function to base64 encode for JSON objects # @brief helper function to base64 encode for JSON objects
# @param b the string to encode # @param b the byte-string to encode
# @return the encoded string # @return the encoded string
def to_json_base64(b): def bytes_to_base64url(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
# @brief convert numbers to byte-string # @brief convert numbers to byte-string
# @param num number to convert # @param num number to convert
# @return byte-string containing the number # @return byte-string containing the number
def byte_string_format(num): def number_to_byte_format(num):
n = format(num, 'x') n = format(num, 'x')
n = "0{0}".format(n) if len(n) % 2 else n n = "0{0}".format(n) if len(n) % 2 else n
return binascii.unhexlify(n) return binascii.unhexlify(n)