service.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # SPDX-FileCopyrightText: 2016-2023 Helmut Pozimski <helmut@pozimski.eu>
  2. #
  3. # SPDX-License-Identifier: GPL-2.0-only
  4. # -*- coding: utf8 -*-
  5. """ Contains the service module which manages certificates for generic
  6. services that don't need any special configuration. The configuration
  7. needs to at least contain the keys "certificate_path", "key_path",
  8. "tlsa" and "tlsa_ports".
  9. """
  10. import logging
  11. import shutil
  12. import subprocess
  13. import OpenSSL
  14. from amulib import helpers
  15. from amulib.cert_path_provider import CertPathProvider
  16. LOGGER = logging.getLogger("acme-updater")
  17. def run(cert_path_provider: CertPathProvider, service_name, config,
  18. named_key_path="/run/named/session.key", dns_server="localhost"):
  19. """
  20. :param cert_path_provider: provider for certifcate paths
  21. :type cert_path_provider: CertPathProvider
  22. :param service_name: name of the service
  23. :type service_name: str
  24. :param config: configuration for the service
  25. :type config: dict
  26. :param named_key_path: path to the named session.key
  27. :type named_key_path: str
  28. :param dns_server: DNS server to use to create TLSA records
  29. :type dns_server: str
  30. """
  31. certificate_path = config["certificate_path"]
  32. key_path = config["key_path"]
  33. tlsa = config["tlsa"]
  34. tlsa_ports = config["tlsa_ports"]
  35. renewal_successful = False
  36. try:
  37. with open(certificate_path, "r") as cert_file:
  38. cert_text = cert_file.read()
  39. except IOError:
  40. LOGGER.error("Error while opening the %s certificate", service_name)
  41. else:
  42. current_cert = OpenSSL.crypto.load_certificate(
  43. OpenSSL.crypto.FILETYPE_PEM, cert_text
  44. )
  45. cert_alt_names = helpers.get_subject_alt_name(current_cert)
  46. fqdn = cert_alt_names[0]
  47. acme_fullchain_path = cert_path_provider.provide_fullchain_path(fqdn)
  48. if helpers.check_renewal(current_cert, acme_fullchain_path):
  49. try:
  50. with open(acme_fullchain_path, "r") as acme_cert_file:
  51. acme_cert_text = acme_cert_file.read()
  52. except IOError:
  53. LOGGER.error("Error while opening new %s "
  54. "certificate file", service_name)
  55. else:
  56. acme_cert = OpenSSL.crypto.load_certificate(
  57. OpenSSL.crypto.FILETYPE_PEM, acme_cert_text
  58. )
  59. if tlsa:
  60. for name in cert_alt_names:
  61. for port in tlsa_ports:
  62. helpers.create_tlsa_records(name, port, acme_cert,
  63. named_key_path,
  64. dns_server)
  65. newkey_path = cert_path_provider.provide_key_path(fqdn)
  66. if certificate_path == key_path:
  67. if helpers.create_backup_copy(certificate_path):
  68. try:
  69. with open(certificate_path, "wb") as target:
  70. with open(acme_fullchain_path, "rb") as chain:
  71. with open(newkey_path, "rb") as newkey:
  72. shutil.copyfileobj(newkey,
  73. target)
  74. shutil.copyfileobj(chain, target)
  75. except IOError:
  76. LOGGER.error("Renewal of cert for %s failed, "
  77. "please clean up manually and "
  78. "check the backup files!",
  79. service_name)
  80. else:
  81. renewal_successful = True
  82. else:
  83. LOGGER.error("Renewal of cert for %s failed, "
  84. "please clean up manually and "
  85. "check the backup files!", service_name)
  86. else:
  87. if helpers.copy_file(acme_fullchain_path,
  88. certificate_path):
  89. if helpers.copy_file(newkey_path, key_path):
  90. renewal_successful = True
  91. else:
  92. LOGGER.error("Renewal of cert for %s failed, "
  93. "please clean up manually and "
  94. "check the backup files!",
  95. service_name)
  96. else:
  97. LOGGER.error("Renewal of cert for %s failed, "
  98. "please clean up manually and "
  99. "check the backup files!", service_name)
  100. if renewal_successful:
  101. LOGGER.info("Certificate for %s successfully "
  102. "renewed, restarting service.",
  103. service_name)
  104. subprocess.call(["/etc/init.d/%s" % service_name,
  105. "restart"])