service.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # SPDX-FileCopyrightText: 2016-2023 Helmut Pozimski <helmut@pozimski.eu>
  2. #
  3. # SPDX-License-Identifier: GPL-2.0-only
  4. # -*- coding: utf8 -*-
  5. """ Contains the service module which manages certificates for generic
  6. services that don't need any special configuration. The configuration
  7. needs to at least contain the keys "certificate_path", "key_path",
  8. "tlsa" and "tlsa_ports".
  9. """
  10. import logging
  11. import shutil
  12. import subprocess
  13. import OpenSSL
  14. from amulib import helpers
  15. from amulib.cert_path_provider import CertPathProvider
  16. from amulib.helpers import restart_service
  17. LOGGER = logging.getLogger("acme-updater")  # module-wide logger, registered under the name "acme-updater"; run() reports errors and successful renewals through it
  18. def run(cert_path_provider: CertPathProvider, service_name, config,
  19. named_key_path="/run/named/session.key", dns_server="localhost"):
  20. """
  21. :param cert_path_provider: provider for certifcate paths
  22. :type cert_path_provider: CertPathProvider
  23. :param service_name: name of the service
  24. :type service_name: str
  25. :param config: configuration for the service
  26. :type config: dict
  27. :param named_key_path: path to the named session.key
  28. :type named_key_path: str
  29. :param dns_server: DNS server to use to create TLSA records
  30. :type dns_server: str
  31. """
  32. certificate_path = config["certificate_path"]
  33. key_path = config["key_path"]
  34. tlsa = config["tlsa"]
  35. tlsa_ports = config["tlsa_ports"]
  36. renewal_successful = False
  37. try:
  38. with open(certificate_path, "r") as cert_file:
  39. cert_text = cert_file.read()
  40. except IOError:
  41. LOGGER.error("Error while opening the %s certificate", service_name)
  42. else:
  43. current_cert = OpenSSL.crypto.load_certificate(
  44. OpenSSL.crypto.FILETYPE_PEM, cert_text
  45. )
  46. cert_alt_names = helpers.get_subject_alt_name(current_cert)
  47. fqdn = cert_alt_names[0]
  48. acme_fullchain_path = cert_path_provider.provide_fullchain_path(fqdn)
  49. if helpers.check_renewal(current_cert, acme_fullchain_path):
  50. try:
  51. with open(acme_fullchain_path, "r") as acme_cert_file:
  52. acme_cert_text = acme_cert_file.read()
  53. except IOError:
  54. LOGGER.error("Error while opening new %s "
  55. "certificate file", service_name)
  56. else:
  57. acme_cert = OpenSSL.crypto.load_certificate(
  58. OpenSSL.crypto.FILETYPE_PEM, acme_cert_text
  59. )
  60. if tlsa:
  61. for name in cert_alt_names:
  62. for port in tlsa_ports:
  63. helpers.create_tlsa_records(name, port, acme_cert,
  64. named_key_path,
  65. dns_server)
  66. newkey_path = cert_path_provider.provide_key_path(fqdn)
  67. if certificate_path == key_path:
  68. if helpers.create_backup_copy(certificate_path):
  69. try:
  70. with open(certificate_path, "wb") as target:
  71. with open(acme_fullchain_path, "rb") as chain:
  72. with open(newkey_path, "rb") as newkey:
  73. shutil.copyfileobj(newkey,
  74. target)
  75. shutil.copyfileobj(chain, target)
  76. except IOError:
  77. LOGGER.error("Renewal of cert for %s failed, "
  78. "please clean up manually and "
  79. "check the backup files!",
  80. service_name)
  81. else:
  82. renewal_successful = True
  83. else:
  84. LOGGER.error("Renewal of cert for %s failed, "
  85. "please clean up manually and "
  86. "check the backup files!", service_name)
  87. else:
  88. if helpers.copy_file(acme_fullchain_path,
  89. certificate_path):
  90. if helpers.copy_file(newkey_path, key_path):
  91. renewal_successful = True
  92. else:
  93. LOGGER.error("Renewal of cert for %s failed, "
  94. "please clean up manually and "
  95. "check the backup files!",
  96. service_name)
  97. else:
  98. LOGGER.error("Renewal of cert for %s failed, "
  99. "please clean up manually and "
  100. "check the backup files!", service_name)
  101. if renewal_successful:
  102. LOGGER.info("Certificate for %s successfully "
  103. "renewed, restarting service.",
  104. service_name)
  105. restart_service(service_name)