123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- #! /usr/bin/env python3
- # -*- coding: utf8 -*-
- """
- This script scans all apache vhosts on the system, checks if they use
- letsencrypt certificates for which acmetool has renewed the certificate and
- replaces the old certificate with the new one automatically. This way,
- acmetool can run as an unprivileged user and acme-updater can take
- care of swapping out certificates while running as root.
- Requirements:
- * python 3.x
- * module OpenSSL
- """
- import os
- import logging
- import datetime
- import shutil
- import subprocess
- import OpenSSL
# Location of the enabled Apache vhost definitions; other distributions may
# keep them somewhere else.
VHOSTS_DIRECTORY = "/etc/apache2/sites-enabled"

# acmetool state directory (acmetool's default is /var/lib/acme).
ACME_STATE_DIR = "/var/lib/acme"

# Verbosity of the script's logger, INFO unless changed here.
LOG_LEVEL = logging.INFO

# Script-wide logger; its records are emitted through a StreamHandler.
CONSOLE_HANDLER = logging.StreamHandler()
LOGGER = logging.getLogger("acme-updater")
LOGGER.addHandler(CONSOLE_HANDLER)
LOGGER.setLevel(LOG_LEVEL)

# Run state shared by the top-level script: the (domain, certificate, key)
# tuples collected from the vhost files, and whether any certificate was
# swapped during this run.
CERT_RENEWED = False
PARSED_VHOSTS = []
def parse_vhost(file_obj):
    """
    Parses a given vhost file and extracts, for every ``<VirtualHost>``
    section, the main domain (``ServerName``), the certificate file
    (``SSLCertificateFile``) and the TLS key file
    (``SSLCertificateKeyFile``).

    A section is only reported when all three values were seen by the time
    its closing ``</VirtualHost>`` tag is reached. Blank lines and comment
    lines are skipped, directive names are matched case-insensitively on
    the first word of the line, and a directive without an argument is
    ignored instead of aborting the parse.

    :param file_obj: file obj (or any iterable of lines) pointing to a vhost
        to parse
    :return: list of ``(main_domain, cert_path, key_path)`` tuples
    :rtype: list
    """
    vhost_started = False
    parsed_info = []
    cert_path = ""
    key_path = ""
    main_domain = ""
    for line in file_obj:
        tokens = line.split()
        # Skip blank lines and comments, so a commented-out directive such
        # as "# ServerName example.org" is not mistaken for configuration.
        if not tokens or tokens[0].startswith("#"):
            continue
        # Apache directive and section names are case-insensitive.
        directive = tokens[0].lower()
        # Arguments may be quoted in Apache configuration files; a missing
        # argument yields "" and leaves any earlier value untouched.
        argument = tokens[1].strip("\"'") if len(tokens) > 1 else ""
        if directive.startswith("<virtualhost"):
            vhost_started = True
        elif directive.startswith("</virtualhost"):
            if not vhost_started:
                continue
            vhost_started = False
            if cert_path and key_path and main_domain:
                parsed_info.append((main_domain, cert_path, key_path))
                LOGGER.debug(
                    "Found vhost with main domain %s, certificate %s and key "
                    "file %s", main_domain, cert_path, key_path)
            # Start the next section with a clean slate.
            cert_path = ""
            key_path = ""
            main_domain = ""
        elif directive == "servername":
            if argument:
                main_domain = argument
        elif directive == "sslcertificatefile":
            if argument:
                cert_path = argument
        elif directive == "sslcertificatekeyfile":
            if argument:
                key_path = argument
    return parsed_info
def parse_asn1_time(timestamp):
    """
    Parses an ASN1 timestamp as returned by OpenSSL (``YYYYMMDDhhmmssZ``)
    and turns it into a naive python datetime object.

    The time of day is kept when the timestamp carries six digits after the
    date, so a certificate that expires later today is not reported as
    already expired. Anything after the seconds (such as the trailing ``Z``)
    is ignored. If no complete time of day is present, midnight is used.

    :param timestamp: ASN1 timestamp
    :type timestamp: str or bytes
    :return: timestamp as datetime object (no tzinfo attached)
    :rtype: datetime
    :raises ValueError: if the date (or time) fields are not valid numbers
    """
    if isinstance(timestamp, bytes):
        timestamp = timestamp.decode("ascii")
    # Fixed-width slicing keeps the field boundaries unambiguous.
    fields = [timestamp[:4], timestamp[4:6], timestamp[6:8]]
    clock = timestamp[8:14]
    if len(clock) == 6 and clock.isdigit():
        fields.extend((clock[:2], clock[2:4], clock[4:6]))
    return datetime.datetime(*(int(field) for field in fields))
def copy_file(source, destination, backup=True, mode=0o644):
    """
    Copies a file from the given source file to the given destination,
    optionally keeping a timestamped backup of the file that is about to be
    overwritten. After the copy the destination gets the given permission
    bits and is chowned to uid 0 / gid 0.

    :param source: source file path
    :type source: str
    :param destination: destination file path
    :type destination: str
    :param backup: whether to take a backup of the destination file before \
    overwriting it (stored as ``<destination>.bak_<YYYYmmddHHMMSS>``)
    :type backup: bool
    :param mode: permission bits applied to the destination after copying; \
    pass a stricter value such as ``0o600`` for private keys
    :type mode: int
    :return: success; ``False`` if the backup, the copy or the \
    permission/ownership change failed
    :rtype: bool
    """
    if backup:
        backup_file = destination + ".bak_%s" % datetime.datetime.now().strftime(
            "%Y%m%d%H%M%S")
        try:
            shutil.copy(destination, backup_file)
        except OSError:
            LOGGER.error("Creating of backup file for %s failed!", destination)
            return False
    try:
        shutil.copy(source, destination)
    except OSError:
        LOGGER.error("Copying of file %s to %s failed!",
                     source, destination)
        return False
    try:
        os.chmod(destination, mode)
        os.chown(destination, 0, 0)
    except OSError:
        # Typically raised when the script is not running as root.
        LOGGER.error("Setting permissions or ownership of %s failed!",
                     destination)
        return False
    return True
# --- Step 1: collect (domain, certificate, key) tuples from all vhosts ------
for vhost in os.listdir(VHOSTS_DIRECTORY):
    if not vhost.endswith(".conf"):
        continue
    vhost_absolute = os.path.join(VHOSTS_DIRECTORY, vhost)
    try:
        with open(vhost_absolute, "r") as vhost_file:
            PARSED_VHOSTS.extend(parse_vhost(vhost_file))
    except OSError:
        # e.g. a dangling symlink; one unreadable vhost must not stop the
        # remaining ones from being processed.
        LOGGER.error("Error while opening vhost file %s", vhost_absolute)

# --- Step 2: swap in renewed certificates -----------------------------------
for main_domain, cert_path, key_path in PARSED_VHOSTS:
    # Certificates are read as bytes: pyOpenSSL takes a bytes buffer and
    # this avoids decoding errors on non-text files.
    try:
        with open(cert_path, "rb") as cert_file:
            cert_pem = cert_file.read()
    except IOError:
        LOGGER.error("Error while opening cert file %s ", cert_path)
        continue
    try:
        x509_current_cert = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, cert_pem)
    except OpenSSL.crypto.Error:
        LOGGER.error("Could not parse certificate file %s", cert_path)
        continue

    # Only certificates issued by Let's Encrypt are handled.
    if "Let's Encrypt" not in str(x509_current_cert.get_issuer()):
        continue

    acme_cert_path = os.path.join(ACME_STATE_DIR, "live", main_domain, "cert")
    try:
        with open(acme_cert_path, "rb") as acme_cert_file:
            acme_cert_pem = acme_cert_file.read()
    except IOError:
        LOGGER.error("Could not open certificate for %s in acme "
                     "state directory", main_domain)
        continue
    try:
        x509_acme_cert = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, acme_cert_pem)
    except OpenSSL.crypto.Error:
        LOGGER.error("Could not parse certificate for %s in acme "
                     "state directory", main_domain)
        continue

    # Never install a certificate that is itself already expired.
    expiry_date = x509_acme_cert.get_notAfter().decode("utf-8")
    expiry_datetime = parse_asn1_time(expiry_date)
    # Naive UTC "now", comparable with the naive datetime returned above.
    utc_now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    if expiry_datetime < utc_now:
        LOGGER.warning("Certificate for %s is expired and no newer "
                       "one is available, bailing out!", main_domain)
        continue

    # Same serial number means the installed certificate is up to date.
    serial_current_cert = x509_current_cert.get_serial_number()
    serial_acme_cert = x509_acme_cert.get_serial_number()
    if serial_current_cert == serial_acme_cert:
        LOGGER.debug("Cert for %s matches with the one "
                     "installed, nothing to do.", cert_path)
        continue

    if not copy_file(acme_cert_path, cert_path):
        LOGGER.error("Renewal of cert for %s failed, "
                     "please clean up manually and "
                     "check the backup files!", main_domain)
        continue
    acme_key_path = os.path.join(ACME_STATE_DIR, "live", main_domain,
                                 "privkey")
    if not copy_file(acme_key_path, key_path):
        LOGGER.error("Renewal of cert for %s failed, "
                     "please clean up manually and "
                     "check the backup files!", main_domain)
        continue
    # copy_file installs files world-readable, which is fine for the
    # certificate but not for the private key; restrict it to the owner.
    try:
        os.chmod(key_path, 0o600)
    except OSError:
        LOGGER.error("Could not restrict permissions of key file %s",
                     key_path)
    LOGGER.info("Successfully renewed cert for %s", main_domain)
    CERT_RENEWED = True

# --- Step 3: restart Apache, but only with a valid configuration ------------
if CERT_RENEWED:
    LOGGER.debug("Checking apache configuration")
    try:
        subprocess.check_call(["/usr/sbin/apache2ctl", "-t"])
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable apache2ctl binary.
        LOGGER.error("Error in apache configuration, will not restart the "
                     "web server")
    else:
        try:
            subprocess.check_call(["/etc/init.d/apache2", "restart"])
        except (subprocess.CalledProcessError, OSError):
            LOGGER.error("Apache restart failed!")
        else:
            LOGGER.info("Apache restarted successfully")
|