2019-01-08 08:12:20 +01:00
|
|
|
#!/usr/bin/env python
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
# acertmgr - various support functions
|
|
|
|
# Copyright (c) Markus Hauschild & David Klaftenegger, 2016.
|
|
|
|
# Copyright (c) Rudolf Mayerhofer, 2019.
|
|
|
|
# available under the ISC license, see LICENSE
|
|
|
|
|
|
|
|
import base64
|
|
|
|
import binascii
|
|
|
|
import datetime
|
2019-01-21 16:18:47 +01:00
|
|
|
import io
|
2019-02-18 20:45:28 +01:00
|
|
|
import os
|
2019-01-08 08:12:20 +01:00
|
|
|
|
2019-02-18 20:45:28 +01:00
|
|
|
import six
|
2019-01-08 08:12:20 +01:00
|
|
|
from cryptography import x509
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
2019-03-23 09:46:36 +01:00
|
|
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
2019-01-21 16:18:47 +01:00
|
|
|
from cryptography.x509.oid import NameOID, ExtensionOID
|
2019-01-21 15:35:17 +01:00
|
|
|
|
|
|
|
try:
|
2019-03-22 15:55:36 +01:00
|
|
|
from urllib.request import urlopen, Request # Python 3
|
2019-01-21 15:35:17 +01:00
|
|
|
except ImportError:
|
2019-03-22 15:55:36 +01:00
|
|
|
from urllib2 import urlopen, Request # Python 2
|
2019-01-08 08:12:20 +01:00
|
|
|
|
|
|
|
|
|
|
|
class InvalidCertificateError(Exception):
    """Signals that a certificate failed a validity sanity check.

    Within this module it is raised by is_cert_valid when a certificate's
    notBefore date lies after the current time.
    """
|
|
|
|
|
|
|
|
|
2019-03-22 15:55:36 +01:00
|
|
|
# @brief wrapper for downloading an url
|
|
|
|
def get_url(url, data=None, headers=None):
    """Open *url* and return the response object produced by urlopen.

    :param url: address to request
    :param data: optional request body handed to Request unchanged
    :param headers: optional dict of request headers; None means no extra headers
    :return: whatever urlopen returns for the constructed Request
    """
    if headers is None:
        # Request needs a mapping, so substitute a fresh empty dict
        headers = {}
    request = Request(url, data=data, headers=headers)
    return urlopen(request)
|
|
|
|
|
|
|
|
|
2019-01-08 08:12:20 +01:00
|
|
|
# @brief retrieve notBefore and notAfter dates of a certificate file
|
|
|
|
# @param cert_file the path to the certificate
|
|
|
|
# @return the tuple of dates: (notBefore, notAfter)
|
|
|
|
def get_cert_valid_times(cert_file):
    """Read a PEM certificate file and return its validity window.

    :param cert_file: path to the PEM encoded certificate
    :return: tuple (not_valid_before, not_valid_after) taken from the parsed certificate
    """
    with io.open(cert_file, 'r') as handle:
        pem_text = handle.read()
    certificate = convert_pem_str_to_cert(pem_text)
    return (certificate.not_valid_before, certificate.not_valid_after)
|
|
|
|
|
|
|
|
|
|
|
|
# @brief check whether existing certificate is still valid or expiring soon
|
|
|
|
# @param crt_file string containing the path to the certificate file
|
|
|
|
# @param ttl_days the minimum amount of days for which the certificate must be valid
|
|
|
|
# @return True if certificate is still valid for at least ttl_days, False otherwise
|
|
|
|
def is_cert_valid(crt_file, ttl_days):
    """Check whether an existing certificate is still valid for long enough.

    :param crt_file: path to the certificate file
    :param ttl_days: minimum number of days the certificate must remain valid
    :return: False if the file does not exist or the certificate expires in
             less than ttl_days; True otherwise
    :raises InvalidCertificateError: if the certificate's notBefore date lies in the future
    """
    if not os.path.isfile(crt_file):
        return False

    (valid_from, valid_to) = get_cert_valid_times(crt_file)

    # cryptography reports the validity dates as naive datetimes in UTC, so the
    # reference time must be naive UTC as well. Using local time here would
    # shift both checks by the machine's UTC offset.
    now = datetime.datetime.utcnow()
    if valid_from > now:
        raise InvalidCertificateError("Certificate seems to be from the future")

    expiry_limit = now + datetime.timedelta(days=ttl_days)
    return valid_to >= expiry_limit
|
|
|
|
|
|
|
|
|
|
|
|
# @brief create a certificate signing request
|
|
|
|
# @param names list of domain names the certificate should be valid for
|
|
|
|
# @param key the private key used to sign the request (cryptography key object)
|
|
|
|
# @return the signed CSR as a cryptography x509 object
|
|
|
|
def new_cert_request(names, key):
    """Build and sign a certificate signing request for the given names.

    The first entry of *names* becomes the subject common name; every entry
    (including the first) is listed as a DNS subject alternative name.

    :param names: list of domain names (text or UTF-8 encoded byte strings)
    :param key: private key used to sign the request
    :return: the signed request as returned by the builder's sign() call
    """
    # TODO: There has to be a better way to ensure correct text type (why typecheck, cryptography?)
    def _as_text(value):
        # Byte strings are decoded as UTF-8; text is passed through untouched
        return value if isinstance(value, six.text_type) else value.decode('utf-8')

    common_name = x509.NameAttribute(NameOID.COMMON_NAME, _as_text(names[0]))
    subject = x509.Name([common_name])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(_as_text(name)) for name in names]
    )

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    return builder.sign(key, hashes.SHA256(), default_backend())
|
|
|
|
|
|
|
|
|
2019-03-23 09:46:36 +01:00
|
|
|
# @brief generate a new account key (delegates to new_ssl_key)
|
2019-01-08 08:14:42 +01:00
|
|
|
# @param path path where the new key file should be written
|
2019-03-23 09:46:36 +01:00
|
|
|
def new_account_key(path, key_size=4096):
    """Create a new account key file by delegating to new_ssl_key.

    :param path: path where the new key file should be written
    :param key_size: key size in bits, forwarded unchanged
    :return: whatever new_ssl_key returns
    """
    return new_ssl_key(path, key_size=key_size)
|
|
|
|
|
|
|
|
|
|
|
|
# @brief generate a new ssl key
|
|
|
|
# @param path path where the new key file should be written
|
|
|
|
def new_ssl_key(path, key_size=4096):
    """Generate a new RSA private key and write it to *path* as unencrypted PEM.

    The file is opened with owner-only permissions so that a newly created key
    is never readable by other users, and is then switched to read-only (0400).
    A failure to change the mode afterwards only produces a warning.

    :param path: path where the new key file should be written
    :param key_size: RSA key size in bits
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
        backend=default_backend()
    )
    pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption()
    )

    # Request mode 0600 at creation time instead of relying on the umask and
    # fixing the mode afterwards. Otherwise a new key file could be briefly
    # readable by others. O_BINARY only exists on Windows.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
    fd = os.open(path, flags, 0o600)
    with io.open(fd, 'wb') as pem_out:
        pem_out.write(pem)

    try:
        os.chmod(path, 0o400)
    except OSError:
        print('Warning: Could not set file permissions on {0}!'.format(path))
|
|
|
|
|
|
|
|
|
2019-03-23 09:46:36 +01:00
|
|
|
# @brief read a key from file
|
|
|
|
# @param path path to key file
|
|
|
|
# @return the private key object loaded by cryptography
|
|
|
|
def read_pem_key(path):
    """Load an unencrypted PEM private key from a file.

    :param path: path to the key file
    :return: the key object produced by load_pem_private_key (no password is supplied)
    """
    with io.open(path, 'r') as key_file:
        pem_text = key_file.read()
    # The loader expects bytes, so the text read from disk is encoded again
    pem_bytes = pem_text.encode('utf-8')
    return serialization.load_pem_private_key(pem_bytes, None, default_backend())
|
|
|
|
|
|
|
|
|
2019-01-21 15:35:17 +01:00
|
|
|
# @brief download the issuer ca for a given certificate
|
2019-03-21 12:18:49 +01:00
|
|
|
# @param cert certificate data
|
|
|
|
# @returns ca certificate data
|
|
|
|
def download_issuer_ca(cert):
    """Download the issuer CA certificate referenced by a certificate.

    The download location is the first "CA Issuers" entry of the certificate's
    Authority Information Access extension. The response body is parsed as a
    DER encoded certificate.

    :param cert: certificate object whose issuer should be fetched
    :return: the issuer certificate, or None if no CA issuers location could be
             determined or the download reported an error status
    """
    try:
        aia = cert.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
    except x509.ExtensionNotFound:
        # No AIA extension at all: handle it like a missing CA issuers entry
        # instead of letting the lookup error escape
        aia = None

    ca_issuers = None
    if aia is not None:
        for data in aia.value:
            if data.access_method == x509.OID_CA_ISSUERS:
                ca_issuers = data.access_location.value
                break

    if not ca_issuers:
        print("Could not determine issuer CA for given certificate: {}".format(cert))
        return None

    print("Downloading CA certificate from {}".format(ca_issuers))
    resp = get_url(ca_issuers)
    try:
        code = resp.getcode()
        # getcode() may be None for non-HTTP handlers; only a real status can signal an error
        if code is not None and code >= 400:
            print("Could not download issuer CA (error {}) for given certificate: {}".format(code, cert))
            return None
        ca_data = resp.read()
    finally:
        # Always release the connection, including on the early return above
        resp.close()

    return x509.load_der_x509_certificate(ca_data, default_backend())
|
2019-01-21 15:35:17 +01:00
|
|
|
|
|
|
|
|
2019-01-08 08:12:20 +01:00
|
|
|
# @brief convert certificate to PEM format
|
|
|
|
# @param cert certificate object providing public_bytes() (cryptography x509 certificate)
|
|
|
|
# @return the certificate in PEM format
|
2019-03-23 09:46:36 +01:00
|
|
|
def convert_cert_to_pem_str(cert):
    """Serialize a certificate object to PEM text.

    :param cert: certificate object providing public_bytes()
    :return: the PEM encoding decoded to a text string
    """
    pem_bytes = cert.public_bytes(serialization.Encoding.PEM)
    return pem_bytes.decode('utf8')
|
|
|
|
|
|
|
|
|
2019-03-23 09:46:36 +01:00
|
|
|
# @brief load a PEM certificate from str
|
|
|
|
def convert_pem_str_to_cert(certdata):
    """Parse PEM text into a certificate object.

    :param certdata: PEM encoded certificate as a text string
    :return: the certificate object returned by load_pem_x509_certificate
    """
    pem_bytes = certdata.encode('utf8')
    return x509.load_pem_x509_certificate(pem_bytes, default_backend())
|
|
|
|
|
|
|
|
|
|
|
|
# @brief serialize CSR to DER bytes
|
|
|
|
def convert_csr_to_der_bytes(data):
    """Serialize a signing request to DER.

    :param data: request object providing public_bytes()
    :return: the DER encoding as returned by public_bytes()
    """
    der_encoding = serialization.Encoding.DER
    return data.public_bytes(der_encoding)
|
|
|
|
|
|
|
|
|
|
|
|
# @brief load a DER certificate from bytes
|
|
|
|
def convert_der_bytes_to_cert(data):
    """Parse DER bytes into a certificate object.

    :param data: DER encoded certificate
    :return: the certificate object returned by load_der_x509_certificate
    """
    backend = default_backend()
    return x509.load_der_x509_certificate(data, backend)
|
|
|
|
|
|
|
|
|
|
|
|
# @brief sign string with key
|
|
|
|
def signature_of_str(key, string):
    """Sign a text string with a private key.

    The string is UTF-8 encoded and signed using PKCS#1 v1.5 padding with SHA-256.

    :param key: private key object providing sign()
    :param string: text to sign
    :return: the signature returned by key.sign()
    """
    # @todo check why this padding is not working
    # pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)
    pkcs1_padding = padding.PKCS1v15()
    message = string.encode('utf8')
    return key.sign(message, pkcs1_padding, hashes.SHA256())
|
|
|
|
|
|
|
|
|
|
|
|
# @brief hash a string
|
|
|
|
def hash_of_str(string):
    """Compute the SHA-256 digest of a text string.

    :param string: text to hash; it is UTF-8 encoded first
    :return: the finalized digest
    """
    encoded = string.encode('utf8')
    digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
    digest.update(encoded)
    return digest.finalize()
|
2019-01-08 08:12:20 +01:00
|
|
|
|
|
|
|
|
|
|
|
# @brief helper function to base64 encode for JSON objects
|
2019-03-23 09:46:36 +01:00
|
|
|
# @param b the byte-string to encode
|
2019-01-08 08:12:20 +01:00
|
|
|
# @return the encoded string
|
2019-03-23 09:46:36 +01:00
|
|
|
def bytes_to_base64url(b):
    """Encode bytes as URL-safe base64 text without padding.

    :param b: the byte string to encode
    :return: the encoded text with all '=' padding removed
    """
    encoded = base64.urlsafe_b64encode(b).decode('utf8')
    # '=' can only occur as trailing padding in base64 output
    return encoded.rstrip("=")
|
|
|
|
|
|
|
|
|
|
|
|
# @brief convert numbers to byte-string
|
|
|
|
# @param num number to convert
|
|
|
|
# @return byte-string containing the number
|
2019-03-23 09:46:36 +01:00
|
|
|
def number_to_byte_format(num):
    """Convert a number to its big-endian byte string.

    :param num: number to convert
    :return: byte string holding the hexadecimal digits of num
    """
    hex_digits = format(num, 'x')
    if len(hex_digits) % 2:
        # unhexlify needs an even number of digits, so pad with a leading zero
        hex_digits = "0{0}".format(hex_digits)
    return binascii.unhexlify(hex_digits)
|