1
0
mirror of https://github.com/moepman/acertmgr.git synced 2025-01-04 01:25:24 +01:00

tools: add wrapper for urlopen and use it throughout acertmgr

This commit is contained in:
Kishi85 2019-03-22 15:55:36 +01:00
parent 985bc46f39
commit c0d23631b6
4 changed files with 18 additions and 31 deletions

View File

@ -18,13 +18,6 @@ from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import padding
from acertmgr import tools from acertmgr import tools
from acertmgr.tools import byte_string_format
try:
from urllib.request import urlopen # Python 3
except ImportError:
from urllib2 import urlopen # Python 2
from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority
@ -45,9 +38,9 @@ class ACMEAuthority(AbstractACMEAuthority):
header = { header = {
"alg": "RS256", "alg": "RS256",
"jwk": { "jwk": {
"e": tools.to_json_base64(byte_string_format(numbers.e)), "e": tools.to_json_base64(tools.byte_string_format(numbers.e)),
"kty": "RSA", "kty": "RSA",
"n": tools.to_json_base64(byte_string_format(numbers.n)), "n": tools.to_json_base64(tools.byte_string_format(numbers.n)),
}, },
} }
return header return header
@ -60,7 +53,7 @@ class ACMEAuthority(AbstractACMEAuthority):
def _send_signed(self, url, header, payload): def _send_signed(self, url, header, payload):
payload64 = tools.to_json_base64(json.dumps(payload).encode('utf8')) payload64 = tools.to_json_base64(json.dumps(payload).encode('utf8'))
protected = copy.deepcopy(header) protected = copy.deepcopy(header)
protected["nonce"] = urlopen(self.ca + "/directory").headers['Replay-Nonce'] protected["nonce"] = tools.get_url(self.ca + "/directory").headers['Replay-Nonce']
protected64 = tools.to_json_base64(json.dumps(protected).encode('utf8')) protected64 = tools.to_json_base64(json.dumps(protected).encode('utf8'))
# @todo check why this padding is not working # @todo check why this padding is not working
# pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH) # pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)
@ -71,7 +64,7 @@ class ACMEAuthority(AbstractACMEAuthority):
"payload": payload64, "signature": tools.to_json_base64(out), "payload": payload64, "signature": tools.to_json_base64(out),
}) })
try: try:
resp = urlopen(url, data.encode('utf8')) resp = tools.get_url(url, data.encode('utf8'))
return resp.getcode(), resp.read() return resp.getcode(), resp.read()
except IOError as e: except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)() return getattr(e, "code", None), getattr(e, "read", e.__str__)()
@ -154,7 +147,7 @@ class ACMEAuthority(AbstractACMEAuthority):
# wait for challenge to be verified # wait for challenge to be verified
while True: while True:
try: try:
resp = urlopen(challenges[domain]['uri']) resp = tools.get_url(challenges[domain]['uri'])
challenge_status = json.loads(resp.read().decode('utf8')) challenge_status = json.loads(resp.read().decode('utf8'))
except IOError as e: except IOError as e:
raise ValueError("Error checking challenge: {0} {1}".format( raise ValueError("Error checking challenge: {0} {1}".format(

View File

@ -18,12 +18,6 @@ from cryptography.hazmat.primitives.asymmetric import padding
from acertmgr import tools from acertmgr import tools
from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority
from acertmgr.tools import byte_string_format
try:
from urllib.request import urlopen, Request # Python 3
except ImportError:
from urllib2 import urlopen, Request # Python 2
class ACMEAuthority(AbstractACMEAuthority): class ACMEAuthority(AbstractACMEAuthority):
@ -62,8 +56,8 @@ class ACMEAuthority(AbstractACMEAuthority):
"alg": self.algorithm, "alg": self.algorithm,
"jwk": { "jwk": {
"kty": "RSA", "kty": "RSA",
"e": tools.to_json_base64(byte_string_format(numbers.e)), "e": tools.to_json_base64(tools.byte_string_format(numbers.e)),
"n": tools.to_json_base64(byte_string_format(numbers.n)), "n": tools.to_json_base64(tools.byte_string_format(numbers.n)),
}, },
} }
self.account_id = None # will be updated to correct value during account registration self.account_id = None # will be updated to correct value during account registration
@ -74,7 +68,7 @@ class ACMEAuthority(AbstractACMEAuthority):
if data: if data:
data = data.encode('utf-8') data = data.encode('utf-8')
resp = urlopen(Request(url, data=data, headers=header)) resp = tools.get_url(url, data, header)
# Store next Replay-Nonce if it is in the header # Store next Replay-Nonce if it is in the header
if 'Replay-Nonce' in resp.headers: if 'Replay-Nonce' in resp.headers:

View File

@ -7,14 +7,9 @@
import datetime import datetime
import os import os
from acertmgr import tools
from acertmgr.modes.abstract import AbstractChallengeHandler from acertmgr.modes.abstract import AbstractChallengeHandler
try:
from urllib.request import urlopen # Python 3
except ImportError:
from urllib2 import urlopen # Python 2
class ChallengeHandler(AbstractChallengeHandler): class ChallengeHandler(AbstractChallengeHandler):
def __init__(self, config): def __init__(self, config):
@ -36,7 +31,7 @@ class ChallengeHandler(AbstractChallengeHandler):
# check that the file is in place # check that the file is in place
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token) wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
try: try:
resp = urlopen(wellknown_url) resp = tools.get_url(wellknown_url)
resp_data = resp.read().decode('utf8').strip() resp_data = resp.read().decode('utf8').strip()
if resp_data != keyauthorization: if resp_data != keyauthorization:
raise ValueError("keyauthorization and response data do NOT match") raise ValueError("keyauthorization and response data do NOT match")

View File

@ -20,15 +20,20 @@ from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID, ExtensionOID from cryptography.x509.oid import NameOID, ExtensionOID
try: try:
from urllib.request import urlopen # Python 3 from urllib.request import urlopen, Request # Python 3
except ImportError: except ImportError:
from urllib2 import urlopen # Python 2 from urllib2 import urlopen, Request # Python 2
class InvalidCertificateError(Exception): class InvalidCertificateError(Exception):
pass pass
# @brief wrapper for downloading an url
def get_url(url, data=None, headers=None):
return urlopen(Request(url, data=data, headers={} if headers is None else headers))
# @brief retrieve notBefore and notAfter dates of a certificate file # @brief retrieve notBefore and notAfter dates of a certificate file
# @param cert_file the path to the certificate # @param cert_file the path to the certificate
# @return the tuple of dates: (notBefore, notAfter) # @return the tuple of dates: (notBefore, notAfter)
@ -116,7 +121,7 @@ def download_issuer_ca(cert):
raise Exception("Could not determine issuer CA for given certificate: {}".format(cert)) raise Exception("Could not determine issuer CA for given certificate: {}".format(cert))
print("Downloading CA certificate from {}".format(ca_issuers)) print("Downloading CA certificate from {}".format(ca_issuers))
cadata = urlopen(ca_issuers).read() cadata = get_url(ca_issuers).read()
return x509.load_der_x509_certificate(cadata, default_backend()) return x509.load_der_x509_certificate(cadata, default_backend())