123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- # SPDX-FileCopyrightText: 2016-2023 Helmut Pozimski <helmut@pozimski.eu>
- #
- # SPDX-License-Identifier: GPL-2.0-only
- # -*- coding: utf8 -*-
- """ Contains the service module which manages certificates for generic
- services that don't need any special configuration. The configuration
- needs to at least contain the keys "certificate_path", "key_path",
- "tlsa" and "tlsa_ports".
- """
- import logging
- import shutil
- import subprocess
- import OpenSSL
- from amulib import helpers
- from amulib.cert_path_provider import CertPathProvider
- from amulib.helpers import restart_service
# Module-level logger; run() reports errors and successful renewals through
# the logger registered under the name "acme-updater".
- LOGGER = logging.getLogger("acme-updater")
def _install_renewed_files(certificate_path, key_path, fullchain_path,
                           newkey_path):
    """ Puts the renewed chain and key in place of the configured files.

    :param certificate_path: configured certificate path of the service
    :type certificate_path: str
    :param key_path: configured key path of the service
    :type key_path: str
    :param fullchain_path: path of the renewed full chain
    :type fullchain_path: str
    :param newkey_path: path of the renewed key
    :type newkey_path: str
    :return: True if every file was written, False otherwise
    :rtype: bool
    """
    if certificate_path != key_path:
        # Separate files: the key is only copied once the chain has been
        # copied successfully (``and`` short-circuits).
        return bool(helpers.copy_file(fullchain_path, certificate_path)
                    and helpers.copy_file(newkey_path, key_path))

    # Certificate and key share one file: key first, then the chain.
    if not helpers.create_backup_copy(certificate_path):
        return False
    try:
        # Open both sources BEFORE the target: opening the target with "wb"
        # truncates it, so an unreadable source must be detected first or the
        # service would be left with an empty certificate file.
        with open(fullchain_path, "rb") as chain, \
                open(newkey_path, "rb") as newkey:
            with open(certificate_path, "wb") as target:
                shutil.copyfileobj(newkey, target)
                shutil.copyfileobj(chain, target)
    except IOError:
        return False
    return True


def run(cert_path_provider: CertPathProvider, service_name: str, config: dict,
        named_key_path: str = "/run/named/session.key",
        dns_server: str = "localhost") -> None:
    """ Replaces the certificate of a generic service with the renewed one
    and restarts the service afterwards.

    The currently installed certificate is read from
    config["certificate_path"]; its first subject alternative name is used
    to look up the renewed full chain and key through cert_path_provider.
    Nothing is changed unless helpers.check_renewal() approves the renewal.
    If config["tlsa"] is set, helpers.create_tlsa_records() is called for
    every alternative name and every port in config["tlsa_ports"] before
    the files are replaced. All I/O failures are logged, not raised.

    :param cert_path_provider: provider for certificate paths
    :type cert_path_provider: CertPathProvider
    :param service_name: name of the service
    :type service_name: str
    :param config: configuration for the service, must contain the keys
        "certificate_path", "key_path", "tlsa" and "tlsa_ports"
    :type config: dict
    :param named_key_path: path to the named session.key
    :type named_key_path: str
    :param dns_server: DNS server to use to create TLSA records
    :type dns_server: str
    :raises KeyError: if one of the required configuration keys is missing
    """
    certificate_path = config["certificate_path"]
    key_path = config["key_path"]
    tlsa = config["tlsa"]
    tlsa_ports = config["tlsa_ports"]

    try:
        with open(certificate_path, "r") as cert_file:
            cert_text = cert_file.read()
    except IOError:
        LOGGER.error("Error while opening the %s certificate", service_name)
        return

    current_cert = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_PEM, cert_text
    )
    cert_alt_names = helpers.get_subject_alt_name(current_cert)
    # The first alternative name selects the renewed files.
    # NOTE(review): assumes the certificate has at least one alternative
    # name; an empty list raises IndexError here - confirm against helpers.
    fqdn = cert_alt_names[0]
    acme_fullchain_path = cert_path_provider.provide_fullchain_path(fqdn)
    if not helpers.check_renewal(current_cert, acme_fullchain_path):
        return

    try:
        with open(acme_fullchain_path, "r") as acme_cert_file:
            acme_cert_text = acme_cert_file.read()
    except IOError:
        LOGGER.error("Error while opening new %s "
                     "certificate file", service_name)
        return

    acme_cert = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_PEM, acme_cert_text
    )
    if tlsa:
        for name in cert_alt_names:
            for port in tlsa_ports:
                helpers.create_tlsa_records(name, port, acme_cert,
                                            named_key_path, dns_server)

    newkey_path = cert_path_provider.provide_key_path(fqdn)
    if not _install_renewed_files(certificate_path, key_path,
                                  acme_fullchain_path, newkey_path):
        LOGGER.error("Renewal of cert for %s failed, "
                     "please clean up manually and "
                     "check the backup files!", service_name)
        return

    LOGGER.info("Certificate for %s successfully "
                "renewed, restarting service.",
                service_name)
    restart_service(service_name)
|