Browse Source

remove acme_tlsa_mail.py and acme_updater.py, their functionality has been integrated in the new acme-updater which has it's own repository

Helmut Pozimski 7 years ago
parent
commit
86744c1e3e
3 changed files with 0 additions and 480 deletions
  1. 0 2
      README.md
  2. 0 278
      acme_tlsa_mail.py
  3. 0 200
      acme_updater.py

+ 0 - 2
README.md

@@ -8,8 +8,6 @@ This repository contains a collection of scripts written by me which are not big
 * backup_report.sh: Bash script that generates a report about created backups using [storeBackup](http://storebackup.org/). Can be used to track the creation of backups and detect failed backups so they can be cleaned up manually
 * cyber_generator: wsgi script that creates a random word with the prefix "cyber", currently used at [cyber-everything.de](https://cyber-everything.de/)
 * dnsping: Collection of scripts to implement a basic dynamic dns service
-* acme_updater.py: Python scripts to automatically replace letsencrypt certificates managed and renewed by acmetool
-* acme_tlsa_mail.py: Python script to replace letsencrypt certificates managed and renewed by acmetool for dovecot and postfix and also update the relevant TLSA records
 
 ## Copying
 

+ 0 - 278
acme_tlsa_mail.py

@@ -1,278 +0,0 @@
-#! /usr/bin/env python3
-# -*- coding: utf8 -*-
-"""
-Takes care of updating the Let's encrypt certificate for dovecot and postfix.
-It also updates the TLSA records for the entries relevant to
-the mail server automatically.
-
-Requirements:
-
-* python 3.x
-* module OpenSSL
-* module dnspython
-"""
-
-import socket
-import logging
-import datetime
-import os
-import hashlib
-import shutil
-import subprocess
-
-import OpenSSL
-import dns.tsigkeyring
-import dns.update
-import dns.query
-
-HOSTNAME = socket.gethostname()
-FQDN = socket.getfqdn()
-
-SERVICES = {
-    "postfix": {"cert": "/etc/postfix/%s.crt" % HOSTNAME,
-                "key": "/etc/postfix/%s.key" % HOSTNAME,
-                "ports": [25, 465, 587]},
-    "dovecot": {"cert": "/usr/share/ssl/certs/dovecot.pem",
-                "key": "/usr/share/ssl/private/dovecot.pem",
-                "ports": [993]}
-}
-
-DOMAIN = "%s.%s" % (FQDN.split(".")[-2], FQDN.split(".")[-1])
-
-# State directory for acmetool, default is /var/lib/acme
-ACME_STATE_DIR = "/var/lib/acme"
-
-# Log level, default is info
-LOG_LEVEL = logging.INFO
-
-LOGGER = logging.getLogger("acme_tlsa_mail")
-LOGGER.setLevel(LOG_LEVEL)
-CONSOLE_HANDLER = logging.StreamHandler()
-LOGGER.addHandler(CONSOLE_HANDLER)
-
-# Path to the bind9 session key
-NAMED_SESSION_KEY = "/run/named/session.key"
-
-# Address of the DNS server, defaults to localhost
-DNS_SERVER = "localhost"
-
-
-def check_renewal(cert):
-    """
-    Checks if the certificate has been renewed.
-
-    :param cert: the certificate that needs to be checked
-    :type cert: OpenSSL.crypto.X509
-    :return: renewal status
-    :rtype: bool
-    """
-    acme_cert_path = os.path.join(ACME_STATE_DIR, "live", FQDN, "cert")
-    try:
-        with open(acme_cert_path, "r") as acme_cert_file:
-            acme_cert_text = acme_cert_file.read()
-    except IOError:
-        LOGGER.error("Could not open certificate for %s in acme "
-                     "state directory", FQDN)
-    else:
-        x509_acme_cert = OpenSSL.crypto.load_certificate(
-            OpenSSL.crypto.FILETYPE_PEM, acme_cert_text
-        )
-        expiry_date = x509_acme_cert.get_notAfter().decode("utf-8")
-        expiry_datetime = parse_asn1_time(expiry_date)
-        if expiry_datetime < datetime.datetime.utcnow():
-            LOGGER.warning("Certificate for %s is expired and no newer "
-                           "one is available, bailing out!", FQDN)
-            return False
-        else:
-            serial_current_cert = cert.get_serial_number()
-            serial_acme_cert = x509_acme_cert.get_serial_number()
-            if serial_current_cert == serial_acme_cert:
-                LOGGER.debug("Cert for %s matches with the one "
-                             "installed, nothing to do.", FQDN)
-                return False
-            else:
-                return True
-
-
-def parse_asn1_time(timestamp):
-    """
-    parses an ANS1 timestamp as returned by OpenSSL and turns it into a python
-    datetime object.
-
-    :param timestamp: ASN1 timestamp
-    :type timestamp: str
-    :return: timestamp as datetime object
-    :rtype: datetime
-    """
-    year = int(timestamp[:4])
-    month = int(timestamp[4:6])
-    day = int(timestamp[6:8])
-    date = datetime.datetime(year, month, day)
-    return date
-
-
-def create_tlsa_hash(cert):
-    """
-    Creates an tlsa 3 1 1 hash to create TLSA records for a given certificate
-    :param cert: certificate to be used
-    :type cert: OpenSSL.crypto.X509
-    :return: sha256 has of the public key
-    :rtype: str
-    """
-    pubkey = cert.get_pubkey()
-    pubkey_der = OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_ASN1,
-                                               pubkey)
-    sha256 = hashlib.sha256()
-    sha256.update(pubkey_der)
-    hexdigest = sha256.hexdigest()
-    return hexdigest
-
-
-def copy_file(source, destination, backup=True):
-    """
-    Copies a file from the given source file to
-    the given destionation and creates a copy of the
-    source file.
-
-    :param source: source file path
-    :type source: str
-    :param destination: destination file path
-    :type destination: str
-    :param backup: whether to take a backup of the destination file before \
-    overwriting it
-    :type backup: bool
-    :return: success
-    :rtype: bool
-    """
-    backup_file = destination + ".bak_%s" % datetime.datetime.now().strftime(
-        "%Y%m%d%H%M%S")
-    if backup:
-        try:
-            shutil.copy(destination, backup_file)
-        except IOError:
-            LOGGER.error("Creating of backup file for %s failed!", destination)
-            return False
-    try:
-        shutil.copy(source, destination)
-    except IOError:
-        LOGGER.error("Copying of file %s to %s failed!",
-                     source, destination)
-    else:
-        os.chmod(destination, 0o0644)
-        os.chown(destination, 0, 0)
-        return True
-
-
-def get_tsig_key():
-    """
-    Reads the named session key and generates a keyring object for it.
-
-    :return: keyring, algorithm
-    :rtype: tuple
-    """
-    key_name = None
-    key_algorithm = None
-    secret = None
-    try:
-        with open(NAMED_SESSION_KEY, "r") as bind_key:
-            for line in bind_key:
-                if "key" in line:
-                    key_name = line.split(" ")[1].strip("\"")
-                elif "algorithm" in line:
-                    key_algorithm = line.strip().split(" ")[1].strip(";")
-                elif "secret" in line:
-                    secret = line.strip().split(" ")[1].strip("\"").strip(";")
-    except IOError:
-        LOGGER.error("Error while opening the bind session key")
-        return None, None
-    else:
-        if key_name and key_algorithm and secret:
-            keyring = dns.tsigkeyring.from_text({
-                key_name: secret
-            })
-            return keyring, key_algorithm
-        else:
-            return None, None
-
-
-def update_tlsa_record(zone, tlsa_port, digest, keyring, keyalgorithm,
-                       subdomain="", ttl=300, protocol="tcp"):
-    """
-    Updates the tlsa record on the DNS server.
-
-    :param zone: Zone of the (sub) domain
-    :type zone: str
-    :param tlsa_port: port for the tlsa record
-    :type tlsa_port: str
-    :param digest: cryptographic hash of the certificate public key
-    :type digest: str
-    :param keyring: keyring object
-    :type keyring: dict
-    :param keyalgorithm: algorithm used for the tsig key
-    :type keyalgorithm: str
-    :param subdomain: subdomain to create the tlsa record for
-    :type subdomain: str
-    :param ttl: TTL to use for the TLSA record
-    :type ttl: int
-    :param protocol: protocol for the TLSA record
-    :type protocol: str
-    :returns: response of the operation
-    :rtype: dns.message.Message
-    """
-    update = dns.update.Update(zone, keyring=keyring,
-                               keyalgorithm=keyalgorithm)
-    tlsa_content = "3 1 1 %s" % digest
-    if subdomain:
-        tlsa_record = "_%s._%s.%s." % (tlsa_port, protocol, subdomain)
-    else:
-        tlsa_record = "_%s._%s.%s." % (tlsa_port, protocol, zone)
-    update.replace(tlsa_record, ttl, "tlsa", tlsa_content)
-    response = dns.query.tcp(update, 'localhost')
-    return response
-
-if __name__ == "__main__":
-    TSIG, KEYALGO = get_tsig_key()
-    for service in SERVICES:
-        try:
-            with open(SERVICES[service]["cert"], "r") as service_cert_file:
-                service_cert_text = service_cert_file.read()
-        except IOError:
-            LOGGER.error("Error while opening the postfix certificate")
-        else:
-            service_cert = OpenSSL.crypto.load_certificate(
-                OpenSSL.crypto.FILETYPE_PEM, service_cert_text
-            )
-            if check_renewal(service_cert):
-                newcert_path = os.path.join(ACME_STATE_DIR, "live", FQDN,
-                                            "fullchain")
-                try:
-                    with open(newcert_path, "r") as new_cert_file:
-                        new_cert_text = new_cert_file.read()
-                except IOError:
-                    LOGGER.error("Error while opening new %s certificate file",
-                                 service)
-                else:
-                    new_cert = OpenSSL.crypto.load_certificate(
-                        OpenSSL.crypto.FILETYPE_PEM, new_cert_text
-                    )
-                    hash_digest = create_tlsa_hash(new_cert)
-                    for port in SERVICES[service]["ports"]:
-                        update_tlsa_record(DOMAIN, port, hash_digest, TSIG,
-                                           KEYALGO, FQDN)
-                    if copy_file(newcert_path, SERVICES[service]["cert"]):
-                        newkey_path = os.path.join(ACME_STATE_DIR, "live",
-                                                   FQDN, "privkey")
-                        if copy_file(newkey_path, SERVICES[service]["key"]):
-                            LOGGER.info("Certificate for %s successfully "
-                                        "renewed, restarting service.",
-                                        service)
-                            subprocess.call(["/etc/init.d/%s"
-                                             % service, "restart"])
-                        else:
-                            LOGGER.error("Renewal of cert for %s failed, "
-                                         "please clean up manually and "
-                                         "check the backup files!", service)
-                    else:
-                        LOGGER.error("Renewal of cert for %s failed, "
-                                     "please clean up manually and "
-                                     "check the backup files!", service)

+ 0 - 200
acme_updater.py

@@ -1,200 +0,0 @@
-#! /usr/bin/env python3
-# -*- coding: utf8 -*-
-"""
-This script scans all apache vhosts on the system, checks if they use
-letsencrypt certificates for which acmetool has renewed the certificate and
-replaces the old certificate with the new one automatically. This way,
-acmetool can run as an unprivileged user and acme-updater can take
-care of swapping out certificates while running as root.
-
-Requirements:
-
-* python 3.x
-* module OpenSSL
-"""
-
-import os
-import logging
-import datetime
-import shutil
-import subprocess
-
-import OpenSSL
-
-# directory for enabled apache vhosts, might be different on your distribution
-VHOSTS_DIRECTORY = "/etc/apache2/sites-enabled"
-
-# Log level, default is info
-LOG_LEVEL = logging.INFO
-
-# State directory for acmetool, default is /var/lib/acme
-ACME_STATE_DIR = "/var/lib/acme"
-
-LOGGER = logging.getLogger("acme-updater")
-LOGGER.setLevel(LOG_LEVEL)
-CONSOLE_HANDLER = logging.StreamHandler()
-LOGGER.addHandler(CONSOLE_HANDLER)
-
-PARSED_VHOSTS = []
-CERT_RENEWED = False
-
-
-def parse_vhost(file_obj):
-    """
-    Parses a given vhost file and extracts the main domain,
-    the certificate file and the TLS key file.
-
-    :param file_obj: file obj pointing to a vhost to parse
-    :return: list of tuples with domains and found certificates
-    :rtype: list
-    """
-    vhost_started = False
-    parsed_info = []
-    cert_path = ""
-    key_path = ""
-    main_domain = ""
-    for line in file_obj:
-        if "<VirtualHost" in line:
-            vhost_started = True
-        elif "</VirtualHost" in line and vhost_started:
-            vhost_started = False
-            if cert_path and key_path and main_domain:
-                parsed_info.append((main_domain, cert_path, key_path))
-                LOGGER.debug(
-                    "Found vhost with main domain %s, certificate %s and key "
-                    "file %s", main_domain, cert_path, key_path)
-                cert_path = ""
-                key_path = ""
-                main_domain = ""
-        elif "ServerName" in line:
-            main_domain = line.strip().rsplit()[1]
-        elif "SSLCertificateFile" in line:
-            cert_path = line.strip().rsplit()[1]
-        elif "SSLCertificateKeyFile" in line:
-            key_path = line.strip().rsplit()[1]
-    return parsed_info
-
-
-def parse_asn1_time(timestamp):
-    """
-    parses an ANS1 timestamp as returned by OpenSSL and turns it into a python
-    datetime object.
-
-    :param timestamp: ASN1 timestamp
-    :type timestamp: str
-    :return: timestamp as datetime object
-    :rtype: datetime
-    """
-    year = int(timestamp[:4])
-    month = int(timestamp[4:6])
-    day = int(timestamp[6:8])
-    date = datetime.datetime(year, month, day)
-    return date
-
-
-def copy_file(source, destination, backup=True):
-    """
-    Copies a file from the given source file to
-    the given destionation and creates a copy of the
-    source file.
-
-    :param source: source file path
-    :type source: str
-    :param destination: destination file path
-    :type destination: str
-    :param backup: whether to take a backup of the destination file before \
-    overwriting it
-    :type backup: bool
-    :return: success
-    :rtype: bool
-    """
-    backup_file = destination + ".bak_%s" % datetime.datetime.now().strftime(
-        "%Y%m%d%H%M%S")
-    if backup:
-        try:
-            shutil.copy(destination, backup_file)
-        except IOError:
-            LOGGER.error("Creating of backup file for %s failed!", destination)
-            return False
-    try:
-        shutil.copy(source, destination)
-    except IOError:
-        LOGGER.error("Copying of file %s to %s failed!",
-                     source, destination)
-    else:
-        os.chmod(destination, 0o0644)
-        os.chown(destination, 0, 0)
-        return True
-
-for vhost in os.listdir(VHOSTS_DIRECTORY):
-    if vhost.endswith(".conf"):
-        vhost_absolute = os.path.join(VHOSTS_DIRECTORY, vhost)
-        with open(vhost_absolute, "r") as vhost_file:
-            PARSED_VHOSTS.extend(parse_vhost(vhost_file))
-
-for entry in PARSED_VHOSTS:
-    try:
-        with open(entry[1], "r") as cert_file:
-            cert_text = cert_file.read()
-    except IOError:
-        LOGGER.error("Error while opening cert file %s ", entry[1])
-    else:
-        x509_current_cert = OpenSSL.crypto.load_certificate(
-            OpenSSL.crypto.FILETYPE_PEM, cert_text)
-        if "Let's Encrypt" in x509_current_cert.get_issuer().__str__():
-            acme_cert_path = os.path.join(ACME_STATE_DIR, "live", entry[0],
-                                          "cert")
-            try:
-                with open(acme_cert_path, "r") as acme_cert_file:
-                    acme_cert_text = acme_cert_file.read()
-            except IOError:
-                LOGGER.error("Could not open certificate for %s in acme "
-                             "state directory", entry[0])
-            else:
-                x509_acme_cert = OpenSSL.crypto.load_certificate(
-                    OpenSSL.crypto.FILETYPE_PEM, acme_cert_text
-                )
-                expiry_date = x509_acme_cert.get_notAfter().decode("utf-8")
-                expiry_datetime = parse_asn1_time(expiry_date)
-                if expiry_datetime < datetime.datetime.utcnow():
-                    LOGGER.warning("Certificate for %s is expired and no newer "
-                                "one is available, bailing out!", entry[0])
-                else:
-                    serial_current_cert = x509_current_cert.get_serial_number()
-                    serial_acme_cert = x509_acme_cert.get_serial_number()
-                    if serial_current_cert == serial_acme_cert:
-                        LOGGER.debug("Cert for %s matches with the one "
-                                     "installed, nothing to do.", entry[1])
-                    else:
-                        if copy_file(acme_cert_path, entry[1]):
-                            acme_key_path = os.path.join(ACME_STATE_DIR,
-                                                         "live", entry[0],
-                                                         "privkey")
-                            if copy_file(acme_key_path, entry[2]):
-                                LOGGER.info("Successfully renewed cert for %s",
-                                            entry[0])
-                                CERT_RENEWED = True
-                            else:
-                                LOGGER.error("Renewal of cert for %s failed, "
-                                             "please clean up manually and "
-                                             "check the backup files!",
-                                             entry[0])
-                        else:
-                            LOGGER.error("Renewal of cert for %s failed, "
-                                         "please clean up manually and "
-                                         "check the backup files!", entry[0])
-
-if CERT_RENEWED:
-    LOGGER.debug("Checking apache configuration")
-    try:
-        subprocess.check_call(["/usr/sbin/apache2ctl", "-t"])
-    except subprocess.CalledProcessError:
-        LOGGER.error("Error in apache configuration, will not restart the "
-                     "web server")
-    else:
-        try:
-            subprocess.check_call(["/etc/init.d/apache2", "restart"])
-        except subprocess.CalledProcessError:
-            LOGGER.error("Apache restart failed!")
-        else:
-            LOGGER.info("Apache restarted successfully")