1
0
mirror of https://github.com/moepman/acertmgr.git synced 2024-06-02 07:32:34 +02:00

Compare commits

...

92 Commits

Author SHA1 Message Date
9ca6dae048 version: bump to 1.0.5 2023-07-13 15:36:44 +02:00
Kishi85
274351415d Update github action workflow to build debian packages with gzip format for older OS versions 2023-07-12 17:46:06 +02:00
Rudolf Mayerhofer
f78cb5c554 Github Action: Update to newer github action syntax/standards, change image to ubuntu-latest, change pypi-publish to supported version and check if we have credentials to publish at all 2023-07-12 16:10:21 +02:00
Rudolf Mayerhofer
c3736c0838 Allow multiple sets of the same domain to defined in a single config file (necessary for multiple certs using different key_algorithm) in a list style notation (lists of maps) 2023-07-12 16:10:21 +02:00
Rudolf Mayerhofer
1a98f86aad Fix idna conversion for force-renew (probably broken since the IDNA cleanup) 2023-07-12 16:10:21 +02:00
Rudolf Mayerhofer
ef81ea62d1 Unify key_algorithm handling for elipic curves (change naming to ECC but stay backwards compatible) 2023-07-12 16:10:21 +02:00
Rudolf Mayerhofer
d1caaf80ef Fix LOG_REPLACEMENTS determination when multiple domain sets exist and we are on a newer version of python 2023-07-12 16:10:21 +02:00
Rudolf Mayerhofer
ba644d44f1 Update config id if we have a key algorithm set to allow for multiple certs with different algorithms for the same set of domains
This is a breaking change!
Changes the id for configurations with a key algorithm set, which by default results in changes to serveral dependent configuration values as well,
such as cert_file/key_file/csr_file. This will require existing ECC setups to append the ecc suffix to files in the acertmgr configuration directory
2023-07-12 16:10:21 +02:00
c15b6ec441 Instantiate HashAlgorithm in OCSPRequestBuilder
Installations of more recent cryptography require parameter hash
algorithm to be an instance of hashes.HashAlgorithm, not the bare object
itself.

Fixes #63
2023-07-10 19:27:44 +02:00
Kishi85
2d230e30d9 Clarify expected authority format (at least for v2) and add an example 2021-10-31 09:57:31 +01:00
Kishi85
6f0ccfdc91 logging: Add real counterparts of IDNA-mapped domains in brackets 2021-09-20 09:26:47 +02:00
Kishi85
460b0119ac configuration: Simplify too complex IDNA conversion 2021-09-13 09:00:59 +02:00
David Klaftenegger
e2f7b09b18 certs already contain idna domain names
The idna_convert call here does nothing: when reading a certificate, it
already contains idna domain names. Converting them to idna is
equivalent to the identity function, and can thus be removed.
2021-05-30 16:21:54 +02:00
93e28437ff version: bump to 1.0.4 2021-05-21 22:52:34 +02:00
Kishi85
2e1f5cd894 acertmgr/v2: Handle CA certificate chains properly 2021-05-21 22:50:44 +02:00
Kishi85
ce157a5c8a CI: Build on Ubuntu 18.04 while we are Python 2 compatbile and OS version is not EOL 2021-03-23 18:43:07 +01:00
Kishi85
9953cb4527 standalone: Fix multiple challange handlers on same port
If you define challenge handlers on a per-domain basis multiple will be
created. This would cause the standalone handler to potientially try
to bind the same port (when configured) multiple times, which would only
work on the first try. Subsequent tries would fail with "Address already
in use". To fix this only bind the server between start and stop of the
challenge and cleanup afterwards.
2021-03-23 18:43:07 +01:00
7a5d35f29b GitHub Actions: use current setuptools and wheel 2020-10-12 19:22:02 +02:00
62f01aeff9 GitHub Actions: twine upload via pypa/gh-action-pypi-publish 2020-10-12 19:01:09 +02:00
b48f4532b9 reformat setup.py 2020-10-12 17:48:55 +02:00
bc2a7229ec GitHub Actions: unify whitespace style 2020-10-12 17:22:52 +02:00
fd4fed9432 version: bump to 1.0.3 2020-03-12 18:41:15 +01:00
56743dcbb9 GitHub Actions: fix fetching tags 2020-03-04 17:29:11 +01:00
Kishi85
0648cb7b38 tools: Fix IDNA handler (again) 2020-03-04 14:50:05 +01:00
Kishi85
b37d0cad94 acertmgr: Add a OCSP validation to certificate verification 2020-03-04 14:50:05 +01:00
Kishi85
c33a39a433 tools: make pem file writable by owner before tryting to write
A PEM file might not be writable by the owner when it should be written
(e.g. on Windows), so we have to ensure the file has write permissions
before doing so
2020-03-04 14:40:49 +01:00
Kishi85
882ddfd0b8 Generate proper dependencies on deb Packages 2020-02-20 18:40:22 +01:00
Kishi
e5edc4e5aa Use Github Actions for automated building and release 2020-02-20 18:35:41 +01:00
e48724b726 version: bump to 1.0.2 2019-11-23 15:37:07 +01:00
6314f468c1 setup.py: fix package name for yaml 2019-11-08 19:40:05 +01:00
Kishi85
97e9be80cf acertmgr: Fix module/function issues on windows 2019-10-28 10:50:09 +01:00
Kishi85
f5f038d47b configuration: global config is now relative to config_dir 2019-10-26 19:11:33 +02:00
a0a4b0bf07 version: bump to 1.0.1 2019-10-01 13:08:45 +02:00
a63eabd0ee .drone.yml: upload releases to PyPI 2019-10-01 13:08:10 +02:00
2911e05165 setup.py: use proper PyPI supported classifiers 2019-10-01 13:06:37 +02:00
8dad549d68 version: bump to 1.0.0 2019-09-23 14:57:29 +02:00
11d43d4817 build packages via drone.io 2019-09-23 14:57:12 +02:00
Kishi85
ba4dda154b acertmgr: Remove legacy configuration directives (#30) 2019-09-06 16:07:16 +02:00
31c43321d4 version: bump to 0.9.8 2019-07-04 09:34:31 +02:00
Kishi85
9b10f10efd dns.*: Use a static query timeout for any DNS queries using dnspython 2019-07-02 12:55:09 +02:00
Kishi85
1a4272f11a authority.v2: invalidate nonces after 2 minutes and re-request
Boulder seems to invalidate older nonces after some time. Therefore we
allow nonces from the cache to be used for up to 2 minutes and after
those they will be considered invalid (and re-requested with an extra
request to the nonce endpoint when necessary)
2019-06-21 11:39:10 +02:00
514ff7cbad version: bump to 0.9.7 2019-06-12 10:40:06 +02:00
Kishi85
0b8e49d6ee tools: Display warning about IDNA only if unicode names are in use 2019-06-11 10:05:37 +02:00
Kishi85
af0bb45d73 authority.v2: Properly clear the nonce cache on using it's content 2019-06-11 09:52:55 +02:00
Kishi85
7475d5e73f authority.v2: Check challenge return code on validation as well 2019-06-11 09:52:55 +02:00
bc991f12d1 version: bump to 0.9.6 2019-05-20 18:43:49 +02:00
Kishi85
abc0c4a9c2 authority: use correct account_key_length 2019-05-13 21:47:31 +02:00
Kishi85
258855c5b4 legacy: fix ToS agreement value 2019-05-13 20:48:44 +02:00
Kishi85
6e52dd41b0 docs: Update README 2019-05-06 21:24:35 +02:00
Kishi85
7a019d1ac9 idna: unify usage as tools function 2019-05-06 21:24:24 +02:00
Kishi85
6a07ab1188 tools/configuration: Add support for EC/Ed25519/Ed448 generation 2019-04-19 15:29:44 +02:00
Kishi85
4f0fe2c74a tools: Add support for Ed25519 and Ed448 account keys
Add support for Ed25519 and Ed448 account keys in addition
to already supported algorithms
2019-04-16 19:12:25 +02:00
Kishi85
88d4a52ab9 tools: use cryptography conversion instead of custom function
Use cryptography's int_to_bytes consistently instead of our own number
to byte conversion function
2019-04-16 19:12:15 +02:00
Kishi85
4df74d67d5 tools: add support for EC account keys
Allows usage of pre-generated EC account keys (P-256, P-384, P-521)
in addition to already supported RSA keys.
2019-04-16 19:12:05 +02:00
Kishi85
1f5ef9322b tools: remove six dependency
Always decode string if the functions is available, assume normal string
otherwise
2019-04-07 15:31:07 +02:00
Kishi85
b5bac4870a authority.v1: add deprecation warning
ACMEv1 will be deprecated by letsencrypt.org in 2021* therefore add a
deprecation warning on object creation

* https://community.letsencrypt.org/t/end-of-life-plan-for-acmev1/88430
2019-04-07 15:31:07 +02:00
Kishi85
89be66dc87 acertmgr: implement deployment error handling
Remove the long-standing todo from cert_put and implement useful
error handling and defaults for certificate deployment. Also do
a separate try/expect for each deployed file on every single
certificate.
2019-04-07 15:31:07 +02:00
Kishi85
7c9e7f7d0c authority.v2: use POST-as-GET to remove unauthenticated GET requests
As unauthenticated GET requests are soon to be deprecated,
remove all unacceptable usages and replace with POST-as-GET.
See also https://tools.ietf.org/html/rfc8555#section-6.3
2019-04-07 15:31:07 +02:00
Kishi85
b86d8b6e0a setup: update dependencies and requirements 2019-04-07 15:31:07 +02:00
Kishi85
4510aaf393 acertmgr: properly format action output 2019-04-07 15:31:07 +02:00
Kishi85
79b625619a acertmgr: try using a fallback configuration for revoke
If no configuration matching the domains in the given certificate exist
use the globalconfig/default settings for an authority to revoke the
certificate (which might still fail if things do not match up, but the
authority will decide on that)

Configuration parsing for the authority settings is therefore split into
a seperate function which will be called for the 'fallback_authority'
element in runtimeconfig.
2019-04-07 15:31:07 +02:00
Kishi85
762037c42d standalone: cleanup start/stop challenge
stop_challenge should only stop the server if the thread is still alive
2019-04-07 15:31:07 +02:00
Kishi85
e12abbc3cb authority.v1/2: call stop_challenge even if start_challenge fails
start_challenge may raise an exception at any point. to not just destroy
the challenge but also try to stop it in order to clean up anything done
by start_challenge already properly
2019-04-07 15:31:07 +02:00
Kishi85
989d3b585a authority.v1/2: do not re-authorize already valid domains
Skipping re-authorization when not necessary speeds up cert renewal.
2019-04-07 15:31:07 +02:00
Kishi85
fd8c4ec443 authority.v2: cleanup error- and encoding-handling 2019-04-07 15:31:07 +02:00
Kishi85
df6e3a743e authority.v1: Add missing resource to revoke_crt
Add missing resource: revoke-cert to the request payload
2019-04-07 15:31:07 +02:00
Kishi85
79791c53bc tools: update the get_cert_domain function
Add all domains for SAN certificates and convert IDNA values to the
correct representation
2019-04-07 15:31:07 +02:00
Kishi85
b63a0bc424 tools: add log function, update log messages mentioning certificates
This simple implementation writes log messages to stdout/err and flushes
the buffers immediately after the message has been written.

Also update log messages with the certificate CN to a better readable format
Introduce functions for get_cert_cn and get_cert_valid_until to
encapsulate all cryptographic functions consistently in tools.
2019-04-07 15:31:06 +02:00
Kishi85
2046215e37 tools: encapsulate key parameter determination in tools function
This is will isolate more cryptographic functions in the tools module
and allow for easier replacement of any cryptographic function should
the need ever arise
2019-04-07 15:31:02 +02:00
Kishi85
47e3312aad dns: Add additional TXT record verifications to reduce wait time
This may also be used to guarantee a correct TXT record lookup by setting
dns_verify_all_ns=true, a dns_verify_failtime < dns_verify_waittime and
a high enough value of dns_verify_failtime (like 300 seconds)
2019-04-04 13:39:34 +02:00
Kishi85
1aae651d98 modes: unify and optimize challenge handler workflow
- Remove wait times returned by create_challenge
- Remove wait loops from authorities
- Add the wait for valid DNS TXT records in the abstract
  DNSChallengeHandler start_challenge function.
- Move challenge verification to start_challenge in general
2019-04-04 13:39:34 +02:00
Kishi85
54cb334600 acertmgr: add support for the ocsp must-staple extension
Introduces a new config directive and requires at least cryptography 2.1
2019-04-04 13:39:05 +02:00
07696f5721 version: bump to 0.9.5 2019-04-01 12:31:44 +02:00
Kishi85
0a5356a302 configuration: fix broken idna handling 2019-03-31 23:17:02 +02:00
Kishi85
fe7a064604 acertmgr: log exceptions during processing, raise afterward
If anything goes wrong during cert_get/cert_put/running
actions/cert_revoke superseded do not fail completely and continue with
the remaining domains to process. Print all exceptions and after
processing raise a RuntimeError
2019-03-28 21:15:46 +01:00
Kishi85
7e4c350a4f configuration: remove redundant 'domains' parameter, just use domainlist 2019-03-28 14:52:18 +01:00
Kishi85
fa3fc196f3 configuration: unify how ca_file and ca_static are determined
ensure legacy compatibility (also include defaults case) and update README.md
2019-03-28 13:41:27 +01:00
Kishi85
99d9e41322 configuration: cleanup for legacy removal and improve readability 2019-03-28 12:38:53 +01:00
Kishi85
45ccb6b0d6 docs: update readme with new command-line parameters 2019-03-28 11:13:54 +01:00
Kishi85
ba9e206423 authority.v[12]: skip subsequent account registration 2019-03-28 09:48:54 +01:00
Kishi85
735c986f0d acertmgr: Move factories to their packages and reuse objects with same config 2019-03-28 09:48:54 +01:00
Kishi85
75f597ac36 configuration: put all authority related directives into sub-dict 2019-03-28 09:48:54 +01:00
Kishi85
f01140e89b acertmgr: Add option to supersede previous cert on renewal
Add option to automatically revoke the previous certificate with reason
superseded after deployment and all actions have been successful.
2019-03-28 09:48:54 +01:00
Kishi85
39aa7db24c acertmgr: deploy certificates after all are renewed
as certificate renewal might take some time (on DNS-01 especially) it is
a good idea to wait with deployment until all certificates are finished
renewing and copy them to their destinations then + run actions
2019-03-28 09:48:45 +01:00
Kishi85
737578159b acertmgr: Add support for account.key based certificate revocation 2019-03-28 00:53:54 +01:00
Kishi85
bd27db4ebd acertmgr: add force renew option to immediately renew a cert 2019-03-27 18:37:03 +01:00
Kishi85
dfaca3b58f configuration: put idna handling into function 2019-03-27 18:34:48 +01:00
Kishi85
52f5584dc0 configuration: add seperate configuration for runtime options 2019-03-27 15:32:49 +01:00
Kishi85
7da3c266a7 authority.v2: optimize code paths (raw_result, nonce)
raw_result does not need an extra return, dicarding the nonce at that
point would discard the newer nonce from the response and also the first
nonce is gotten implicitly with the first request acme anyway
2019-03-27 14:22:16 +01:00
Kishi85
44aeda6915 webdir: add config option for verification 2019-03-27 14:22:16 +01:00
Kishi85
ff3a57eaff standalone: remove dependency to webdir and add ipv6 support
- Serve the challenge authorizations from in-memory instead of files
- Try to establish a dual-stack IPv6 HTTPServer before falling back
2019-03-27 14:22:09 +01:00
Kishi85
8cfcdf9385 docs: update and refine readme 2019-03-27 13:29:41 +01:00
20 changed files with 1384 additions and 696 deletions

114
.github/workflows/release.yml vendored Normal file
View File

@ -0,0 +1,114 @@
---
name: Build (and release)
on: [ push, pull_request ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Install dependencies
run: |
sudo apt update -qq -y
sudo apt install -qq -y build-essential fakeroot git python-all python3-cryptography python3-pip python3-stdeb python3-wheel twine dh-python
sudo pip3 install --upgrade setuptools wheel
- name: Prepare build process
id: buildprep
run: |
# Fetch tags and determine version
git fetch --tags -f
VER="$(python3 setup.py --version)"
echo "Version found: $VER"
echo "version=$VER" >> $GITHUB_OUTPUT
- name: Build python package using setuptools (source/wheel)
run: |
python3 setup.py sdist --formats=gztar
python3 setup.py bdist_wheel
- name: Prepare stdeb build process
id: stdebprep
run: |
# Patch setup.py to allow stdeb proper debian style builds
sed "s/=determine_version()/='$(python3 setup.py --version)'/gi" -i setup.py
sed "s@('readme'@('share/doc/python3-acertmgr'@" -i setup.py
# Determine recommended dependencies for deb package
echo "recommends3=$(echo "python3-pkg-resources")" >> $GITHUB_OUTPUT
# Find optional dependencies to suggest in deb package
echo "suggests3=$(python3 -c "from setup import extra_requirements; print('\n'.join(['\n'.join(x) for x in extra_requirements.values()]))" | grep -v cryptography | sed 's/PyYAML/yaml/gi' | awk '{ printf("python3-%s, ",$1)};' | awk '{$1=$1; print}')" >> $GITHUB_OUTPUT
- name: Build debian package using setuptools and stdeb
run: |
# Create debianized source directory
python3 setup.py --command-packages=stdeb.command sdist_dsc --with-python2=False --with-python3=True --recommends3="${{ steps.stdebprep.outputs.recommends3 }}" --suggests3="${{ steps.stdebprep.outputs.suggests3 }}"
# Enter debianzied source directory (first sub-directory under deb_dist)
cd "$(find deb_dist -maxdepth 1 -type d | grep -v 'deb_dist$\|tmp_sdist_dsc' | head -n1)"
# Enforce GZIP compressed debian package info for older OS versions
echo -e 'override_dh_builddeb:\n\tdh_builddeb -- -Zgzip\n' >> debian/rules
# Build deb package
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage -rfakeroot -uc -us
- name: Create a changelog from git log since last non-pre/rc tag
run: |
# Determine current tag and last non-rc/pre tag
CTAG="$(git describe --tags --abbrev=0)"
LTAG="$(git tag | grep -B1 ${CTAG} | head -n1)"
while echo $LTAG | grep -q 'rc\|pre'; do
LTAG="$(git tag | grep -B1 ${LTAG} | head -n1)"
done
# Write changelog
echo "Changes since ${LTAG}:" > changelog.txt
git log --format=' * %s' ${LTAG}..${CTAG} >> changelog.txt
cat changelog.txt
- name: Collect files for artifact upload
run: |
mkdir -v artifacts
cp -v changelog.txt artifacts/
cp -v dist/*.tar.gz artifacts/
cp -v dist/*.whl artifacts/
cp -v deb_dist/*.deb artifacts/
- name: Upload build artifact
uses: actions/upload-artifact@v1
with:
name: ${{ format('acertmgr_build_{0}', steps.buildprep.outputs.version) }}
path: artifacts
- name: Create new GitHub release
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
name: ${{ steps.buildprep.outputs.version }}
draft: true
prerelease: ${{ contains(github.ref, 'rc') || contains(github.ref, 'pre') }}
body_path: changelog.txt
files: |
artifacts/*.tar.gz
artifacts/*.whl
artifacts/*.deb
- name: Check PyPI secrets
id: checksecrets
shell: bash
run: |
if [ "$USER" == "" -o "$PASSWORD" == "" ]; then
echo "secretspresent=NO" >> $GITHUB_OUTPUT
else
echo "secretspresent=YES" >> $GITHUB_OUTPUT
fi
env:
USER: ${{ secrets.PYPI_USERNAME }}
PASSWORD: ${{ secrets.PYPI_PASSWORD }}
- name: Create new PyPI release
if: steps.checksecrets.outputs.secretspresent == 'YES' && github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') && (!(contains(github.ref, 'rc') || contains(github.ref, 'pre')))
uses: pypa/gh-action-pypi-publish@release/v1
with:
user: ${{ secrets.PYPI_USERNAME }}
password: ${{ secrets.PYPI_PASSWORD }}

View File

@ -12,32 +12,32 @@ Requirements
------------
* Python (2.7+ and 3.5+ should work)
* cryptography
* cryptography>=0.6
Optional packages
-----------------
Optional requirements (to use specified features)
------------------------------------------------------
* PyYAML (when using config files in YAML format)
* dnspython (required for the dns.nsupdate mode)
* idna (for automatically translating to the IDNA representation of unicode domain names)
* PyYAML: to parse YAML-formatted configuration files
* dnspython: used by dns.* challenge handlers
* idna: to allow automatic conversion of unicode domain names to their IDNA2008 counterparts
* cryptography>=2.1: for creating certificates with the OCSP must-staple flag (cert_must_staple)
* cryptography>=2.6: for usage of Ed25519/Ed448 keys
Initial Setup
-------------
Setup
-----
You should decide which challenge mode you want to use with acertmgr:
* webdir: In this mode, responses to challenges are put into a directory, to be served by an existing webserver
* standalone: In this mode, challenges are completed by acertmgr directly.
This starts a webserver to solve the challenges, which can be used standalone or together with an existing webserver that forwards request to a specified local port
* webdir/standalone: Make sure that the `webdir` directory exists in both cases (Note: the standalone webserver does not yet serve the files in situation)
* standalone: In this mode, challenges are completed by acertmgr directly. This starts a webserver to solve the challenges, which can be used standalone or together with an existing webserver that forwards request to a specified local port/address.
* dns.*: This mode puts the challenge into a TXT record for the domain (usually _acme-challenge.<domain>) where it will be parsed from by the authority
* dns.* (Alias mode): Can be used similar to the above but allows redirection of _acme-challenge.<domain> to any other (updatable domain) defined in dns_updatedomain via CNAME (e.g. _acme-challenge.example.net IN CNAME bla.foo.bar with config dns_updatedomain="bla.foo.bar" in config)
* dns.nsupdate: Updates the TXT record using RFC2136 (with dnspython)
* dns.* (Alias mode): Can be used similar to the above but allows redirection of _acme-challenge.<domain> to any other (updatable domain) defined in dns_updatedomain via CNAME (e.g. _acme-challenge.example.net IN CNAME bla.foo.bar with dns_updatedomain="bla.foo.bar" in domainconfig)
* dns.nsupdate: Updates the TXT record using RFC2136
You can optionally provide the private key files to be used with the ACME protocol (if you do not they will be automatically created):
* The account private key is (by default) expected at `/etc/acertmgr/account.key` (used to register an account with the authorities server)
* The domain private keys are (by default) expected at `/etc/acertmgr/{certificate-hash}.key`
* If you are missing these keys, they will be created for you or you can create them using e.g. `openssl genrsa 4096 > /etc/acertmgr/account.key`
* Do not forget to set proper permissions of the keys using `chmod 0400 /etc/acertmgr/*.key`
* The domain private keys are (by default) expected at `/etc/acertmgr/{cert_id}.key`
* If you are missing these keys, they will be created for you (using RSA with the configured key_length) or you can create them using e.g. `openssl genrsa 4096 > /etc/acertmgr/account.key`
* Do not forget to set proper permissions of the keys using `chmod 0400 /etc/acertmgr/*.key` if you created those manually
Finally, you need to setup the configuration files, as shown in the next section.
While testing, you can use the acme-staging authority instead, in order to avoid issuing too many certificates.
@ -47,49 +47,68 @@ Authorities (e.g. our default Let's Encrypt) will require you to accept their Te
Configuration
-------------
Unless specified with a commandline parameter (see acertmgr.py --help) the optional global configuration is read from '/etc/acertmgr/acertmgr.conf'.
Domains for which certificates should be obtained/renewed should be configured in `/etc/acertmgr/*.conf` (the global configuration is always excluded if it is in the same directory).
By default the directory containing the working data (work_dir) is located at `/etc/acertmgr/`.
Configuration examples are included in the `docs/` directory. All configuration files can use yaml (requires PyYAML) or json syntax. (Note: The JSON examples may be incomplete due to inability to express comments in JSON)
All configuration files can use yaml (requires PyYAML) or json syntax. *Examples can be found in the docs/ directory* (Note: The JSON examples may be incomplete due to inability to express comments in JSON)
Unless specified with a commandline parameter (see acertmgr.py --help) the optional global configuration is read from `/etc/acertmgr/acertmgr.conf`.
Domains for which certificates should be obtained/renewed are be configured in `/etc/acertmgr/*.conf` (the global configuration is always excluded if it is in the same directory).
By default the directory (work_dir) containing the working data (csr,certificate,key and ca files) is located at `/etc/acertmgr/`.
4 configuration contexts are known: *domainconfig (d) > globalconfig (g) > commandline (c) > built-in defaults*
The following the directives are currently known (subject to change, recommended context for usage written bold):
4 configuration contexts are known (*domainconfig (d) > globalconfig (g) > commandline (c) > built-in defaults*) with the following directives (subject to change, usual usage context written bold):
| Directive | Context | Description | Built-in Default |
| --- | --- | --- | --- |
| < domains >: | **d** | (domainconfig header) Domains to use in the cert request, will be MD5 hashed as cert_id | |
| --- | --- |----------------------------------------------------------------------------------------------------------------------------------------------| --- |
| -c/--config-file | **c** | global configuration file (optional) | /etc/acertmgr/acertmgr.conf |
| -d/--config-dir | **c** | directory containing domain configuration files (ending with .conf, globalconfig will be excluded automatically if in same directory) | /etc/acertmgr/*.conf |
| -w/--work-dir | **c** | working directory containing csr/certificates/keys/ca files | /etc/acertmgr |
| --force-renew | **c** | (or --renew-now) Immediately renew all certificates containing the given domain(s) | |
| --revoke | **c** | Revoke the certificate at the given path | |
| --revoke-reason | **c** | Provide a reason code for the revocation (see https://tools.ietf.org/html/rfc5280#section-5.3.1 for valid values) | |
| domain (san-domain...): | **d** | (domainconfig section start) Domains to use in the cert request. This value will be MD5-hashed as cert_id. | |
| api | d,**g** | Determines the API version used | v2 |
| authority | d,**g** | URL to the certificate authorities API | https://acme-v02.api.letsencrypt.org |
| authority_tos_agreement | d,**g**,**c** | Indicates agreement to the ToS of the certificate authority | |
| authority | d,**g** | URL to the certificate authorities ACME API root (without trailing /directory or similar) | https://acme-v02.api.letsencrypt.org |
| authority_tos_agreement | d,**g**,c | Indicates agreement to the ToS of the certificate authority (--authority-tos-agreement on command line) | |
| authority_contact_email | d,**g** | (v2 API only) Contact e-mail to be registered with your account key | |
| account_key | d,**g** | Path to the account key | {work_dir}/account.key |
| account_key_algorithm | d,**g** | Key-algorithm for newly generated account keys (RSA, EC, ED25519, ED448) | RSA |
| account_key_length | d,**g** | Key-length for newly generated RSA account keys (in bits) or EC curve (256=P-256, 384=P-384, 521=P-521) | depends on account_key_algorithm |
| ttl_days | d,**g** | Renew certificate if it has less than this value validity left | 30 |
| validate_ocsp | d,**g** | Renew certificate if it's OCSP status is REVOKED. Allowed values for this key are: false, sha1, sha224, sha256, sha384, sha512 | sha1 (as mandated by RFC5019) |
| cert_dir | d,**g** | Directory containing all certificate related data (crt,key,csr) | {work_dir} |
| key_algorithm | d,**g** | Key-algorithm for newly generated private keys (RSA, ECC, ED25519, ED448) | RSA |
| key_length | d,**g** | Key-length for newly generated RSA private keys (in bits) or EC curve (256=P-256, 384=P-384, 521=P-521) | depends on key_algorithm |
| csr_static | **d**,g | Whether to re-use a static CSR or generate a new dynamic CSR | false |
| csr_file | **d**,g | Path to store (and load) the certificate CSR file | {cert_dir}/{cert_id}.csr |
| ca_static | **d**,g | Whether to re-use a static CA or download a CA file | false |
| ca_file | **d**,g | Path to store (and load) the certificate authority file | {cert_dir}/{cert_id}.ca |
| cert_file | **d** | Path to store (and load) the certificate file | {cert_dir}/{cert_id}.crt |
| cert_revoke_superseded | **d**,g | Revoke the previous certificate with reason "superseded" after successful deployment | false |
| cert_must_staple | **d**,g | Generate a certificate (request) with the OCSP must-staple flag (will be honoured on the next newly generated CSR if using csr_static=true) | false |
| key_file | **d**,g | Path to store (and load) the private key file | {cert_dir}/{cert_id}.key |
| key_length | d,**g** | Key-length for newly generated private keys | 4096 |
| mode | **d**,g | Mode of challenge handling used | standalone |
| webdir | **d**,g | [webdir] Put acme challenges into this path | |
| webdir | **d**,g | [webdir] Put acme challenges into this path | /var/www/acme-challenge/ |
| http_verify | **d**,g | [webdir/standalone] Verify challenge before attempting authorization | true |
| bind_address | **d**,g | [standalone] Serve the challenge using a HTTP server on given IP | |
| port | **d**,g | [standalone] Serve the challenge using a HTTP server on this port | 80 |
| dns_ttl | **d**,g | [dns.*] Write TXT records with this TTL (also determines the update wait time at twice this value | 60 |
| dns_updatedomain | **d**,g | [dns.*] Write the TXT records to this domain (you have to create the necessary CNAME on the real challenge domain manually) | |
| nsupdate_server | **d**,g | [dns.nsupdate] DNS Server to delegate the update to | <determine from zone SOA> |
| dns_verify_interval | **d**,g | [dns.*] Do verification checks when starting the challenge every {dns_verify_interval} seconds | 10 |
| dns_verify_failtime | **d**,g | [dns.*] Fail challenge TXT record verification after {dns_verify_failtime} seconds | {dns_waittime} + 1 |
| dns_verify_waittime | **d**,g | [dns.*] Assume DNS challenges are valid after {dns_verify_waittime} | 2 * {dns_ttl} |
| dns_verify_all_ns | **d**,g | [dns.*] Verify DNS challenges by querying all known zone NS servers (resolved by zone master from SOA or dns_verify_server) | false |
| dns_verify_server | **d**,g | [dns.*] Verify DNS challenges by querying this DNS server unless 'dns_verify_all_ns' is enabled, then use to determine zone NS | |
| nsupdate_server | **d**,g | [dns.nsupdate] DNS Server to delegate the update to | {determine from zone SOA} |
| nsupdate_verify | **d**,g | [dns.nsupdate] Verify TXT record on the update server upon creation | true |
| nsupdate_keyfile | **d**,g | [dns.nsupdate] Bind-formatted TSIG key file to use for updates (may be used instead of nsupdate_key*) | |
| nsupdate_keyname | **d**,g | [dns.nsupdate] TSIG key name to use for updates | |
| nsupdate_keyvalue | **d**,g | [dns.nsupdate] TSIG key value to use for updates | |
| nsupdate_keyalgorithm | **d**,g | [dns.nsupdate] TSIG key algorithm to use for updates | |
| defaults: | **g** | Deployment action defaults | |
| nsupdate_keyalgorithm | **d**,g | [dns.nsupdate] TSIG key algorithm to use for updates | HMAC-MD5.SIG-ALG.REG.INT |
| defaults: | **g** | Default deployment action settings used by all domains | |
| path | **d** | (deployment) deploy certificate data to the given file | |
| user | **d**,g(defaults) | (deployment) change the user of the file deployed at path to this value | |
| group | **d**,g(defaults) | (deployment) change the group of the file deployed at path to this value | |
| perm | **d**,g(defaults) | (deployment) change the permissions of the file deployed at path to this value | |
| format | **d**,g(defaults) | (deployment) deploy one or more of the following data to the file at path: key,crt,ca | |
| action | **d**,g(defaults) | (deployment) run the following action after deployment is finished. This command will be run in a shell and therefore supports shell syntax. | |
| user | **d**,g(defaults) | (deployment) change the user of the file deployed at path to this value (optional, defaults to acertmgr current effective user) | |
| group | **d**,g(defaults) | (deployment) change the group of the file deployed at path to this value (optional,defaults to acertmgr current effective group) | |
| perm | **d**,g(defaults) | (deployment) change the permissions of the file deployed at path to this value (optional, CAUTION: uses system defaults for new files) | |
| action | **d**,g(defaults) | (deployment) run the following action after deployment is finished. This command will be run in a shell and supports it's syntax. (optional) | |
Security
--------
@ -98,3 +117,8 @@ Please keep the following in mind when using this software:
* DO read the source code, since it (usually) will be run as root
* Make sure that your configuration files are NOT writable by other users - arbitrary commands can be executed after updating certificates
* Try to run this program non-privileged if possible. This requires you to:
* Create a dedicated user for acertmgr (e.g. acertmgr)
* Run a acertmgr as that user (add acertmgr to that users cron!)
* Access rights to read/write all files configured with the created user
* Run any programs/scripts defined on cert update as the created user (might need work-arounds with sudo or wrapper scripts)

View File

@ -6,77 +6,56 @@
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import grp
import importlib
import io
import os
import pwd
import stat
import subprocess
import sys
from acertmgr import configuration, tools
from acertmgr.authority import authority
from acertmgr.modes import challenge_handler
from acertmgr.tools import log, LOG_REPLACEMENTS
# @brief create a authority for the given configuration
# @param settings the authority configuration options
def create_authority(settings):
acc_file = settings['account_key']
if os.path.isfile(acc_file):
print("Reading account key from {}".format(acc_file))
acc_key = tools.read_pem_file(acc_file, key=True)
else:
print("Account key not found at '{0}'. Creating key.".format(acc_file))
acc_key = tools.new_account_key(acc_file)
authority_module = importlib.import_module("acertmgr.authority.{0}".format(settings["api"]))
authority_class = getattr(authority_module, "ACMEAuthority")
return authority_class(settings, acc_key)
# @brief create a challenge handler for the given configuration
# @param settings the domain's configuration options
def create_challenge_handler(settings):
if "mode" in settings:
mode = settings["mode"]
else:
mode = "standalone"
handler_module = importlib.import_module("acertmgr.modes.{0}".format(mode))
handler_class = getattr(handler_module, "ChallengeHandler")
return handler_class(settings)
try:
import pwd
import grp
except ImportError:
# Warnings will be reported upon usage below
pass
# @brief fetch new certificate from letsencrypt
# @param settings the domain's configuration options
def cert_get(settings):
print("Getting certificate for '%s'." % settings['domains'])
log("Getting certificate for %s" % settings['domainlist'])
acme = create_authority(settings)
acme = authority(settings['authority'])
acme.register_account()
# create challenge handlers for this certificate
challenge_handlers = dict()
for domain in settings['domainlist']:
# Create the challenge handler
challenge_handlers[domain] = create_challenge_handler(settings['handlers'][domain])
challenge_handlers[domain] = challenge_handler(settings['handlers'][domain])
# create ssl key
key_file = settings['key_file']
key_length = settings['key_length']
if os.path.isfile(key_file):
key = tools.read_pem_file(key_file, key=True)
else:
print("SSL key not found at '{0}'. Creating {1} bit key.".format(key_file, key_length))
key = tools.new_ssl_key(key_file, key_length)
log("SSL key not found at '{0}'. Creating key.".format(key_file))
key = tools.new_ssl_key(key_file, settings['key_algorithm'], settings['key_length'])
# create ssl csr
csr_file = settings['csr_file']
if os.path.isfile(csr_file) and str(settings['csr_static']).lower() == 'true':
print('Loading CSR from {}'.format(csr_file))
log('Loading CSR from {}'.format(csr_file))
cr = tools.read_pem_file(csr_file, csr=True)
else:
print('Generating CSR for {}'.format(settings['domainlist']))
cr = tools.new_cert_request(settings['domainlist'], key)
log('Generating CSR for {}'.format(settings['domainlist']))
must_staple = str(settings.get('cert_must_staple')).lower() == "true"
cr = tools.new_cert_request(settings['domainlist'], key, must_staple)
tools.write_pem_file(cr, csr_file)
# request cert with csr
@ -84,8 +63,11 @@ def cert_get(settings):
# if resulting certificate is valid: store in final location
if tools.is_cert_valid(crt, settings['ttl_days']):
log("Certificate '{}' renewed and valid until {}".format(tools.get_cert_cn(crt),
tools.get_cert_valid_until(crt)))
tools.write_pem_file(crt, settings['cert_file'], stat.S_IREAD)
if "static_ca" in settings and not settings['static_ca'] and ca is not None:
if (not str(settings.get('ca_static')).lower() == 'true' or not os.path.exists(settings['ca_file'])) \
and ca is not None:
tools.write_pem_file(ca, settings['ca_file'])
@ -93,81 +75,165 @@ def cert_get(settings):
# @param settings the domain's configuration options
# @return the action to be executed after the certificate update
def cert_put(settings):
# TODO error handling
ca_file = settings['ca_file']
crt_user = settings['user']
crt_group = settings['group']
crt_perm = settings['perm']
crt_path = settings['path']
crt_format = settings['format'].split(",")
crt_format = [str.strip(x) for x in crt_format]
crt_action = settings['action']
if 'path' not in settings:
raise ValueError('Deployment settings are missing required element: path')
if 'format' not in settings:
raise ValueError('Deployment settings are missing required element: format')
key_file = settings['key_file']
crt_final = settings['cert_file']
with io.open(crt_path, "w+") as crt_fd:
for fmt in crt_format:
with io.open(settings['path'], "w+") as crt_fd:
for fmt in [str.strip(x) for x in settings['format'].split(",")]:
if fmt == "crt":
src_fd = io.open(crt_final, "r")
crt_fd.write(src_fd.read())
src_fd.close()
if fmt == "key":
src_fd = io.open(key_file, "r")
crt_fd.write(src_fd.read())
src_fd.close()
if fmt == "ca":
if not os.path.isfile(ca_file):
raise FileNotFoundError("The CA certificate file (%s) is missing!" % ca_file)
src_fd = io.open(ca_file, "r")
crt_fd.write(src_fd.read())
src_fd.close()
with io.open(settings['cert_file'], "r") as src_fd:
crt_fd.write(src_fd.read())
elif fmt == "key":
with io.open(settings['key_file'], "r") as src_fd:
crt_fd.write(src_fd.read())
elif fmt == "ca":
with io.open(settings['ca_file'], "r") as src_fd:
crt_fd.write(src_fd.read())
else:
# TODO error handling
pass
log("Ignored unknown deployment format key: {}".format(fmt), warning=True)
# set owner and permissions
uid = pwd.getpwnam(crt_user).pw_uid
gid = grp.getgrnam(crt_group).gr_gid
try:
os.chown(crt_path, uid, gid)
except OSError:
print('Warning: Could not set certificate file ownership!')
try:
os.chmod(crt_path, int(crt_perm, 8))
except OSError:
print('Warning: Could not set certificate file permissions!')
# set owner and group
if 'user' in settings or 'group' in settings:
if 'pwd' in sys.modules and 'grp' in sys.modules and hasattr(os, 'chown') and hasattr(os, 'geteuid') and \
hasattr(os, 'getegid'):
try:
uid = pwd.getpwnam(settings['user']).pw_uid if 'user' in settings else os.geteuid()
gid = grp.getgrnam(settings['group']).gr_gid if 'group' in settings else os.getegid()
os.chown(settings['path'], uid, gid)
except OSError as e:
log('Could not set certificate file ownership', e, warning=True)
else:
log('File user and group handling unavailable on this platform', warning=True)
# set permissions
if 'perm' in settings:
if hasattr(os, 'chmod'):
try:
os.chmod(settings['path'], int(settings['perm'], 8))
except OSError as e:
log('Could not set certificate file permissions', e, warning=True)
else:
log('File permission handling unavailable on this platform', warning=True)
return crt_action
return settings['action']
def cert_revoke(cert, configs, fallback_authority, reason=None):
domains = set(tools.get_cert_domains(cert))
acmeconfig = None
for config in configs:
if domains == set(config['domainlist']):
acmeconfig = config['authority']
break
if not acmeconfig:
acmeconfig = fallback_authority
log("No matching authority found to revoke {}: {}, using globalconfig/defaults".format(tools.get_cert_cn(cert),
tools.get_cert_domains(
cert)), warning=True)
acme = authority(acmeconfig)
acme.register_account()
acme.revoke_crt(cert, reason)
def main():
# load config
configs = configuration.load()
# post-update actions (run only once)
actions = set()
# check certificate validity and obtain/renew certificates if needed
for config in configs:
cert_file = config['cert_file']
cert_file_exists = os.path.isfile(cert_file)
if cert_file_exists:
cert = tools.read_pem_file(cert_file)
if not cert_file_exists or not tools.is_cert_valid(cert, config['ttl_days']):
cert_get(config)
for cfg in config['actions']:
if not tools.target_is_current(cfg['path'], cert_file):
print("Updating '{}' due to newer version".format(cfg['path']))
actions.add(cert_put(cfg))
# run post-update actions
for action in actions:
if action is not None:
runtimeconfig, domainconfigs = configuration.load()
# register idna-mapped domains as LOG_REPLACEMENTS for better readability of log output
for domainconfig in domainconfigs:
LOG_REPLACEMENTS.update({k: "{} [{}]".format(k, v) for k, v in domainconfig['domainlist_idna_mapped'].items()})
# Start processing
if runtimeconfig.get('mode') == 'revoke':
# Mode: revoke certificate
log("Revoking {}".format(runtimeconfig['revoke']))
cert_revoke(tools.read_pem_file(runtimeconfig['revoke']),
domainconfigs,
runtimeconfig['fallback_authority'],
runtimeconfig['revoke_reason'])
else:
# Mode: issue certificates (implicit)
# post-update actions (run only once)
actions = set()
superseded = set()
exceptions = list()
# check certificate validity and obtain/renew certificates if needed
for config in domainconfigs:
try:
# Run actions in a shell environment (to allow shell syntax) as stated in the configuration
output = subprocess.check_output(action, shell=True, stderr=subprocess.STDOUT)
print("Executed '{}' successfully: {}".format(action, output))
except subprocess.CalledProcessError as e:
print("Execution of '{}' failed with error '{}': {}".format(e.cmd, e.returncode, e.output))
cert = None
if os.path.isfile(config['cert_file']):
cert = tools.read_pem_file(config['cert_file'])
validate_ocsp = str(config.get('validate_ocsp')).lower() != 'false'
if validate_ocsp and cert and os.path.isfile(config['ca_file']):
try:
issuer = tools.read_pem_file(config['ca_file'])
except Exception as e1:
log("Failed to retrieve issuer from ca file: {}. Trying to download...".format(e1))
try:
issuer = tools.download_issuer_ca(cert)
except Exception as e2:
log("Failed to download issuer for cert file: {}. Cannot validate OCSP.".format(e2))
validate_ocsp = False
if not cert or ('force_renew' in runtimeconfig and all(
d in config['domainlist'] for d in runtimeconfig['force_renew'])) \
or not tools.is_cert_valid(cert, config['ttl_days']) \
or (validate_ocsp and not tools.is_ocsp_valid(cert, issuer, config['validate_ocsp'])):
cert_get(config)
if str(config.get('cert_revoke_superseded')).lower() == 'true' and cert:
superseded.add(cert)
except Exception as e:
log("Certificate issue/renew failed", e, error=True)
exceptions.append(e)
# deploy new certificates after all are renewed
deployment_success = True
for config in domainconfigs:
for cfg in config['actions']:
try:
if not tools.target_is_current(cfg['path'], config['cert_file']):
actions.add(cert_put(cfg))
log("Updated '{}' due to newer version".format(cfg['path']))
except Exception as e:
log("Certificate deployment to {} failed".format(cfg['path']), e, error=True)
exceptions.append(e)
deployment_success = False
# run post-update actions
for action in actions:
if action is not None:
try:
# Run actions in a shell environment (to allow shell syntax) as stated in the configuration
output = subprocess.check_output(action, shell=True, stderr=subprocess.STDOUT)
logmsg = "Action succeeded: {}".format(action)
if len(output) > 0:
if getattr(output, 'decode', None):
# Decode function available? Use it to get a proper str
output = output.decode('utf-8')
logmsg += os.linesep + tools.indent(output, 18) # 18 = len("Action succeeded: ")
log(logmsg)
except subprocess.CalledProcessError as e:
output = e.output
logmsg = "Action failed: ({}) {}".format(e.returncode, e.cmd)
if len(output) > 0:
if getattr(output, 'decode', None):
# Decode function available? Use it to get a proper str
output = output.decode('utf-8')
logmsg += os.linesep + tools.indent(output, 15) # 15 = len("Action failed: ")
log(logmsg, error=True)
exceptions.append(e)
deployment_success = False
# revoke old certificates as superseded
if deployment_success:
for superseded_cert in superseded:
try:
log("Revoking '{}' valid until {} as superseded".format(
tools.get_cert_cn(superseded_cert),
tools.get_cert_valid_until(superseded_cert)))
cert_revoke(superseded_cert, domainconfigs, runtimeconfig['fallback_authority'], reason=4)
except Exception as e:
log("Certificate supersede revoke failed", e, error=True)
exceptions.append(e)
# throw a RuntimeError with all exceptions caught while working if there were any
if len(exceptions) > 0:
raise RuntimeError("{} exception(s) occurred during processing".format(len(exceptions)))

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# authority - authority api package
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import importlib
import json
import os
from acertmgr import tools
from acertmgr.tools import log
authorities = dict()
# @brief find or create a suitable authority for the given settings
# @param settings the authority configuration options
def authority(settings):
key = json.dumps(settings, sort_keys=True)
if key in authorities:
return authorities[key]
else:
acc_file = settings['account_key']
if os.path.isfile(acc_file):
log("Reading account key from {}".format(acc_file))
acc_key = tools.read_pem_file(acc_file, key=True)
else:
log("Account key not found at '{0}'. Creating key.".format(acc_file))
acc_key = tools.new_account_key(acc_file, settings['account_key_algorithm'], settings['account_key_length'])
authority_module = importlib.import_module("acertmgr.authority.{0}".format(settings["api"]))
authority_class = getattr(authority_module, "ACMEAuthority")
authority_obj = authority_class(settings, acc_key)
authorities[key] = authority_obj
return authority_obj

View File

@ -16,19 +16,19 @@ class ACMEAuthority:
self.config = config
# @brief register an account over ACME
# @param account_key the account key to register
# @param CA the certificate authority to register with
# @return True if new account was registered, False otherwise
def register_account(self):
raise NotImplementedError
# @brief function to fetch certificate using ACME
# @param account_key the account key in pyopenssl format
# @param csr the certificate signing request in pyopenssl format
# @param domains list of domains in the certificate, first is CN
# @param challenge_handlers a dict containing challenge for all given domains
# @param CA which signing CA to use
# @return the certificate in pyopenssl format
# @note algorithm and parts of the code are from acme-tiny
# @return the certificate
def get_crt_from_csr(self, csr, domains, challenge_handlers):
raise NotImplementedError
# @brief function to revoke a certificate using ACME
# @param crt certificate to revoke
# @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1)
def revoke_crt(self, crt, reason=None):
raise NotImplementedError

View File

@ -7,13 +7,13 @@
# available under the ISC license, see LICENSE
import copy
import datetime
import json
import re
import time
from acertmgr import tools
from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority
from acertmgr.tools import log
class ACMEAuthority(AbstractACMEAuthority):
@ -21,7 +21,10 @@ class ACMEAuthority(AbstractACMEAuthority):
# @param config Configuration data
# @param key Account key data
def __init__(self, config, key):
log('You currently use ACMEv1 which is deprecated, consider using ACMEv2 (RFC8555) if at all possible.',
warning=True)
AbstractACMEAuthority.__init__(self, config, key)
self.registered_account = False
self.ca = config['authority']
self.agreement = config['authority_tos_agreement']
@ -29,14 +32,10 @@ class ACMEAuthority(AbstractACMEAuthority):
# @param key the account key
# @return the header for ACME
def _prepare_header(self):
numbers = self.key.public_key().public_numbers()
alg, jwk = tools.get_key_alg_and_jwk(self.key)
header = {
"alg": "RS256",
"jwk": {
"e": tools.bytes_to_base64url(tools.number_to_byte_format(numbers.e)),
"kty": "RSA",
"n": tools.bytes_to_base64url(tools.number_to_byte_format(numbers.n)),
},
"alg": alg,
"jwk": jwk,
}
return header
@ -64,16 +63,22 @@ class ACMEAuthority(AbstractACMEAuthority):
# @brief register an account over ACME
# @return True if new account was registered, False otherwise
def register_account(self):
if self.registered_account:
# We already have registered with this authority, just return
return
header = self._prepare_header()
code, result = self._send_signed(self.ca + "/acme/new-reg", header, {
"resource": "new-reg",
"agreement": self.agreement,
})
if code == 201:
print("Registered!")
log("Registered!")
self.registered_account = True
return True
elif code == 409:
print("Already registered!")
log("Already registered!")
self.registered_account = True
return False
else:
raise ValueError("Error registering: {0} {1}".format(code, result))
@ -91,11 +96,11 @@ class ACMEAuthority(AbstractACMEAuthority):
challenges = dict()
tokens = dict()
valid_times = list()
authdomains = list()
# verify each domain
try:
for domain in domains:
print("Verifying {0}...".format(domain))
log("Verifying {0}...".format(domain))
# get new challenge
code, result = self._send_signed(self.ca + "/acme/new-authz", header, {
@ -106,26 +111,26 @@ class ACMEAuthority(AbstractACMEAuthority):
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
# create the challenge
challenges[domain] = [c for c in json.loads(result.decode('utf8'))['challenges'] if
authz = json.loads(result.decode('utf8'))
if authz.get('status', 'no-status-found') == 'valid':
log("{} has already been verified".format(domain))
continue
challenges[domain] = [c for c in authz['challenges'] if
c['type'] == challenge_handlers[domain].get_challenge_type()][0]
tokens[domain] = re.sub(r"[^A-Za-z0-9_\-]", "_", challenges[domain]['token'])
if domain not in challenge_handlers:
raise ValueError("No challenge handler given for domain: {0}".format(domain))
valid_times.append(
challenge_handlers[domain].create_challenge(domain, account_thumbprint, tokens[domain]))
challenge_handlers[domain].create_challenge(domain, account_thumbprint, tokens[domain])
authdomains.append(domain)
print("Waiting until challenges are valid ({})".format(",".join([str(x) for x in valid_times])))
for valid_time in valid_times:
while datetime.datetime.now() < valid_time:
time.sleep(1)
for domain in domains:
challenge_handlers[domain].start_challenge()
# after all challenges are created, start processing authorizations
for domain in authdomains:
try:
print("Starting key authorization")
challenge_handlers[domain].start_challenge(domain, account_thumbprint, tokens[domain])
# notify challenge are met
log("Starting key authorization")
keyauthorization = "{0}.{1}".format(tokens[domain], account_thumbprint)
code, result = self._send_signed(challenges[domain]['uri'], header, {
"resource": "challenge",
@ -145,13 +150,13 @@ class ACMEAuthority(AbstractACMEAuthority):
if challenge_status['status'] == "pending":
time.sleep(2)
elif challenge_status['status'] == "valid":
print("{0} verified!".format(domain))
log("{0} verified!".format(domain))
break
else:
raise ValueError("{0} challenge did not pass: {1}".format(
domain, challenge_status))
finally:
challenge_handlers[domain].stop_challenge()
challenge_handlers[domain].stop_challenge(domain, account_thumbprint, tokens[domain])
finally:
# Destroy challenge handlers in reverse order to replay
# any saved state information in the handlers correctly
@ -159,18 +164,33 @@ class ACMEAuthority(AbstractACMEAuthority):
try:
challenge_handlers[domain].destroy_challenge(domain, account_thumbprint, tokens[domain])
except Exception as e:
print('Challenge destruction failed: {}'.format(e))
log('Challenge destruction failed: {}'.format(e), error=True)
# get the new certificate
print("Signing certificate...")
log("Signing certificate...")
code, result = self._send_signed(self.ca + "/acme/new-cert", header, {
"resource": "new-cert",
"csr": tools.bytes_to_base64url(tools.convert_csr_to_der_bytes(csr)),
"csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
# return signed certificate!
print("Certificate signed!")
log("Certificate signed!")
cert = tools.convert_der_bytes_to_cert(result)
return cert, tools.download_issuer_ca(cert)
# @brief function to revoke a certificate using ACME
# @param crt certificate to revoke
# @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1)
def revoke_crt(self, crt, reason=None):
header = self._prepare_header()
payload = {"resource": "revoke-cert",
"certificate": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(crt))}
if reason:
payload['reason'] = int(reason)
code, result = self._send_signed(self.ca + "/acme/revoke-cert", header, payload)
if code < 400:
log("Revocation successful")
else:
raise ValueError("Revocation failed: {}".format(result))

View File

@ -1,18 +1,21 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# acertmgr - acme api v2 functions
# acertmgr - acme api v2 functions (implements RFC8555)
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import copy
import datetime
import json
import re
import time
from acertmgr import tools
from acertmgr.authority.acme import ACMEAuthority as AbstractACMEAuthority
from acertmgr.tools import log
# Maximum age for nonce values (Boulder invalidates them after some time, so we use a low value of 2 minutes here)
MAX_NONCE_AGE = 120
class ACMEAuthority(AbstractACMEAuthority):
@ -40,20 +43,17 @@ class ACMEAuthority(AbstractACMEAuthority):
"newAccount": "{}/acme/new-acct".format(self.ca),
"newNonce": "{}/acme/new-nonce".format(self.ca),
"newOrder": "{}/acme/new-order".format(self.ca),
"revokeCert": "{}/acme/revoke-cert".format(self.ca),
}
print("API directory retrieval failed ({}). Guessed necessary values: {}".format(code, self.directory))
self._request_endpoint('newNonce') # cache the first nonce
log("API directory retrieval failed ({}). Guessed necessary values: {}".format(code, self.directory),
warning=True)
self.nonce = None
self.nonce_time = 0
# @todo: Add support for key-types other than RSA
numbers = key.public_key().public_numbers()
self.algorithm = "RS256"
self.algorithm, jwk = tools.get_key_alg_and_jwk(key)
self.account_protected = {
"alg": self.algorithm,
"jwk": {
"kty": "RSA",
"e": tools.bytes_to_base64url(tools.number_to_byte_format(numbers.e)),
"n": tools.bytes_to_base64url(tools.number_to_byte_format(numbers.n)),
},
"jwk": jwk
}
self.account_id = None # will be updated to correct value during account registration
@ -61,39 +61,52 @@ class ACMEAuthority(AbstractACMEAuthority):
def _request_url(self, url, data=None, raw_result=False):
header = {'Content-Type': 'application/jose+json'}
if data:
# Always encode data to bytes
data = data.encode('utf-8')
resp = tools.get_url(url, data, header)
try:
resp = tools.get_url(url, data, header)
except IOError as e:
body = getattr(e, "read", e.__str__)()
if getattr(body, 'decode', None):
# Decode function available? Use it to get a proper str
body = body.decode('utf-8')
return getattr(e, "code", 999), body, {}
# Store next Replay-Nonce if it is in the header
if 'Replay-Nonce' in resp.headers:
self.nonce = resp.headers['Replay-Nonce']
if raw_result:
return resp.getcode(), resp.read(), resp.headers
self.nonce_time = time.time()
body = resp.read()
if len(body) > 0:
if getattr(body, 'decode', None):
# Decode function available? Use it to get a proper str
body = body.decode('utf-8')
if not raw_result and len(body) > 0:
try:
body = json.loads(body.decode('utf-8'))
body = json.loads(body)
except json.JSONDecodeError as e:
raise ValueError('Could not parse non-raw result (expected JSON)', e)
return resp.getcode(), body, resp.headers
# @brief helper function to make signed requests
# @brief fetch an url with a signed request
def _request_acme_url(self, url, payload=None, protected=None, raw_result=False):
if not payload:
payload = {}
if not protected:
protected = {}
payload64 = tools.bytes_to_base64url(json.dumps(payload).encode('utf8'))
if payload:
payload64 = tools.bytes_to_base64url(json.dumps(payload).encode('utf8'))
else:
payload64 = "" # for POST-as-GET
# Request a new nonce if there is none in cache
if not self.nonce:
self._request_endpoint('newNonce')
if not self.nonce or time.time() > self.nonce_time + MAX_NONCE_AGE:
self._request_url(self.directory['newNonce'])
# Set request nonce to current cache value
protected["nonce"] = self.nonce
# Reset nonce cache as we are using it's current value
self.nonce = None
protected["url"] = url
if self.algorithm:
protected["alg"] = self.algorithm
@ -106,17 +119,7 @@ class ACMEAuthority(AbstractACMEAuthority):
"payload": payload64,
"signature": tools.bytes_to_base64url(out),
})
try:
return self._request_url(url, data, raw_result)
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)(), {}
finally:
# Dispose of nonce after it was used
self.nonce = None
# @brief send a request to authority
def _request_endpoint(self, request, data=None, raw_result=False):
return self._request_url(self.directory[request], data, raw_result)
return self._request_url(url, data, raw_result)
# @brief send a signed request to authority
def _request_acme_endpoint(self, request, payload=None, protected=None, raw_result=False):
@ -124,6 +127,10 @@ class ACMEAuthority(AbstractACMEAuthority):
# @brief register an account over ACME
def register_account(self):
if self.account_id:
# We already have registered with this authority, just return
return
protected = copy.deepcopy(self.account_protected)
payload = {
"termsOfServiceAgreed": self.tos_agreed,
@ -135,8 +142,8 @@ class ACMEAuthority(AbstractACMEAuthority):
if code < 400 and result['status'] == 'valid':
self.account_id = headers['Location']
if 'meta' in self.directory and 'termsOfService' in self.directory['meta']:
print("ToS at {} have been accepted.".format(self.directory['meta']['termsOfService']))
print("Account registered and valid.".format())
log("ToS at {} have been accepted.".format(self.directory['meta']['termsOfService']))
log("Account registered and valid on {}.".format(self.ca))
else:
raise ValueError("Error registering account: {0} {1}".format(code, result))
@ -150,7 +157,7 @@ class ACMEAuthority(AbstractACMEAuthority):
account_thumbprint = tools.bytes_to_base64url(
tools.hash_of_str(json.dumps(self.account_protected['jwk'], sort_keys=True, separators=(',', ':'))))
print("Ordering certificate for {}".format(domains))
log("Ordering certificate for {}".format(domains))
identifiers = [{'type': 'dns', 'value': domain} for domain in domains]
code, order, headers = self._request_acme_endpoint('newOrder', {'identifiers': identifiers})
if code >= 400:
@ -160,44 +167,46 @@ class ACMEAuthority(AbstractACMEAuthority):
authorizations = list()
# verify each domain
try:
valid_times = list()
for authorizationUrl in order['authorizations']:
# get new challenge
code, authorization, _ = self._request_url(authorizationUrl)
code, authorization, _ = self._request_acme_url(authorizationUrl)
if code >= 400:
raise ValueError("Error requesting authorization: {0} {1}".format(code, authorization))
authorization['_domain'] = "*.{}".format(authorization['identifier']['value']) if \
'wildcard' in authorization and authorization['wildcard'] else authorization['identifier']['value']
print("Authorizing {0}".format(authorization['_domain']))
# create the challenge
matching_challenges = [c for c in authorization['challenges'] if
c['type'] == challenge_handlers[authorization['_domain']].get_challenge_type()]
if len(matching_challenges) == 0:
raise ValueError("Error no challenge matching {0} found: {1}".format(
challenge_handlers[authorization['_domain']].get_challenge_type(), authorization))
authorization['_challenge'] = matching_challenges[0]
authorization['_token'] = re.sub(r"[^A-Za-z0-9_\-]", "_", authorization['_challenge']['token'])
if authorization.get('status', 'no-status-found') == 'valid':
log("{} has already been authorized".format(authorization['_domain']))
continue
if authorization['_domain'] not in challenge_handlers:
raise ValueError("No challenge handler given for domain: {0}".format(authorization['_domain']))
log("Authorizing {0}".format(authorization['_domain']))
valid_times.append(
challenge_handlers[authorization['_domain']].create_challenge(authorization['identifier']['value'],
account_thumbprint,
authorization['_token']))
# create the challenge
ctype = challenge_handlers[authorization['_domain']].get_challenge_type()
matching_challenges = [c for c in authorization['challenges'] if c['type'] == ctype]
if len(matching_challenges) == 0:
raise ValueError("Error no challenge matching {0} found: {1}".format(ctype, authorization))
authorization['_challenge'] = matching_challenges[0]
if authorization['_challenge'].get('status', 'no-status-found') == 'valid':
log("{} has already been authorized using {}".format(authorization['_domain'], ctype))
continue
authorization['_token'] = re.sub(r"[^A-Za-z0-9_\-]", "_", authorization['_challenge']['token'])
challenge_handlers[authorization['_domain']].create_challenge(authorization['identifier']['value'],
account_thumbprint,
authorization['_token'])
authorizations.append(authorization)
print("Waiting until challenges are valid ({})".format(",".join([str(x) for x in valid_times])))
for valid_time in valid_times:
while datetime.datetime.now() < valid_time:
time.sleep(1)
# after all challenges are created, start processing authorizations
for authorization in authorizations:
print("Starting verification of {}".format(authorization['_domain']))
challenge_handlers[authorization['_domain']].start_challenge()
try:
log("Starting verification of {}".format(authorization['_domain']))
challenge_handlers[authorization['_domain']].start_challenge(authorization['identifier']['value'],
account_thumbprint,
authorization['_token'])
# notify challenge is met
code, challenge_status, _ = self._request_acme_url(authorization['_challenge']['url'], {
"keyAuthorization": "{0}.{1}".format(authorization['_token'], account_thumbprint),
@ -205,15 +214,17 @@ class ACMEAuthority(AbstractACMEAuthority):
# wait for challenge to be verified
while code < 400 and challenge_status.get('status') == "pending":
time.sleep(5)
code, challenge_status, _ = self._request_url(authorization['_challenge']['url'])
code, challenge_status, _ = self._request_acme_url(authorization['_challenge']['url'])
if challenge_status.get('status') == "valid":
print("{0} verified".format(authorization['_domain']))
if code < 400 and challenge_status.get('status') == "valid":
log("{0} verified".format(authorization['_domain']))
else:
raise ValueError("{0} challenge did not pass: {1}".format(
authorization['_domain'], challenge_status))
raise ValueError("{0} challenge did not pass ({1}): {2}".format(
authorization['_domain'], code, challenge_status))
finally:
challenge_handlers[authorization['_domain']].stop_challenge()
challenge_handlers[authorization['_domain']].stop_challenge(authorization['identifier']['value'],
account_thumbprint,
authorization['_token'])
finally:
# Destroy challenge handlers in reverse order to replay
# any saved state information in the handlers correctly
@ -222,36 +233,36 @@ class ACMEAuthority(AbstractACMEAuthority):
challenge_handlers[authorization['_domain']].destroy_challenge(
authorization['identifier']['value'], account_thumbprint, authorization['_token'])
except Exception as e:
print('Challenge destruction failed: {}'.format(e))
log('Challenge destruction failed: {}'.format(e), error=True)
# check order status and retry once
code, order, _ = self._request_url(order_url)
code, order, _ = self._request_acme_url(order_url)
if code < 400 and order.get('status') == 'pending':
time.sleep(5)
code, order, _ = self._request_url(order_url)
code, order, _ = self._request_acme_url(order_url)
if code >= 400:
raise ValueError("Order is still not ready to be finalized: {0} {1}".format(code, order))
# get the new certificate
print("Finalizing certificate")
log("Finalizing certificate")
code, finalize, _ = self._request_acme_url(order['finalize'], {
"csr": tools.bytes_to_base64url(tools.convert_csr_to_der_bytes(csr)),
"csr": tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(csr)),
})
while code < 400 and (finalize.get('status') == 'pending' or finalize.get('status') == 'processing'):
time.sleep(5)
code, finalize, _ = self._request_url(order_url)
code, finalize, _ = self._request_acme_url(order_url)
if code >= 400:
raise ValueError("Error finalizing certificate: {0} {1}".format(code, finalize))
print("Certificate ready!")
log("Certificate ready!")
# return certificate
code, certificate, _ = self._request_url(finalize['certificate'], raw_result=True)
code, certificate, _ = self._request_acme_url(finalize['certificate'], raw_result=True)
if code >= 400:
raise ValueError("Error downloading certificate chain: {0} {1}".format(code, certificate))
cert_dict = re.match((r'(?P<cert>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)\n\n'
r'(?P<ca>-----BEGIN CERTIFICATE-----[^\-]+-----END CERTIFICATE-----)?'),
certificate.decode('utf-8'), re.DOTALL).groupdict()
cert_dict = re.match((r'(?P<cert>^-----BEGIN CERTIFICATE-----\n[^\-]+\n-----END CERTIFICATE-----)\n*'
r'(?P<ca>-----BEGIN CERTIFICATE-----\n.+\n-----END CERTIFICATE-----)?$'),
certificate, re.DOTALL).groupdict()
cert = tools.convert_pem_str_to_cert(cert_dict['cert'])
if cert_dict['ca'] is None:
ca = tools.download_issuer_ca(cert)
@ -259,3 +270,16 @@ class ACMEAuthority(AbstractACMEAuthority):
ca = tools.convert_pem_str_to_cert(cert_dict['ca'])
return cert, ca
# @brief function to revoke a certificate using ACME
# @param crt certificate to revoke
# @param reason (int) optional certificate revoke reason (see https://tools.ietf.org/html/rfc5280#section-5.3.1)
def revoke_crt(self, crt, reason=None):
payload = {'certificate': tools.bytes_to_base64url(tools.convert_cert_to_der_bytes(crt))}
if reason:
payload['reason'] = int(reason)
code, result, _ = self._request_acme_endpoint("revokeCert", payload)
if code < 400:
log("Revocation successful")
else:
raise ValueError("Revocation failed: {}".format(result))

View File

@ -13,21 +13,15 @@ import io
import json
import os
# Backward compatiblity for older versions/installations of acertmgr
LEGACY_WORK_DIR = "/etc/acme"
LEGACY_CONF_FILE = os.path.join(LEGACY_WORK_DIR, "acme.conf")
LEGACY_CONF_DIR = os.path.join(LEGACY_WORK_DIR, "domains.d")
from acertmgr.tools import idna_convert
# Configuration defaults to use if not specified otherwise
DEFAULT_CONF_FILE = "/etc/acertmgr/acertmgr.conf"
DEFAULT_CONF_DIR = "/etc/acertmgr"
DEFAULT_KEY_LENGTH = 4096 # bits
DEFAULT_CONF_FILENAME = "acertmgr.conf"
DEFAULT_TTL = 30 # days
DEFAULT_VALIDATE_OCSP = "sha1" # mandated by RFC5019
DEFAULT_API = "v2"
DEFAULT_AUTHORITY = "https://acme-v02.api.letsencrypt.org"
LEGACY_API = "v1"
LEGACY_AUTHORITY = "https://acme-v01.api.letsencrypt.org"
LEGACY_AUTHORITY_TOS_AGREEMENT = "true"
# @brief augment configuration with defaults
@ -49,107 +43,114 @@ def complete_action_config(domainconfig, config):
# @brief update config[name] with value from localconfig>globalconfig>default
def update_config_value(config, name, localconfig, globalconfig, default):
values = [x for x in localconfig if name in x]
values = [x[name] for x in localconfig if name in x]
if len(values) > 0:
config[name] = values[0][name]
config[name] = values[0]
else:
config[name] = globalconfig.get(name, default)
# @brief parse authority from config
def parse_authority(localconfig, globalconfig, runtimeconfig):
authority = {}
# - API version
update_config_value(authority, 'api', localconfig, globalconfig, DEFAULT_API)
# - Certificate authority
update_config_value(authority, 'authority', localconfig, globalconfig, DEFAULT_AUTHORITY)
# - Certificate authority ToS agreement
update_config_value(authority, 'authority_tos_agreement', localconfig, globalconfig,
runtimeconfig['authority_tos_agreement'])
# - Certificate authority contact email addresses
update_config_value(authority, 'authority_contact_email', localconfig, globalconfig, None)
# - Account key path
update_config_value(authority, 'account_key', localconfig, globalconfig,
os.path.join(runtimeconfig['work_dir'], "account.key"))
# - Account key algorithm (if key has to be (re-)generated)
update_config_value(authority, 'account_key_algorithm', localconfig, globalconfig, None)
# - Account key length (if key has to be (re-)generated, converted to int)
update_config_value(authority, 'account_key_length', localconfig, globalconfig, None)
authority['account_key_length'] = int(authority['account_key_length']) if authority['account_key_length'] else None
return authority
# @brief load the configuration from a file
def parse_config_entry(entry, globalconfig, work_dir, authority_tos_agreement):
def parse_config_entry(entry, globalconfig, runtimeconfig):
config = dict()
# Basic domain information
config['domains'], localconfig = entry
config['domainlist'] = config['domains'].split(' ')
config['id'] = hashlib.md5(config['domains'].encode('utf-8')).hexdigest()
domains, localconfig = entry
config['domainlist'] = domains.split(' ')
config['id'] = hashlib.md5(domains.encode('utf-8')).hexdigest()
# Append IDNA domains to the domainlist and domains
if any(ord(c) >= 128 for c in config['domains']):
try:
import idna
domainlist = []
config['domaintranslation'] = {}
for domain in config['domainlist']:
if any(ord(c) >= 128 for c in domain):
# Translate IDNA domain name from a unicode domain (handle wildcards separately)
if domain.startswith('*.'):
idna_domain = "*.{}".format(idna.encode(domain[2:]).decode('utf-8'))
else:
idna_domain = idna.encode(domain).decode('utf-8')
domainlist.append(idna_domain)
config['domaintranslation'][idna_domain] = domain
else:
domainlist.append(domain)
# Refresh the domainlist and domains config value
config['domainlist'] = domainlist
config['domains'] = ' '.join(domainlist)
except ImportError:
print("Unicode domain found but IDNA names could not be translated due to missing idna module")
# Convert unicode to IDNA domains
config['domainlist_idna_mapped'] = {}
for idx in range(0, len(config['domainlist'])):
if any(ord(c) >= 128 for c in config['domainlist'][idx]):
domain_human = config['domainlist'][idx]
domain_idna = idna_convert(domain_human)
if domain_idna != domain_human:
config['domainlist'][idx] = domain_idna # Update domain with idna counterpart
config['domainlist_idna_mapped'][domain_idna] = domain_human # Store original domain for reference
# Action config defaults
config['defaults'] = globalconfig.get('defaults', {})
# API version
update_config_value(config, 'api', localconfig, globalconfig, DEFAULT_API)
# Certificate authority
update_config_value(config, 'authority', localconfig, globalconfig, DEFAULT_AUTHORITY)
# Certificate authority ToS agreement
update_config_value(config, 'authority_tos_agreement', localconfig, globalconfig, authority_tos_agreement)
# Certificate authority contact email addresses
update_config_value(config, 'authority_contact_email', localconfig, globalconfig, None)
# Account key
update_config_value(config, 'account_key', localconfig, globalconfig, os.path.join(work_dir, "account.key"))
# Authority related config options
config['authority'] = parse_authority(localconfig, globalconfig, runtimeconfig)
# Certificate directory
update_config_value(config, 'cert_dir', localconfig, globalconfig, work_dir)
update_config_value(config, 'cert_dir', localconfig, globalconfig, runtimeconfig['work_dir'])
# TTL days
update_config_value(config, 'ttl_days', localconfig, globalconfig, DEFAULT_TTL)
config['ttl_days'] = int(config['ttl_days'])
# Validate OCSP on certificate verification
update_config_value(config, 'validate_ocsp', localconfig, globalconfig, DEFAULT_VALIDATE_OCSP)
# Revoke old certificate with reason superseded after renewal
update_config_value(config, 'cert_revoke_superseded', localconfig, globalconfig, "false")
# Whether to include request for OCSP must-staple in the certificate
update_config_value(config, 'cert_must_staple', localconfig, globalconfig, "false")
# Use a static cert request
update_config_value(config, 'csr_static', localconfig, globalconfig, "false")
# SSL key algorithm (if key has to be (re-)generated)
update_config_value(config, 'key_algorithm', localconfig, globalconfig, None)
# Update config id if we have a key algorithm set to allow for
# multiple certs with different algorithms for the same set of domains
if config.get('key_algorithm', None):
config['id'] += "_" + config['key_algorithm'].lower()
# SSL key length (if key has to be (re-)generated, converted to int)
update_config_value(config, 'key_length', localconfig, globalconfig, None)
config['key_length'] = int(config['key_length']) if config['key_length'] else None
# SSL cert request location
update_config_value(config, 'csr_file', localconfig, globalconfig,
os.path.join(config['cert_dir'], "{}.csr".format(config['id'])))
# SSL cert location (with compatibility to older versions)
if 'server_cert' in globalconfig:
print("WARNING: Legacy configuration directive 'server_cert' used. Support will be removed in 1.0")
update_config_value(config, 'cert_file', localconfig, globalconfig,
globalconfig.get('server_cert',
os.path.join(config['cert_dir'], "{}.crt".format(config['id']))))
os.path.join(config['cert_dir'], "{}.crt".format(config['id'])))
# SSL key location (with compatibility to older versions)
if 'server_key' in globalconfig:
print("WARNING: Legacy configuration directive 'server_key' used. Support will be removed in 1.0")
update_config_value(config, 'key_file', localconfig, globalconfig,
globalconfig.get('server_key',
os.path.join(config['cert_dir'], "{}.key".format(config['id']))))
os.path.join(config['cert_dir'], "{}.key".format(config['id'])))
# SSL key length (if key has to be (re-)generated, converted to int)
update_config_value(config, 'key_length', localconfig, globalconfig, DEFAULT_KEY_LENGTH)
config['key_length'] = int(config['key_length'])
# SSL CA location
ca_files = [x for x in entry if 'ca_file' in x]
if len(ca_files) > 0:
config['static_ca'] = True
config['ca_file'] = ca_files[0]
elif 'server_ca' in globalconfig:
print("WARNING: Legacy configuration directive 'server_ca' used. Support will be removed in 1.0")
config['static_ca'] = True
config['ca_file'] = globalconfig['server_ca']
else:
config['static_ca'] = False
config['ca_file'] = os.path.join(config['cert_dir'], "{}.ca".format(config['id']))
# SSL CA location / use static
update_config_value(config, 'ca_file', localconfig, globalconfig,
os.path.join(config['cert_dir'], "{}.ca".format(config['id'])))
update_config_value(config, 'ca_static', localconfig, globalconfig, "false")
# Domain action configuration
config['actions'] = list()
@ -159,6 +160,7 @@ def parse_config_entry(entry, globalconfig, work_dir, authority_tos_agreement):
# Domain challenge handler configuration
config['handlers'] = dict()
handlerconfigs = [x for x in localconfig if 'mode' in x]
_domaintranslation_dict = {x: y for x, y in config.get('domaintranslation', [])}
for domain in config['domainlist']:
# Use global config as base handler config
cfg = copy.deepcopy(globalconfig)
@ -169,8 +171,8 @@ def parse_config_entry(entry, globalconfig, work_dir, authority_tos_agreement):
cfg.update(genericfgs[0])
# Update handler config with more specific values (use original names for translated unicode domains)
_domain = config.get('domaintranslation', {}).get(domain, domain)
specificcfgs = [x for x in handlerconfigs if 'domain' in x and x['domain'] == _domain]
specificcfgs = [x for x in handlerconfigs if
'domain' in x and x['domain'] == config['domainlist_idna_mapped'].get(domain, domain)]
if len(specificcfgs) > 0:
cfg.update(specificcfgs[0])
@ -181,54 +183,67 @@ def parse_config_entry(entry, globalconfig, work_dir, authority_tos_agreement):
# @brief load the configuration from a file
def load():
runtimeconfig = dict()
parser = argparse.ArgumentParser(description="acertmgr - Automated Certificate Manager using ACME/Let's Encrypt")
parser.add_argument("-c", "--config-file", nargs="?",
help="global configuration file (default='{}')".format(DEFAULT_CONF_FILE))
help="global configuration file (default='$config_dir/{}')".format(DEFAULT_CONF_FILENAME))
parser.add_argument("-d", "--config-dir", nargs="?",
help="domain configuration directory (default='{}')".format(DEFAULT_CONF_DIR))
parser.add_argument("-w", "--work-dir", nargs="?",
help="persistent work data directory (default=config_dir)")
help="persistent work data directory (default='$config_dir')")
parser.add_argument("--authority-tos-agreement", "--tos-agreement", "--tos", nargs="?",
help="Agree to the authorities Terms of Service (value required depends on authority)")
parser.add_argument("--force-renew", "--renew-now", nargs="?",
help="Renew all domain configurations matching the given value immediately")
parser.add_argument("--revoke", nargs="?",
help="Revoke a certificate file issued with the currently configured account key.")
parser.add_argument("--revoke-reason", nargs="?", type=int,
help="Provide a revoke reason, see https://tools.ietf.org/html/rfc5280#section-5.3.1")
args = parser.parse_args()
# Determine global configuration file
if args.config_file:
global_config_file = args.config_file
elif os.path.isfile(LEGACY_CONF_FILE):
print("WARNING: Legacy config file '{}' used. Move to '{}' for 1.0".format(LEGACY_CONF_FILE, DEFAULT_CONF_FILE))
global_config_file = LEGACY_CONF_FILE
else:
global_config_file = DEFAULT_CONF_FILE
# Determine domain configuration directory
if args.config_dir:
domain_config_dir = args.config_dir
elif os.path.isdir(LEGACY_CONF_DIR):
print("WARNING: Legacy config dir '{}' used. Move to '{}' for 1.0".format(LEGACY_CONF_DIR, DEFAULT_CONF_DIR))
domain_config_dir = LEGACY_CONF_DIR
else:
domain_config_dir = DEFAULT_CONF_DIR
# Determine work directory...
# Determine global configuration file
if args.config_file:
global_config_file = args.config_file
else:
global_config_file = os.path.join(domain_config_dir, DEFAULT_CONF_FILENAME)
# Runtime configuration: Get from command-line options
# - work_dir
if args.work_dir:
work_dir = args.work_dir
elif os.path.isdir(LEGACY_WORK_DIR) and domain_config_dir == LEGACY_CONF_DIR:
work_dir = LEGACY_WORK_DIR
runtimeconfig['work_dir'] = args.work_dir
else:
# .. or use the domain configuration directory otherwise
work_dir = domain_config_dir
runtimeconfig['work_dir'] = domain_config_dir
# create work_dir if it does not exist yet
if not os.path.isdir(runtimeconfig['work_dir']):
os.mkdir(runtimeconfig['work_dir'], int("0700", 8))
# Determine authority agreement
# - authority_tos_agreement
if args.authority_tos_agreement:
authority_tos_agreement = args.authority_tos_agreement
elif global_config_file == LEGACY_CONF_FILE:
# Old global config file assumes ToS are agreed
authority_tos_agreement = LEGACY_AUTHORITY_TOS_AGREEMENT
runtimeconfig['authority_tos_agreement'] = args.authority_tos_agreement
else:
authority_tos_agreement = None
runtimeconfig['authority_tos_agreement'] = None
# load global configuration
# - force-rewew
if args.force_renew:
domaintranslation = [idna_convert(d) for d in args.force_renew.split(' ')]
if len(domaintranslation) > 0:
runtimeconfig['force_renew'] = domaintranslation
else:
runtimeconfig['force_renew'] = args.force_renew.split(' ')
# - revoke
if args.revoke:
runtimeconfig['mode'] = 'revoke'
runtimeconfig['revoke'] = args.revoke
runtimeconfig['revoke_reason'] = args.revoke_reason
# Global configuration: Load from file
globalconfig = dict()
if os.path.isfile(global_config_file):
with io.open(global_config_file) as config_fd:
@ -238,18 +253,9 @@ def load():
import yaml
config_fd.seek(0)
globalconfig = yaml.safe_load(config_fd)
if global_config_file == LEGACY_CONF_FILE:
if 'api' not in globalconfig:
globalconfig['api'] = LEGACY_API
if 'authority' not in globalconfig:
globalconfig['authority'] = LEGACY_AUTHORITY
# create work directory if it does not exist
if not os.path.isdir(work_dir):
os.mkdir(work_dir, int("0700", 8))
# load domain configuration
config = list()
# Domain configuration(s): Load from file(s)
domainconfigs = list()
if os.path.isdir(domain_config_dir):
for domain_config_file in os.listdir(domain_config_dir):
domain_config_file = os.path.join(domain_config_dir, domain_config_file)
@ -258,12 +264,29 @@ def load():
os.path.abspath(domain_config_file) != os.path.abspath(global_config_file):
with io.open(domain_config_file) as config_fd:
try:
for entry in json.load(config_fd).items():
config.append(parse_config_entry(entry, globalconfig, work_dir, authority_tos_agreement))
data = json.load(config_fd)
except ValueError:
import yaml
config_fd.seek(0)
for entry in yaml.safe_load(config_fd).items():
config.append(parse_config_entry(entry, globalconfig, work_dir, authority_tos_agreement))
data = yaml.safe_load(config_fd)
if isinstance(data, list):
# Handle newer config in list format (allows for multiple entries with same domains)
entries = list()
for element in data:
entries += element.items()
else:
# Handle older config format with just one entry per same domain set
entries = data.items()
for entry in entries:
domainconfigs.append(parse_config_entry(entry, globalconfig, runtimeconfig))
return config
# Define a fallback authority from global configuration / defaults
runtimeconfig['fallback_authority'] = parse_authority([], globalconfig, runtimeconfig)
return runtimeconfig, domainconfigs
if __name__ == '__main__':
# Simple configuration load test and output
from pprint import pprint
pprint(load())

View File

@ -0,0 +1,30 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# modes - challenge handler modes package
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import importlib
import json
challenge_handlers = dict()
# @brief find or create a challenge handler for the given settings
# @param settings the domain's configuration options
def challenge_handler(settings):
key = json.dumps(settings, sort_keys=True)
if key in challenge_handlers:
return challenge_handlers[key]
else:
if "mode" in settings:
mode = settings["mode"]
else:
mode = "standalone"
handler_module = importlib.import_module("acertmgr.modes.{0}".format(mode))
handler_class = getattr(handler_module, "ChallengeHandler")
handler_obj = handler_class(settings)
challenge_handlers[key] = handler_obj
return handler_obj

View File

@ -14,7 +14,6 @@ class AbstractChallengeHandler:
def get_challenge_type():
raise NotImplementedError
# @return datetime after which the challenge is valid
def create_challenge(self, domain, thumbprint, token):
raise NotImplementedError
@ -22,9 +21,9 @@ class AbstractChallengeHandler:
raise NotImplementedError
# Optional: Indicate when a challenge request is imminent
def start_challenge(self):
def start_challenge(self, domain, thumbprint, token):
pass
# Optional: Indicate when a challenge response has been received
def stop_challenge(self):
def stop_challenge(self, domain, thumbprint, token):
pass

View File

@ -4,6 +4,12 @@
# Copyright (c) Rudolf Mayerhofer, 2018-2019
# available under the ISC license, see LICENSE
import ipaddress
import re
import socket
import time
from datetime import datetime, timedelta
import dns
import dns.query
import dns.resolver
@ -12,9 +18,125 @@ import dns.update
from acertmgr import tools
from acertmgr.modes.abstract import AbstractChallengeHandler
from acertmgr.tools import log
QUERY_TIMEOUT = 60 # seconds are the maximum for any query (otherwise the DNS server will be considered dead)
REGEX_IP4 = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$'
REGEX_IP6 = r'^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}' \
r':|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}' \
r'(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}' \
r'|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}' \
r'(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})' \
r'|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}' \
r'|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}' \
r'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}' \
r':((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$'
_lookup_ip_cache = {}
_lookup_ns_ip_cache = {}
_lookup_zone_cache = {}
class DNSChallengeHandler(AbstractChallengeHandler):
@staticmethod
def _lookup_ip(domain_or_ip):
if domain_or_ip in _lookup_ip_cache:
return _lookup_ip_cache[domain_or_ip]
try:
if re.search(REGEX_IP4, domain_or_ip.strip()) or re.search(REGEX_IP6, domain_or_ip.strip()):
return str(ipaddress.ip_address(domain_or_ip))
except ValueError:
pass
# No valid ip found so far, try to resolve using system resolver
result = socket.getaddrinfo(domain_or_ip, 53)
if len(result) > 0:
retval = result[0][4][0]
_lookup_ip_cache[domain_or_ip] = retval
return retval
else:
raise ValueError("Could not lookup dns ip for {}".format(domain_or_ip))
@staticmethod
def _lookup_ns_ip(domain, nameserver=None):
zone, zonemaster = DNSChallengeHandler._lookup_zone(domain, nameserver)
cache_key = "{}${}".format(zone, zonemaster)
if cache_key in _lookup_ns_ip_cache:
return _lookup_ns_ip_cache[cache_key]
if not nameserver:
nameserver = DNSChallengeHandler._lookup_ip(zonemaster)
request = dns.message.make_query(zone, dns.rdatatype.NS)
response = dns.query.udp(request, nameserver, timeout=QUERY_TIMEOUT)
retval = set()
if response.rcode() == dns.rcode.NOERROR:
for answer in response.answer:
for item in answer:
if item.rdtype == dns.rdatatype.NS:
retval.add(DNSChallengeHandler._lookup_ip(item.to_text()))
_lookup_ns_ip_cache[cache_key] = retval
return retval
@staticmethod
def _lookup_zone(domain, nameserver=None):
cache_key = "{}${}".format(domain, nameserver)
if cache_key in _lookup_zone_cache:
return _lookup_zone_cache[cache_key]
if nameserver:
nameservers = [nameserver]
else:
nameservers = dns.resolver.get_default_resolver().nameservers
domain = dns.name.from_text(domain)
if not domain.is_absolute():
domain = domain.concatenate(dns.name.root)
while domain.parent() != dns.name.root:
request = dns.message.make_query(domain, dns.rdatatype.SOA)
for nameserver in nameservers:
try:
response = dns.query.udp(request, nameserver, timeout=QUERY_TIMEOUT)
if response.rcode() == dns.rcode.NOERROR:
for answer in response.answer:
for item in answer:
if item.rdtype == dns.rdatatype.SOA:
zone = domain.to_text()
authoritative_ns = item.mname.to_text().split(' ')[0]
retval = zone, authoritative_ns
_lookup_zone_cache[cache_key] = retval
return retval
else:
break
except dns.exception.Timeout:
# Go to next nameserver on timeout
continue
except dns.exception.DNSException:
# Break loop on any other error
break
domain = domain.parent()
raise ValueError('No zone SOA for "{0}"'.format(domain))
@staticmethod
def _check_txt_record_value(domain, txtvalue, nameserverip, use_tcp=False):
try:
request = dns.message.make_query(domain, dns.rdatatype.TXT)
if use_tcp:
response = dns.query.tcp(request, nameserverip, timeout=QUERY_TIMEOUT)
else:
response = dns.query.udp(request, nameserverip, timeout=QUERY_TIMEOUT)
for rrset in response.answer:
for answer in rrset:
if answer.to_text().strip('"') == txtvalue:
return True
except dns.exception.DNSException:
# Ignore DNS errors and return failure
return False
@staticmethod
def _determine_txtvalue(thumbprint, token):
return tools.bytes_to_base64url(tools.hash_of_str("{0}.{1}".format(token, thumbprint)))
@staticmethod
def get_challenge_type():
return "dns-01"
@ -23,6 +145,13 @@ class DNSChallengeHandler(AbstractChallengeHandler):
AbstractChallengeHandler.__init__(self, config)
self.dns_updatedomain = config.get("dns_updatedomain")
self.dns_ttl = int(config.get("dns_ttl", 60))
self.dns_verify_waittime = int(config.get("dns_verify_waittime", 2 * self.dns_ttl))
self.dns_verify_failtime = int(config.get("dns_verify_failtime", self.dns_verify_waittime + 1))
self.dns_verify_interval = int(config.get("dns_verify_interval", 10))
self.dns_verify_all_ns = str(config.get("dns_verify_all_ns")).lower() == "true"
self.dns_verify_server = config.get("dns_verify_server")
self._valid_times = {}
def _determine_challenge_domain(self, domain):
if self.dns_updatedomain:
@ -36,14 +165,11 @@ class DNSChallengeHandler(AbstractChallengeHandler):
return domain.to_text()
@staticmethod
def _determine_txtvalue(thumbprint, token):
return tools.bytes_to_base64url(tools.hash_of_str("{0}.{1}".format(token, thumbprint)))
def create_challenge(self, domain, thumbprint, token):
domain = self._determine_challenge_domain(domain)
txtvalue = self._determine_txtvalue(thumbprint, token)
return self.add_dns_record(domain, txtvalue)
self.add_dns_record(domain, txtvalue)
self._valid_times[domain] = datetime.now() + timedelta(seconds=self.dns_verify_waittime)
def add_dns_record(self, domain, txtvalue):
raise NotImplementedError
@ -51,7 +177,55 @@ class DNSChallengeHandler(AbstractChallengeHandler):
def destroy_challenge(self, domain, thumbprint, token):
domain = self._determine_challenge_domain(domain)
txtvalue = self._determine_txtvalue(thumbprint, token)
return self.remove_dns_record(domain, txtvalue)
self.remove_dns_record(domain, txtvalue)
def remove_dns_record(self, domain, txtvalue):
raise NotImplementedError
def start_challenge(self, domain, thumbprint, token):
domain = self._determine_challenge_domain(domain)
txtvalue = self._determine_txtvalue(thumbprint, token)
failtime = datetime.now() + timedelta(seconds=self.dns_verify_failtime)
if self.verify_dns_record(domain, txtvalue):
return
else:
log("Waiting until TXT record '{}' is ready".format(domain))
while failtime > datetime.now():
time.sleep(self.dns_verify_interval)
if self.verify_dns_record(domain, txtvalue):
return
raise ValueError("DNS challenge is not ready after waiting {} seconds".format(self.dns_verify_waittime))
def verify_dns_record(self, domain, txtvalue):
if self.dns_verify_all_ns:
try:
nameserverip = None
if self.dns_verify_server:
# Use the specific dns server to determine NS for domain, will otherwise default to SOA master
nameserverip = self._lookup_ip(self.dns_verify_server)
ns_ip = self._lookup_ns_ip(domain, nameserverip)
if len(ns_ip) > 0 and all(self._check_txt_record_value(domain, txtvalue, ip) for ip in ns_ip):
# All NS servers have the necessary TXT record. Succeed immediately!
log("All NS ({}) for '{}' have the correct TXT record".format(','.join(ns_ip), domain))
return True
except (ValueError, dns.exception.DNSException):
# Fall back to next verification
pass
if self.dns_verify_server and not self.dns_verify_all_ns:
try:
# Verify using specific dns server
nameserverip = self._lookup_ip(self.dns_verify_server)
if self._check_txt_record_value(domain, txtvalue, nameserverip):
# Verify server confirms the necessary TXT record. Succeed immediately!
log("DNS server '{}' found correct TXT record for '{}'".format(self.dns_verify_server, domain))
return True
except (ValueError, dns.exception.DNSException):
# Fall back to next verification
pass
if domain not in self._valid_times:
# No valid wait time for domain. Verification fails!
return False
# Verification fails or succeeds based on valid wait time set by add_dns_record
return datetime.now() >= self._valid_times[domain]

View File

@ -4,31 +4,17 @@
# dns.nsupdate - rfc2136 based challenge handler
# Copyright (c) Rudolf Mayerhofer, 2019
# available under the ISC license, see LICENSE
import datetime
import io
import ipaddress
import re
import socket
import time
import dns
import dns.query
import dns.resolver
import dns.tsigkeyring
import dns.update
from acertmgr.modes.dns.abstract import DNSChallengeHandler
from acertmgr.modes.dns.abstract import DNSChallengeHandler, QUERY_TIMEOUT
from acertmgr.tools import log
REGEX_IP4 = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$'
REGEX_IP6 = r'^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}' \
r':|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}' \
r'(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}' \
r'|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}' \
r'(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})' \
r'|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}' \
r'|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}' \
r'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}' \
r':((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$'
DEFAULT_KEY_ALGORITHM = "HMAC-MD5.SIG-ALG.REG.INT"
@ -44,12 +30,9 @@ class ChallengeHandler(DNSChallengeHandler):
algorithm = re.search(r"algorithm ([a-zA-Z0-9_-]+?);", key_data, re.DOTALL).group(1)
tsig_secret = re.search(r"secret \"(.*?)\"", key_data, re.DOTALL).group(1)
except IOError as exc:
print(exc)
raise Exception(
"A problem was encountered opening your keyfile, %s." % tsig_key_file)
raise ValueError("A problem was encountered opening your keyfile '{}': {}".format(tsig_key_file, exc))
except AttributeError as exc:
print(exc)
raise Exception("Unable to decipher the keyname and secret from your keyfile.")
raise ValueError("Unable to decipher data from your keyfile: {}".format(exc))
keyring = dns.tsigkeyring.from_text({
key_name: tsig_secret
@ -60,59 +43,6 @@ class ChallengeHandler(DNSChallengeHandler):
return keyring, algorithm
@staticmethod
def _lookup_dns_server(domain_or_ip):
try:
if re.search(REGEX_IP4, domain_or_ip.strip()) or re.search(REGEX_IP6, domain_or_ip.strip()):
return str(ipaddress.ip_address(domain_or_ip))
except ValueError:
pass
# No valid ip found so far, try to resolve
result = socket.getaddrinfo(domain_or_ip, 53)
if len(result) > 0:
return result[0][4][0]
else:
raise ValueError("Could not lookup dns ip for {}".format(domain_or_ip))
@staticmethod
def _get_soa(domain, nameserver=None):
if nameserver:
nameservers = [nameserver]
else:
nameservers = dns.resolver.get_default_resolver().nameservers
domain = dns.name.from_text(domain)
if not domain.is_absolute():
domain = domain.concatenate(dns.name.root)
while domain.parent() != dns.name.root:
request = dns.message.make_query(domain, dns.rdatatype.SOA)
for nameserver in nameservers:
try:
response = dns.query.udp(request, nameserver)
if response.rcode() == dns.rcode.NOERROR:
for answer in response.answer:
for item in answer:
if item.rdtype == dns.rdatatype.SOA:
zone = domain.to_text()
authoritative_ns = item.mname.to_text().split(' ')[0]
return zone, authoritative_ns
else:
break
except dns.exception.Timeout:
# Go to next nameserver on timeout
continue
except dns.exception.DNSException:
# Break loop on any other error
break
domain = domain.parent()
raise Exception('Could not find Zone SOA for "{0}"'.format(domain))
@staticmethod
def get_challenge_type():
return "dns-01"
def __init__(self, config):
DNSChallengeHandler.__init__(self, config)
if 'nsupdate_keyfile' in config:
@ -123,57 +53,43 @@ class ChallengeHandler(DNSChallengeHandler):
config.get("nsupdate_keyname"): config.get("nsupdate_keyvalue")
})
self.keyalgorithm = config.get("nsupdate_keyalgorithm", DEFAULT_KEY_ALGORITHM)
self.dns_server = config.get("nsupdate_server")
self.dns_verify = config.get("nsupdate_verify", "true") == "true"
self.nsupdate_server = config.get("nsupdate_server")
self.nsupdate_verify = config.get("nsupdate_verify", "true") == "true"
self.nsupdate_verified = False
def _determine_zone_and_nameserverip(self, domain):
nameserver = self.dns_server
nameserver = self.nsupdate_server
if nameserver:
nameserverip = self._lookup_dns_server(nameserver)
zone, _ = self._get_soa(domain, nameserverip)
nameserverip = self._lookup_ip(nameserver)
zone, _ = self._lookup_zone(domain, nameserverip)
else:
zone, nameserver = self._get_soa(domain)
nameserverip = self._lookup_dns_server(nameserver)
zone, nameserver = self._lookup_zone(domain)
nameserverip = self._lookup_ip(nameserver)
return zone, nameserverip
def add_dns_record(self, domain, txtvalue):
zone, nameserverip = self._determine_zone_and_nameserverip(domain)
update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm)
update.add(domain, self.dns_ttl, dns.rdatatype.TXT, txtvalue)
print('Adding \'{} {} IN TXT "{}"\' to {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
dns.query.tcp(update, nameserverip)
verified = False
retry = 0
while self.dns_verify and not verified and retry < 5:
request = dns.message.make_query(domain, dns.rdatatype.TXT)
response = dns.query.tcp(request, nameserverip)
for rrset in response.answer:
for answer in rrset:
if answer.to_text().strip('"') == txtvalue:
verified = True
print('Verified \'{} {} IN TXT "{}"\' on {}'.format(domain,
self.dns_ttl,
txtvalue,
nameserverip))
break
if not verified:
time.sleep(1)
retry += 1
if not self.dns_verify or verified:
# Return a valid time at twice the given TTL (to allow DNS to propagate)
return datetime.datetime.now() + datetime.timedelta(seconds=2 * self.dns_ttl)
else:
raise ValueError('Failed to verify \'{} {} IN TXT "{}"\' on {}'.format(domain,
self.dns_ttl,
txtvalue,
nameserverip))
log('Adding \'{} {} IN TXT "{}"\' to {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
dns.query.tcp(update, nameserverip, timeout=QUERY_TIMEOUT)
def remove_dns_record(self, domain, txtvalue):
zone, nameserverip = self._determine_zone_and_nameserverip(domain)
update = dns.update.Update(zone, keyring=self.keyring, keyalgorithm=self.keyalgorithm)
update.delete(domain, dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.TXT, txtvalue))
print('Deleting \'{} 60 IN TXT "{}"\' from {}'.format(domain, txtvalue, nameserverip))
dns.query.tcp(update, nameserverip)
log('Deleting \'{} {} IN TXT "{}"\' from {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
dns.query.tcp(update, nameserverip, timeout=QUERY_TIMEOUT)
def verify_dns_record(self, domain, txtvalue):
if self.nsupdate_verify and not self.dns_verify_all_ns and not self.nsupdate_verified:
# Verify master DNS only if we don't do a full NS check and it has not yet been verified
_, nameserverip = self._determine_zone_and_nameserverip(domain)
if self._check_txt_record_value(domain, txtvalue, nameserverip, use_tcp=True):
log('Verified \'{} {} IN TXT "{}"\' on {}'.format(domain, self.dns_ttl, txtvalue, nameserverip))
self.nsupdate_verified = True
else:
# Master DNS verification failed. Return immediately and try again.
return False
return DNSChallengeHandler.verify_dns_record(self, domain, txtvalue)

View File

@ -7,77 +7,79 @@
# available under the ISC license, see LICENSE
try:
from SimpleHTTPServer import SimpleHTTPRequestHandler
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
except ImportError:
from http.server import SimpleHTTPRequestHandler
from http.server import HTTPServer, BaseHTTPRequestHandler
try:
from SocketServer import TCPServer as HTTPServer
except ImportError:
from http.server import HTTPServer
import os
import re
import socket
import threading
from acertmgr.modes.webdir import ChallengeHandler as WebChallengeHandler
# @brief custom request handler for ACME challenges
# @note current working directory is temporarily changed by the script before
# the webserver starts, which allows using SimpleHTTPRequestHandler
class ACMERequestHandler(SimpleHTTPRequestHandler):
# @brief remove directories from GET URL
# @details the current working directory contains the challenge files,
# there is no need for creating subdirectories for the path
# that ACME expects.
# Additionally, this allows redirecting the ACME path to this
# webserver without having to know which subdirectory is
# redirected, which simplifies integration with existing
# webservers.
def translate_path(self, path):
spath = path.split('/')
if spath[0] != '':
raise ValueError("spath should be '' is {}".format(spath[0]))
spath = spath[1:]
if spath[0] == '.well-known':
spath = spath[1:]
if spath[0] == 'acme-challenge':
spath = spath[1:]
if len(spath) != 1:
raise ValueError("spath length {} != 1".format(len(spath)))
spath.insert(0, '')
path = '/'.join(spath)
return SimpleHTTPRequestHandler.translate_path(self, path)
# @brief start the standalone webserver
# @param server the HTTPServer object
# @note this function is used to be passed to threading.Thread
def start_standalone(server):
server.serve_forever()
from acertmgr.modes.webdir import HTTPChallengeHandler
from acertmgr.tools import log
HTTPServer.allow_reuse_address = True
class ChallengeHandler(WebChallengeHandler):
class HTTPServer6(HTTPServer):
address_family = socket.AF_INET6
class ChallengeHandler(HTTPChallengeHandler):
def __init__(self, config):
WebChallengeHandler.__init__(self, config)
self._verify_challenge = False
self.current_directory = os.getcwd()
if "port" in config:
port = int(config["port"])
else:
port = 80
HTTPChallengeHandler.__init__(self, config)
self.bind_address = config.get("bind_address", "")
self.port = int(config.get("port", 80))
self.challenges = {} # Initialize the challenge data dict
self.server_thread = None
self.server = HTTPServer(("", port), ACMERequestHandler)
self.server = None
def start_challenge(self):
self.server_thread = threading.Thread(target=start_standalone, args=(self.server,))
os.chdir(self.challenge_directory)
def create_challenge(self, domain, thumbprint, token):
self.challenges[token] = "{0}.{1}".format(token, thumbprint)
def destroy_challenge(self, domain, thumbprint, token):
del self.challenges[token]
def start_challenge(self, domain, thumbprint, token):
_self = self
# Custom HTTP request handler
class _HTTPRequestHandler(BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
log("Request from '%s': %s" % (self.address_string(), fmt % args))
def do_GET(self):
# Match token on http://<domain>/.well-known/acme-challenge/<token>
match = re.match(r'.*/(?P<token>[^/]*)$', self.path)
if match and match.group('token') in _self.challenges:
value = _self.challenges[match.group('token')].encode('utf-8')
rcode = 200
else:
value = "404 - NOT FOUND".encode('utf-8')
rcode = 404
self.send_response(rcode)
self.send_header('Content-type', 'text/plain')
self.send_header('Content-length', len(value))
self.end_headers()
self.wfile.write(value)
try:
self.server = HTTPServer6((self.bind_address, self.port), _HTTPRequestHandler)
except socket.gaierror:
self.server = HTTPServer((self.bind_address, self.port), _HTTPRequestHandler)
def _serve():
self.server.serve_forever()
self.server_thread = threading.Thread(target=_serve)
self.server_thread.start()
HTTPChallengeHandler.start_challenge(self, domain, thumbprint, token)
def stop_challenge(self):
self.server.shutdown()
self.server_thread.join()
os.chdir(self.current_directory)
def stop_challenge(self, domain, thumbprint, token):
if self.server_thread.is_alive():
self.server.shutdown()
self.server_thread.join()
self.server.server_close()
self.server = None
self.server_thread = None

View File

@ -5,44 +5,52 @@
# Copyright (c) Rudolf Mayerhofer, 2019.
# available under the ISC license, see LICENSE
import datetime
import os
from acertmgr import tools
from acertmgr.modes.abstract import AbstractChallengeHandler
class ChallengeHandler(AbstractChallengeHandler):
def __init__(self, config):
AbstractChallengeHandler.__init__(self, config)
self._verify_challenge = True
self.challenge_directory = config.get("webdir", "/var/www/acme-challenge/")
if not os.path.isdir(self.challenge_directory):
raise FileNotFoundError("Challenge directory (%s) does not exist!" % self.challenge_directory)
class HTTPChallengeHandler(AbstractChallengeHandler):
@staticmethod
def get_challenge_type():
return "http-01"
def __init__(self, config):
AbstractChallengeHandler.__init__(self, config)
self.http_verify = str(config.get("http_verify", "true")).lower() == "true"
def create_challenge(self, domain, thumbprint, token):
raise NotImplementedError
def destroy_challenge(self, domain, thumbprint, token):
raise NotImplementedError
def start_challenge(self, domain, thumbprint, token):
if self.http_verify:
keyauthorization = "{0}.{1}".format(token, thumbprint)
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
try:
resp = tools.get_url(wellknown_url)
resp_data = resp.read().decode('utf8').strip()
if resp_data != keyauthorization:
raise ValueError("keyauthorization and response data do NOT match")
except (IOError, ValueError):
raise ValueError("keyauthorization verification failed")
class ChallengeHandler(HTTPChallengeHandler):
def __init__(self, config):
HTTPChallengeHandler.__init__(self, config)
self.challenge_directory = config.get("webdir", "/var/www/acme-challenge/")
if not os.path.isdir(self.challenge_directory):
raise FileNotFoundError("Challenge directory (%s) does not exist!" % self.challenge_directory)
def create_challenge(self, domain, thumbprint, token):
keyauthorization = "{0}.{1}".format(token, thumbprint)
wellknown_path = os.path.join(self.challenge_directory, token)
with open(wellknown_path, "w") as wellknown_file:
wellknown_file.write(keyauthorization)
# check that the file is in place
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
if self._verify_challenge:
try:
resp = tools.get_url(wellknown_url)
resp_data = resp.read().decode('utf8').strip()
if resp_data != keyauthorization:
raise ValueError("keyauthorization and response data do NOT match")
except (IOError, ValueError):
os.remove(wellknown_path)
raise ValueError("Wrote file to {0}, but couldn't download {1}".format(
wellknown_path, wellknown_url))
return datetime.datetime.now()
def destroy_challenge(self, domain, thumbprint, token):
os.remove(os.path.join(self.challenge_directory, token))

View File

@ -7,28 +7,81 @@
# available under the ISC license, see LICENSE
import base64
import binascii
import datetime
import io
import os
import re
import stat
import sys
import traceback
import six
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.asymmetric import rsa, ec, padding
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
from cryptography.utils import int_to_bytes
from cryptography.x509.oid import NameOID, ExtensionOID
try:
from cryptography.x509 import ocsp
except ImportError:
pass
try:
from cryptography.hazmat.primitives.asymmetric import ed25519, ed448
except ImportError:
pass
try:
from urllib.request import urlopen, Request # Python 3
except ImportError:
from urllib2 import urlopen, Request # Python 2
LOG_REPLACEMENTS = {}
class InvalidCertificateError(Exception):
pass
# @brief a simple, portable indent function
def indent(text, spaces=0):
ind = ' ' * spaces
return os.linesep.join(ind + line for line in text.splitlines())
# @brief wrapper for log output
def log(msg, exc=None, error=False, warning=False):
if error:
prefix = "Error: "
elif warning:
prefix = "Warning: "
else:
prefix = ""
output = prefix + msg
for k, v in LOG_REPLACEMENTS.items():
output = output.replace(k, v)
if exc:
_, exc_value, _ = sys.exc_info()
if not getattr(exc, '__traceback__', None) and exc == exc_value:
# Traceback handling on Python 2 is ugly, so we only output it if the exception is the current sys one
formatted_exc = traceback.format_exc()
else:
formatted_exc = traceback.format_exception(type(exc), exc, getattr(exc, '__traceback__', None))
exc_string = ''.join(formatted_exc) if isinstance(formatted_exc, list) else str(formatted_exc)
output += os.linesep + indent(exc_string, len(prefix))
if error or warning:
sys.stderr.write(output + os.linesep)
sys.stderr.flush() # force flush buffers after message was written for immediate display
else:
sys.stdout.write(output + os.linesep)
sys.stdout.flush() # force flush buffers after message was written for immediate display
# @brief wrapper for downloading an url
def get_url(url, data=None, headers=None):
return urlopen(Request(url, data=data, headers={} if headers is None else headers))
@ -53,49 +106,78 @@ def is_cert_valid(cert, ttl_days):
# @brief create a certificate signing request
# @param names list of domain names the certificate should be valid for
# @param key the key to use with the certificate in pyopenssl format
# @param must_staple whether or not the certificate should include the OCSP must-staple flag
# @return the CSR in pyopenssl format
def new_cert_request(names, key):
# TODO: There has to be a better way to ensure correct text type (why typecheck, cryptography?)
primary_name = x509.Name([x509.NameAttribute(
NameOID.COMMON_NAME,
names[0] if isinstance(names[0], six.text_type) else names[0].decode('utf-8'))
])
all_names = x509.SubjectAlternativeName([x509.DNSName(
name if isinstance(name, six.text_type) else name.decode('utf-8')
) for name in names])
def new_cert_request(names, key, must_staple=False):
primary_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME,
names[0].decode('utf-8') if getattr(names[0], 'decode', None) else
names[0])])
all_names = x509.SubjectAlternativeName(
[x509.DNSName(name.decode('utf-8') if getattr(name, 'decode', None) else name) for name in names])
req = x509.CertificateSigningRequestBuilder()
req = req.subject_name(primary_name)
req = req.add_extension(all_names, critical=False)
if must_staple:
if getattr(x509, 'TLSFeature', None):
req = req.add_extension(x509.TLSFeature(features=[x509.TLSFeatureType.status_request]), critical=False)
else:
log('OCSP must-staple ignored as current version of cryptography does not support the flag.', warning=True)
req = req.sign(key, hashes.SHA256(), default_backend())
return req
# @brief generate a new account key
# @param path path where the new key file should be written in PEM format (optional)
def new_account_key(path=None, key_size=4096):
return new_ssl_key(path, key_size)
def new_account_key(path=None, key_algo=None, key_size=None):
return new_ssl_key(path, key_algo, key_size)
# @brief generate a new ssl key
# @param path path where the new key file should be written in PEM format (optional)
def new_ssl_key(path=None, key_size=4096):
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend()
)
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
def new_ssl_key(path=None, key_algo=None, key_size=None):
if not key_algo or key_algo.lower() == 'rsa':
if not key_size:
key_size = 4096
key_format = serialization.PrivateFormat.TraditionalOpenSSL
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend()
)
elif key_algo.lower() == 'ec' or key_algo.lower() == 'ecc':
if not key_size or key_size == 256:
key_curve = ec.SECP256R1
elif key_size == 384:
key_curve = ec.SECP384R1
elif key_size == 521:
key_curve = ec.SECP521R1
else:
raise ValueError("Unsupported EC curve size parameter: {}".format(key_size))
key_format = serialization.PrivateFormat.PKCS8
private_key = ec.generate_private_key(curve=key_curve, backend=default_backend())
elif key_algo.lower() == 'ed25519' and "cryptography.hazmat.primitives.asymmetric.ed25519":
key_format = serialization.PrivateFormat.PKCS8
private_key = ed25519.Ed25519PrivateKey.generate()
elif key_algo.lower() == 'ed448' and "cryptography.hazmat.primitives.asymmetric.ed448":
key_format = serialization.PrivateFormat.PKCS8
private_key = ed448.Ed448PrivateKey.generate()
else:
raise ValueError("Unsupported key algorithm: {}".format(key_algo))
if path is not None:
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=key_format,
encryption_algorithm=serialization.NoEncryption(),
)
with io.open(path, 'wb') as pem_out:
pem_out.write(pem)
try:
os.chmod(path, int("0400", 8))
except OSError:
print('Warning: Could not set file permissions on {0}!'.format(path))
if hasattr(os, 'chmod'):
try:
os.chmod(path, int("0400", 8))
except OSError:
log('Could not set file permissions on {0}!'.format(path), warning=True)
else:
log('Keyfile permission handling unavailable on this platform', warning=True)
return private_key
@ -116,13 +198,21 @@ def read_pem_file(path, key=False, csr=False):
# @brief write cert data to PEM formatted file
def write_pem_file(crt, path, perms=None):
if hasattr(os, 'chmod') and os.path.exists(path):
try:
os.chmod(path, os.stat(path).st_mode | stat.S_IWRITE)
except OSError:
log('Could not make file ({0}) writable'.format(path), warning=True)
with io.open(path, "w") as f:
f.write(convert_cert_to_pem_str(crt))
if perms:
try:
os.chmod(path, perms)
except OSError:
print('Warning: Could not set file permissions ({0}) on {1}!'.format(perms, path))
if hasattr(os, 'chmod'):
try:
os.chmod(path, perms)
except OSError:
log('Could not set file permissions ({0}) on {1}!'.format(perms, path), warning=True)
else:
log('PEM-File permission handling unavailable on this platform', warning=True)
# @brief download the issuer ca for a given certificate
@ -137,33 +227,65 @@ def download_issuer_ca(cert):
break
if not ca_issuers:
print("Could not determine issuer CA for given certificate: {}".format(cert))
log("Could not determine issuer CA for given certificate: {}".format(cert), error=True)
return None
print("Downloading CA certificate from {}".format(ca_issuers))
log("Downloading CA certificate from {}".format(ca_issuers))
resp = get_url(ca_issuers)
code = resp.getcode()
if code >= 400:
print("Could not download issuer CA (error {}) for given certificate: {}".format(code, cert))
log("Could not download issuer CA (error {}) for given certificate: {}".format(code, cert), error=True)
return None
return x509.load_der_x509_certificate(resp.read(), default_backend())
# @brief determine all san domains on a given certificate
def get_cert_domains(cert):
san_cert = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
domains = set()
domains.add(cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value)
if san_cert:
for d in san_cert.value:
domains.add(d.value)
return domains
# @brief determine certificate cn
def get_cert_cn(cert):
return "CN={}".format(cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value)
# @brief determine certificate end of validity
def get_cert_valid_until(cert):
return cert.not_valid_after
# @brief convert certificate to PEM format
# @param cert certificate object in pyopenssl format
# @param cert certificate object or a list thereof
# @return the certificate in PEM format
def convert_cert_to_pem_str(cert):
return cert.public_bytes(serialization.Encoding.PEM).decode('utf8')
if not isinstance(cert, list):
cert = [cert]
result = list()
for data in cert:
result.append(data.public_bytes(serialization.Encoding.PEM).decode('utf8'))
return '\n'.join(result)
# @brief load a PEM certificate from str
# @return a certificate object or a list of objects if multiple are in the string
def convert_pem_str_to_cert(certdata):
return x509.load_pem_x509_certificate(certdata.encode('utf8'), default_backend())
certs = re.findall(r'(-----BEGIN CERTIFICATE-----\n[^\-]+\n-----END CERTIFICATE-----)',
certdata, re.DOTALL)
result = list()
for data in certs:
result.append(x509.load_pem_x509_certificate(data.encode('utf8'), default_backend()))
return result[0] if len(result) == 1 else result
# @brief serialize CSR to DER bytes
def convert_csr_to_der_bytes(data):
# @brief serialize cert/csr to DER bytes
def convert_cert_to_der_bytes(data):
return data.public_bytes(serialization.Encoding.DER)
@ -172,12 +294,73 @@ def convert_der_bytes_to_cert(data):
return x509.load_der_x509_certificate(data, default_backend())
# @brief determine key signing algorithm and jwk data
# @return key algorithm, signature algorithm, key numbers as a dict
def get_key_alg_and_jwk(key):
if isinstance(key, rsa.RSAPrivateKey):
# See https://tools.ietf.org/html/rfc7518#section-6.3
numbers = key.public_key().public_numbers()
return "RS256", {"kty": "RSA",
"e": bytes_to_base64url(int_to_bytes(numbers.e)),
"n": bytes_to_base64url(int_to_bytes(numbers.n))}
elif isinstance(key, ec.EllipticCurvePrivateKey):
# See https://tools.ietf.org/html/rfc7518#section-6.2
numbers = key.public_key().public_numbers()
if isinstance(numbers.curve, ec.SECP256R1):
alg = 'ES256'
crv = 'P-256'
elif isinstance(numbers.curve, ec.SECP384R1):
alg = 'ES384'
crv = 'P-384'
elif isinstance(numbers.curve, ec.SECP521R1):
alg = 'ES512'
crv = 'P-521'
else:
raise ValueError("Unsupported EC curve in key: {}".format(key))
full_octets = (int(crv[2:]) + 7) // 8
return alg, {"kty": "EC", "crv": crv,
"x": bytes_to_base64url(int_to_bytes(numbers.x, full_octets)),
"y": bytes_to_base64url(int_to_bytes(numbers.y, full_octets))}
elif "cryptography.hazmat.primitives.asymmetric.ed25519" in sys.modules and isinstance(key,
ed25519.Ed25519PrivateKey):
# See https://tools.ietf.org/html/rfc8037#appendix-A.2
return "EdDSA", {"kty": "OKP", "crv": "Ed25519",
"x": bytes_to_base64url(key.public_key().public_bytes(encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw)
)}
elif "cryptography.hazmat.primitives.asymmetric.ed448" in sys.modules and isinstance(key,
ed448.Ed448PrivateKey):
return "EdDSA", {"kty": "OKP", "crv": "Ed448",
"x": bytes_to_base64url(key.public_key().public_bytes(encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw)
)}
else:
raise ValueError("Unsupported key: {}".format(key))
# @brief sign string with key
def signature_of_str(key, string):
# @todo check why this padding is not working
# pad = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)
pad = padding.PKCS1v15()
return key.sign(string.encode('utf8'), pad, hashes.SHA256())
alg, _ = get_key_alg_and_jwk(key)
data = string.encode('utf8')
if alg == 'RS256':
return key.sign(data, padding.PKCS1v15(), hashes.SHA256())
elif alg.startswith('ES'):
full_octets = (int(alg[2:]) + 7) // 8
if alg == 'ES256':
der_sig = key.sign(data, ec.ECDSA(hashes.SHA256()))
elif alg == 'ES384':
der_sig = key.sign(data, ec.ECDSA(hashes.SHA384()))
elif alg == 'ES512':
der_sig = key.sign(data, ec.ECDSA(hashes.SHA512()))
else:
raise ValueError("Unsupported EC signature algorithm: {}".format(alg))
# convert DER signature to RAW format (https://tools.ietf.org/html/rfc7518#section-3.4)
r, s = decode_dss_signature(der_sig)
return int_to_bytes(r, full_octets) + int_to_bytes(s, full_octets)
elif alg == 'EdDSA':
return key.sign(data)
else:
raise ValueError("Unsupported signature algorithm: {}".format(alg))
# @brief hash a string
@ -194,15 +377,6 @@ def bytes_to_base64url(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
# @brief convert numbers to byte-string
# @param num number to convert
# @return byte-string containing the number
def number_to_byte_format(num):
n = format(num, 'x')
n = "0{0}".format(n) if len(n) % 2 else n
return binascii.unhexlify(n)
# @brief check whether existing target file is still valid or source crt has been updated
# @param target string containing the path to the target file
# @param file string containing the path to the certificate file
@ -213,3 +387,66 @@ def target_is_current(target, file):
target_date = os.path.getmtime(target)
crt_date = os.path.getmtime(file)
return target_date >= crt_date
# @brief convert domain to idna representation (if applicable
def idna_convert(domain):
try:
if any(ord(c) >= 128 for c in domain):
# Translate IDNA domain name from a unicode domain (handle wildcards separately)
if domain.startswith('*.'):
idna_domain = "*.{}".format(domain[2:].encode('idna').decode('ascii'))
else:
idna_domain = domain.encode('idna').decode('ascii')
return idna_domain
except Exception as e:
log("Unicode domain(s) found but IDNA names could not be translated due to error: {}".format(e), error=True)
return domain
# @brief validate the OCSP status for a given certificate by the given issuer
def is_ocsp_valid(cert, issuer, hash_algo):
if hash_algo == 'sha1':
algorithm = hashes.SHA1
elif hash_algo == 'sha224':
algorithm = hashes.SHA224
elif hash_algo == 'sha256':
algorithm = hashes.SHA256
elif hash_algo == 'sha385':
algorithm = hashes.SHA384
elif hash_algo == 'sha512':
algorithm = hashes.SHA512
else:
log("Invalid hash algorithm '{}' used for OCSP validation. Validation ignored.".format(hash_algo), warning=True)
return True
if isinstance(issuer, list):
issuer = issuer[0] # First certificate in the CA chain is the immediate issuer
try:
ocsp_urls = []
aia = cert.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
for data in aia.value:
if data.access_method == x509.OID_OCSP:
ocsp_urls.append(data.access_location.value)
# This is a bit of a hack due to validation problems within cryptography (TODO: Check if this is still true)
# Correct replacement: ocsprequest = ocsp.OCSPRequestBuilder().add_certificate(cert, issuer, algorithm).build()
ocsprequest = ocsp.OCSPRequestBuilder((cert, issuer, (algorithm)())).build()
ocsprequestdata = ocsprequest.public_bytes(serialization.Encoding.DER)
for ocsp_url in ocsp_urls:
response = get_url(ocsp_url,
ocsprequestdata,
{
'Accept': 'application/ocsp-response',
'Content-Type': 'application/ocsp-request',
})
ocspresponsedata = response.read()
ocspresponse = ocsp.load_der_ocsp_response(ocspresponsedata)
if ocspresponse.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL \
and ocspresponse.certificate_status == ocsp.OCSPCertStatus.REVOKED:
return False
except Exception as e:
log("An exception occurred during OCSP validation (Validation will be ignored): {}".format(e), error=True)
return True

View File

@ -6,10 +6,13 @@ pkgdesc='An automated certificate manager using ACME/letsencrypt'
arch=('any')
url='https://github.com/moepman/acertmgr'
license=('ISC')
depends=('python-cryptography')
optdepends=('yaml: python-yaml'
'idna: python-idna'
'dns.nsupdate: python-dnspython')
depends=('python-cryptography>=0.6')
optdepends=('python-yaml: Support config files in YAML format'
'python-idna: Support conversion of unicode domains'
'python-dnspython: Support for dns challenge handlers'
'python-cryptography>=2.1: Support for the OCSP must-staple flag'
'python-cryptography>=2.6: Support for Ed25519 key support'
)
makedepends=('git')
conflicts=('python-acertmgr')
provides=('python-acertmgr')

View File

@ -6,10 +6,13 @@ pkgdesc='An automated certificate manager using ACME/letsencrypt'
arch=('any')
url='https://github.com/moepman/acertmgr'
license=('ISC')
depends=('python2-cryptography')
optdepends=('yaml: python2-yaml'
'idna: python2-idna'
'dns.nsupdate: python2-dnspython')
depends=('python2-cryptography>=0.6')
optdepends=('python2-yaml: Support config files in YAML format'
'python2-idna: Support conversion of unicode domains'
'python2-dnspython: Support for dns challenge handlers'
'python2-cryptography>=2.1: Support for the OCSP must-staple flag'
'python2-cryptography>=2.6: Support for Ed25519 key support'
)
makedepends=('git')
conflicts=('python-acertmgr')
provides=('python-acertmgr')
@ -29,4 +32,4 @@ build() {
package() {
cd $_pkgname
python2 setup.py install --root=${pkgdir} --optimize=1
}
}

View File

@ -68,3 +68,12 @@ mail.example.com smtp.example.com webmail.example.net *.intra.example.com:
perm: '400'
format: crt,ca
action: '/etc/init.d/postfix reload'
# this will use a different authority for the following set of domains (buypass.com in this example)
buypass-example.com *.buypass-example.com:
- authority: 'https://api.buypass.com/acme' # Removed trailing /directory from buypass docs for API endpoint
mode: dns.nsupdate
nsupdate_keyname: buypass
nsupdate_keyvalue: Test1234512359==
nsupdate_keyalgorithm: HMAC-MD5.SIG-ALG.REG.INT

View File

@ -26,63 +26,62 @@ def determine_version():
return version
# Derive version from git
try:
output = subprocess.check_output(['git', 'describe', '--tags', '--dirty'], cwd=dir_path) \
.decode('utf-8').strip().split('-')
output = subprocess.check_output(["git", "describe", "--tags", "--dirty"], cwd=dir_path) \
.decode("utf-8").strip().split("-")
if len(output) == 1:
return output[0]
elif len(output) == 2:
return "{}.dev0".format(output[0])
else:
release = 'dev' if len(output) == 4 and output[3] == 'dirty' else ''
release = "dev" if len(output) == 4 and output[3] == "dirty" else ""
return "{}.{}{}+{}".format(output[0], release, output[1], output[2])
except subprocess.CalledProcessError:
try:
commit = subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode('utf-8').strip()
status = subprocess.check_output(['git', 'status', '-s']).decode('utf-8').strip()
commit = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip()
status = subprocess.check_output(["git", "status", "-s"]).decode("utf-8").strip()
return "{}.dev0+{}".format(version, commit) if len(status) > 0 else "{}+{}".format(version, commit)
except subprocess.CalledProcessError:
# finding the git version has utterly failed, use version.txt
return version
setup(
name="acertmgr",
version=determine_version(),
author="Markus Hauschild",
author_email="moepman@binary-kitchen.de",
description="An automated certificate manager using ACME/letsencrypt",
license="ISC",
keywords="acme letsencrypt",
url="https://github.com/moepman/acertmgr",
packages=find_packages(),
long_description=read('README.md'),
long_description_content_type="text/markdown",
classifiers=[
"Development Status :: 4 - Beta",
"Programming Language :: Python",
"Environment :: Console",
"Topic :: Security :: Cryptography",
"License :: OSI Approved :: ISC License",
],
install_requires=[
"cryptography",
"six",
],
extras_require={
"dns.nsupdate": [
"dnspython",
extra_requirements = {
"dns": ["dnspython"],
"yaml": ["PyYAML"],
"idna": ["idna"],
"ocsp-must-staple": ["cryptography>=2.1"],
"ocsp-validation": ["cryptography>=2.4"],
"ed25519": ["cryptography>=2.6"],
}
if __name__ == "__main__":
setup(
name="acertmgr",
version=determine_version(),
author="Markus Hauschild",
author_email="moepman@binary-kitchen.de",
description="An automated certificate manager using ACME/letsencrypt",
license="ISC",
keywords="acme letsencrypt",
url="https://github.com/moepman/acertmgr",
packages=find_packages(),
long_description=read("README.md"),
long_description_content_type="text/markdown",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Programming Language :: Python",
"Environment :: Console",
"Topic :: Security :: Cryptography",
"License :: OSI Approved :: ISC License (ISCL)",
],
"yaml": [
"yaml",
install_requires=[
"cryptography>=0.6",
],
"idna": [
"idna",
],
},
entry_points={
'console_scripts': [
'acertmgr=acertmgr:main',
],
},
data_files=[('readme', ['README.md'])]
)
extras_require=extra_requirements,
entry_points={
"console_scripts": [
"acertmgr=acertmgr:main",
],
},
data_files=[("readme", ["README.md"])],
)

View File

@ -1 +1 @@
0.9.4
1.0.5