acme_updater.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. #! /usr/bin/env python3
  2. # -*- coding: utf8 -*-
  3. """
  4. This script scans all apache vhosts on the system, checks if they use
  5. letsencrypt certificates for which acmetool has renewed the certificate and
  6. replaces the old certificate with the new one automatically. This way,
  7. acmetool can run as an unprivileged user and acme-updater can take
  8. care of swapping out certificates while running as root.
  9. Requirements:
  10. * python 3.x
  11. * module OpenSSL
  12. """
  13. import os
  14. import logging
  15. import datetime
  16. import shutil
  17. import subprocess
  18. import OpenSSL
  19. # directory for enabled apache vhosts, might be different on your distribution
  20. VHOSTS_DIRECTORY = "/etc/apache2/sites-enabled"
  21. # Log level, default is info
  22. LOG_LEVEL = logging.INFO
  23. # State directory for acmetool, default is /var/lib/acme
  24. ACME_STATE_DIR = "/var/lib/acme"
  25. LOGGER = logging.getLogger("acme-updater")
  26. LOGGER.setLevel(LOG_LEVEL)
  27. CONSOLE_HANDLER = logging.StreamHandler()
  28. LOGGER.addHandler(CONSOLE_HANDLER)
  29. PARSED_VHOSTS = []
  30. CERT_RENEWED = False
  31. def parse_vhost(file_obj):
  32. """
  33. Parses a given vhost file and extracts the main domain,
  34. the certificate file and the TLS key file.
  35. :param file_obj: file obj pointing to a vhost to parse
  36. :return: list of tuples with domains and found certificates
  37. :rtype: list
  38. """
  39. vhost_started = False
  40. parsed_info = []
  41. cert_path = ""
  42. key_path = ""
  43. main_domain = ""
  44. for line in file_obj:
  45. if "<VirtualHost" in line:
  46. vhost_started = True
  47. elif "</VirtualHost" in line and vhost_started:
  48. vhost_started = False
  49. if cert_path and key_path and main_domain:
  50. parsed_info.append((main_domain, cert_path, key_path))
  51. LOGGER.debug(
  52. "Found vhost with main domain %s, certificate %s and key "
  53. "file %s", main_domain, cert_path, key_path)
  54. cert_path = ""
  55. key_path = ""
  56. main_domain = ""
  57. elif "ServerName" in line:
  58. main_domain = line.strip().rsplit()[1]
  59. elif "SSLCertificateFile" in line:
  60. cert_path = line.strip().rsplit()[1]
  61. elif "SSLCertificateKeyFile" in line:
  62. key_path = line.strip().rsplit()[1]
  63. return parsed_info
  64. def parse_asn1_time(timestamp):
  65. """
  66. parses an ANS1 timestamp as returned by OpenSSL and turns it into a python
  67. datetime object.
  68. :param timestamp: ASN1 timestamp
  69. :type timestamp: str
  70. :return: timestamp as datetime object
  71. :rtype: datetime
  72. """
  73. year = int(timestamp[:4])
  74. month = int(timestamp[4:6])
  75. day = int(timestamp[6:8])
  76. date = datetime.datetime(year, month, day)
  77. return date
  78. def copy_file(source, destination, backup=True):
  79. """
  80. Copies a file from the given source file to
  81. the given destionation and creates a copy of the
  82. source file.
  83. :param source: source file path
  84. :type source: str
  85. :param destination: destination file path
  86. :type destination: str
  87. :param backup: whether to take a backup of the destination file before \
  88. overwriting it
  89. :type backup: bool
  90. :return: success
  91. :rtype: bool
  92. """
  93. backup_file = destination + ".bak_%s" % datetime.datetime.now().strftime(
  94. "%Y%m%d%H%M%S")
  95. if backup:
  96. try:
  97. shutil.copy(destination, backup_file)
  98. except IOError:
  99. LOGGER.error("Creating of backup file for %s failed!", destination)
  100. return False
  101. try:
  102. shutil.copy(source, destination)
  103. except IOError:
  104. LOGGER.error("Copying of file %s to %s failed!",
  105. source, destination)
  106. else:
  107. os.chmod(destination, 0o0644)
  108. os.chown(destination, 0, 0)
  109. return True
  110. for vhost in os.listdir(VHOSTS_DIRECTORY):
  111. if vhost.endswith(".conf"):
  112. vhost_absolute = os.path.join(VHOSTS_DIRECTORY, vhost)
  113. with open(vhost_absolute, "r") as vhost_file:
  114. PARSED_VHOSTS.extend(parse_vhost(vhost_file))
  115. for entry in PARSED_VHOSTS:
  116. try:
  117. with open(entry[1], "r") as cert_file:
  118. cert_text = cert_file.read()
  119. except IOError:
  120. LOGGER.error("Error while opening cert file %s ", entry[1])
  121. else:
  122. x509_current_cert = OpenSSL.crypto.load_certificate(
  123. OpenSSL.crypto.FILETYPE_PEM, cert_text)
  124. if "Let's Encrypt" in x509_current_cert.get_issuer().__str__():
  125. acme_cert_path = os.path.join(ACME_STATE_DIR, "live", entry[0],
  126. "cert")
  127. try:
  128. with open(acme_cert_path, "r") as acme_cert_file:
  129. acme_cert_text = acme_cert_file.read()
  130. except IOError:
  131. LOGGER.error("Could not open certificate for %s in acme "
  132. "state directory", entry[0])
  133. else:
  134. x509_acme_cert = OpenSSL.crypto.load_certificate(
  135. OpenSSL.crypto.FILETYPE_PEM, acme_cert_text
  136. )
  137. expiry_date = x509_acme_cert.get_notAfter().decode("utf-8")
  138. expiry_datetime = parse_asn1_time(expiry_date)
  139. if expiry_datetime < datetime.datetime.utcnow():
  140. LOGGER.warning("Certificate for %s is expired and no newer "
  141. "one is available, bailing out!", entry[0])
  142. else:
  143. serial_current_cert = x509_current_cert.get_serial_number()
  144. serial_acme_cert = x509_acme_cert.get_serial_number()
  145. if serial_current_cert == serial_acme_cert:
  146. LOGGER.debug("Cert for %s matches with the one "
  147. "installed, nothing to do.", entry[1])
  148. else:
  149. if copy_file(acme_cert_path, entry[1]):
  150. acme_key_path = os.path.join(ACME_STATE_DIR,
  151. "live", entry[0],
  152. "privkey")
  153. if copy_file(acme_key_path, entry[2]):
  154. LOGGER.info("Successfully renewed cert for %s",
  155. entry[0])
  156. CERT_RENEWED = True
  157. else:
  158. LOGGER.error("Renewal of cert for %s failed, "
  159. "please clean up manually and "
  160. "check the backup files!",
  161. entry[0])
  162. else:
  163. LOGGER.error("Renewal of cert for %s failed, "
  164. "please clean up manually and "
  165. "check the backup files!", entry[0])
  166. if CERT_RENEWED:
  167. LOGGER.debug("Checking apache configuration")
  168. try:
  169. subprocess.check_call(["/usr/sbin/apache2ctl", "-t"])
  170. except subprocess.CalledProcessError:
  171. LOGGER.error("Error in apache configuration, will not restart the "
  172. "web server")
  173. else:
  174. try:
  175. subprocess.check_call(["/etc/init.d/apache2", "restart"])
  176. except subprocess.CalledProcessError:
  177. LOGGER.error("Apache restart failed!")
  178. else:
  179. LOGGER.info("Apache restarted successfully")