mirror of
https://github.com/moepman/acertmgr.git
synced 2024-11-13 06:45:24 +01:00
Fix python3 compatibility
This commit is contained in:
parent
1c939363c0
commit
1ce94491fd
12
acertmgr.py
12
acertmgr.py
@ -6,7 +6,7 @@
|
||||
# Copyright (c) Rudolf Mayerhofer, 2019.
|
||||
# available under the ISC license, see LICENSE
|
||||
|
||||
|
||||
import io
|
||||
import grp
|
||||
import importlib
|
||||
import os
|
||||
@ -93,7 +93,7 @@ def cert_get(settings):
|
||||
print("Reading account key...")
|
||||
acme.register_account()
|
||||
crt = acme.get_crt_from_csr(cr, config['domainlist'], challenge_handlers)
|
||||
with open(crt_file, "w") as crt_fd:
|
||||
with io.open(crt_file, "w") as crt_fd:
|
||||
crt_fd.write(tools.convert_cert_to_pem(crt))
|
||||
|
||||
# if resulting certificate is valid: store in final location
|
||||
@ -127,20 +127,20 @@ def cert_put(settings):
|
||||
key_file = settings['key_file']
|
||||
crt_final = settings['cert_file']
|
||||
|
||||
with open(crt_path, "w+") as crt_fd:
|
||||
with io.open(crt_path, "w+") as crt_fd:
|
||||
for fmt in crt_format:
|
||||
if fmt == "crt":
|
||||
src_fd = open(crt_final, "r")
|
||||
src_fd = io.open(crt_final, "r")
|
||||
crt_fd.write(src_fd.read())
|
||||
src_fd.close()
|
||||
if fmt == "key":
|
||||
src_fd = open(key_file, "r")
|
||||
src_fd = io.open(key_file, "r")
|
||||
crt_fd.write(src_fd.read())
|
||||
src_fd.close()
|
||||
if fmt == "ca":
|
||||
if not os.path.isfile(ca_file):
|
||||
raise FileNotFoundError("The CA certificate file (%s) is missing!" % ca_file)
|
||||
src_fd = open(ca_file, "r")
|
||||
src_fd = io.open(ca_file, "r")
|
||||
crt_fd.write(src_fd.read())
|
||||
src_fd.close()
|
||||
else:
|
||||
|
@ -11,6 +11,7 @@ import io
|
||||
import os
|
||||
|
||||
import tools
|
||||
import io
|
||||
|
||||
ACME_DIR = "/etc/acme"
|
||||
ACME_CONF = os.path.join(ACME_DIR, "acme.conf")
|
||||
@ -143,7 +144,7 @@ def load():
|
||||
globalconfig = dict()
|
||||
# load global configuration
|
||||
if os.path.isfile(ACME_CONF):
|
||||
with open(ACME_CONF) as config_fd:
|
||||
with io.open(ACME_CONF) as config_fd:
|
||||
try:
|
||||
import json
|
||||
globalconfig = json.load(config_fd)
|
||||
@ -156,7 +157,7 @@ def load():
|
||||
# load domain configuration
|
||||
for config_file in os.listdir(ACME_CONFD):
|
||||
if config_file.endswith(".conf"):
|
||||
with open(os.path.join(ACME_CONFD, config_file)) as config_fd:
|
||||
with io.open(os.path.join(ACME_CONFD, config_file)) as config_fd:
|
||||
try:
|
||||
import json
|
||||
for entry in json.load(config_fd).items():
|
||||
|
32
tools.py
32
tools.py
@ -11,12 +11,14 @@ import binascii
|
||||
import datetime
|
||||
import os
|
||||
import hashlib
|
||||
import io
|
||||
import six
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.x509.oid import NameOID,ExtensionOID
|
||||
from cryptography.x509.oid import NameOID, ExtensionOID
|
||||
|
||||
try:
|
||||
from urllib.request import urlopen # Python 3
|
||||
@ -32,8 +34,8 @@ class InvalidCertificateError(Exception):
|
||||
# @param cert_file the path to the certificate
|
||||
# @return the tuple of dates: (notBefore, notAfter)
|
||||
def get_cert_valid_times(cert_file):
|
||||
with open(cert_file, 'r') as f:
|
||||
cert_data = f.read()
|
||||
with io.open(cert_file, 'r') as f:
|
||||
cert_data = f.read().encode('utf-8')
|
||||
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
|
||||
return cert.not_valid_before, cert.not_valid_after
|
||||
|
||||
@ -64,8 +66,14 @@ def is_cert_valid(crt_file, ttl_days):
|
||||
# @param key the key to use with the certificate in pyopenssl format
|
||||
# @return the CSR in pyopenssl format
|
||||
def new_cert_request(names, key):
|
||||
primary_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, names[0].decode('utf8'))])
|
||||
all_names = x509.SubjectAlternativeName([x509.DNSName(name.decode('utf8')) for name in names])
|
||||
# TODO: There has to be a better way to ensure correct text type (why typecheck, cryptography?)
|
||||
primary_name = x509.Name([x509.NameAttribute(
|
||||
NameOID.COMMON_NAME,
|
||||
names[0] if isinstance(names[0], six.text_type) else names[0].decode('utf-8'))
|
||||
])
|
||||
all_names = x509.SubjectAlternativeName([x509.DNSName(
|
||||
name if isinstance(name, six.text_type) else name.decode('utf-8')
|
||||
) for name in names])
|
||||
req = x509.CertificateSigningRequestBuilder()
|
||||
req = req.subject_name(primary_name)
|
||||
req = req.add_extension(all_names, critical=False)
|
||||
@ -86,7 +94,7 @@ def new_rsa_key(path, key_size=4096):
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
)
|
||||
with open(path, 'wb') as pem_out:
|
||||
with io.open(path, 'wb') as pem_out:
|
||||
pem_out.write(pem)
|
||||
try:
|
||||
os.chmod(path, int("0400", 8))
|
||||
@ -98,8 +106,8 @@ def new_rsa_key(path, key_size=4096):
|
||||
# @param cert_file certificate file
|
||||
# @param ca_file destination for the ca file
|
||||
def download_issuer_ca(cert_file, ca_file):
|
||||
with open(cert_file, 'r') as f:
|
||||
cert_data = f.read()
|
||||
with io.open(cert_file, 'r') as f:
|
||||
cert_data = f.read().encode('utf-8')
|
||||
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
|
||||
aia = cert.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
|
||||
|
||||
@ -116,7 +124,7 @@ def download_issuer_ca(cert_file, ca_file):
|
||||
cadata = urlopen(ca_issuers).read()
|
||||
cacert = x509.load_der_x509_certificate(cadata, default_backend())
|
||||
pem = cacert.public_bytes(encoding=serialization.Encoding.PEM)
|
||||
with open(ca_file, 'wb') as pem_out:
|
||||
with io.open(ca_file, 'wb') as pem_out:
|
||||
pem_out.write(pem)
|
||||
|
||||
|
||||
@ -131,8 +139,8 @@ def convert_cert_to_pem(cert):
|
||||
# @param path path to key file
|
||||
# @return the key in pyopenssl format
|
||||
def read_key(path):
|
||||
with open(path, 'r') as f:
|
||||
key_data = f.read()
|
||||
with io.open(path, 'r') as f:
|
||||
key_data = f.read().encode('utf-8')
|
||||
return serialization.load_pem_private_key(key_data, None, default_backend())
|
||||
|
||||
|
||||
@ -156,4 +164,4 @@ def byte_string_format(num):
|
||||
# @param data data to convert to id
|
||||
# @return unique id string
|
||||
def to_unique_id(data):
|
||||
return hashlib.md5(data).hexdigest()
|
||||
return hashlib.md5(data.encode('utf-8')).hexdigest()
|
||||
|
Loading…
Reference in New Issue
Block a user