Browse Source

add first version of acme_tlsa_mail.py

Helmut Pozimski 7 years ago
parent
commit
12ecb169b7
2 changed files with 276 additions and 0 deletions
  1. 1 0
      README.md
  2. 275 0
      acme_tlsa_mail.py

+ 1 - 0
README.md

@@ -9,6 +9,7 @@ This repository contains a collection of scripts written by me which are not big
 * cyber_generator: wsgi script that creates a random word with the prefix "cyber", currently used at [cyber-everything.de](https://cyber-everything.de/)
 * dnsping: Collection of scripts to implement a basic dynamic dns service
 * acme_updater.py: Python scripts to automatically replace letsencrypt certificates managed and renewed by acmetool
+* acme_tlsa_mail.py: Python script to replace letsencrypt certificates managed and renewed by acmetool for dovecot and postfix and also update the relevant TLSA records
 
 ## Copying
 

+ 275 - 0
acme_tlsa_mail.py

@@ -0,0 +1,275 @@
+#! /usr/bin/env python3
+# -*- coding: utf8 -*-
+"""
+Takes care of updating the Let's encrypt certificate for dovecot and postfix.
+It also updates the TLSA records for the entries relevant to
+the mail server automatically.
+
+Requirements:
+
+* python 3.x
+* module OpenSSL
+* module dnspython
+"""
+
+import socket
+import logging
+import datetime
+import os
+import hashlib
+import shutil
+import subprocess
+
+import OpenSSL
+import dns.tsigkeyring
+import dns.update
+import dns.query
+
+HOSTNAME = socket.gethostname()
+FQDN = socket.getfqdn()
+
+SERVICES = {
+    "postfix": {"cert": "/etc/postfix/%s.crt" % HOSTNAME,
+                "key": "/etc/postfix/%s.key" % HOSTNAME,
+                "ports": [25, 465, 587]},
+    "dovecot": {"cert": "/usr/share/ssl/certs/dovecot.pem",
+                "key": "/usr/share/ssl/private/dovecot.pem",
+                "ports": [993]}
+}
+
+DOMAIN = "%s.%s" % (FQDN.split(".")[-2], FQDN.split(".")[-1])
+
+# State directory for acmetool, default is /var/lib/acme
+ACME_STATE_DIR = "/var/lib/acme"
+
+# Log level, default is info
+LOG_LEVEL = logging.INFO
+
+LOGGER = logging.getLogger("acme_tlsa_mail")
+LOGGER.setLevel(LOG_LEVEL)
+CONSOLE_HANDLER = logging.StreamHandler()
+LOGGER.addHandler(CONSOLE_HANDLER)
+
+# Path to the bind9 session key
+NAMED_SESSION_KEY = "/run/named/session.key"
+
+# Address of the DNS server, defaults to localhost
+DNS_SERVER = "localhost"
+
+
+def check_renewal(cert):
+    """
+    Checks if the certificate has been renewed.
+
+    :param cert: the certificate that needs to be checked
+    :type cert: OpenSSL.crypto.X509
+    :return: renewal status
+    :rtype: bool
+    """
+    acme_cert_path = os.path.join(ACME_STATE_DIR, "live", FQDN, "cert")
+    try:
+        with open(acme_cert_path, "r") as acme_cert_file:
+            acme_cert_text = acme_cert_file.read()
+    except IOError:
+        LOGGER.error("Could not open certificate for %s in acme "
+                     "state directory", FQDN)
+    else:
+        x509_acme_cert = OpenSSL.crypto.load_certificate(
+            OpenSSL.crypto.FILETYPE_PEM, acme_cert_text
+        )
+        expiry_date = x509_acme_cert.get_notAfter().decode("utf-8")
+        expiry_datetime = parse_asn1_time(expiry_date)
+        if expiry_datetime < datetime.datetime.utcnow():
+            LOGGER.warning("Certificate for %s is expired and no newer "
+                           "one is available, bailing out!", FQDN)
+            return False
+        else:
+            serial_current_cert = cert.get_serial_number()
+            serial_acme_cert = x509_acme_cert.get_serial_number()
+            if serial_current_cert == serial_acme_cert:
+                LOGGER.debug("Cert for %s matches with the one "
+                             "installed, nothing to do.", FQDN)
+                return False
+            elif serial_acme_cert > serial_current_cert:
+                return True
+
+
+def parse_asn1_time(timestamp):
+    """
+    parses an ANS1 timestamp as returned by OpenSSL and turns it into a python
+    datetime object.
+
+    :param timestamp: ASN1 timestamp
+    :type timestamp: str
+    :return: timestamp as datetime object
+    :rtype: datetime
+    """
+    year = int(timestamp[:4])
+    month = int(timestamp[4:6])
+    day = int(timestamp[6:8])
+    date = datetime.datetime(year, month, day)
+    return date
+
+
+def create_tlsa_hash(cert):
+    """
+    Creates an tlsa 3 1 1 hash to create TLSA records for a given certificate
+    :param cert: certificate to be used
+    :type cert: OpenSSL.crypto.X509
+    :return: sha256 has of the public key
+    :rtype: str
+    """
+    pubkey = cert.get_pubkey()
+    pubkey_der = OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_ASN1,
+                                               pubkey)
+    sha256 = hashlib.sha256()
+    sha256.update(pubkey_der)
+    hexdigest = sha256.hexdigest()
+    return hexdigest
+
+
+def copy_file(source, destination):
+    """
+    Copies a file from the given source file to
+    the given destionation and creates a copy of the
+    source file.
+
+    :param source: source file path
+    :type source: str
+    :param destination: destionation file path
+    :type destination: str
+    :return: success
+    :rtype: bool
+    """
+    backup_file = source + ".bak_%s" % datetime.datetime.now().strftime(
+        "%Y%m%d%H%M%S")
+    try:
+        shutil.copy(source, backup_file)
+    except IOError:
+        LOGGER.error("Creating of backup file for %s failed!", source)
+        return False
+    else:
+        try:
+            shutil.copy(source, destination)
+        except IOError:
+            LOGGER.error("Copying of file %s to %s failed!",
+                         source, destination)
+        else:
+            os.chmod(destination, 0o0644)
+            os.chown(destination, 0, 0)
+            return True
+
+
+def get_tsig_key():
+    """
+    Reads the named session key and generates a keyring object for it.
+
+    :return: keyring, algorithm
+    :rtype: tuple
+    """
+    key_name = None
+    key_algorithm = None
+    secret = None
+    try:
+        with open(NAMED_SESSION_KEY, "r") as bind_key:
+            for line in bind_key:
+                if "key" in line:
+                    key_name = line.split(" ")[1].strip("\"")
+                elif "algorithm" in line:
+                    key_algorithm = line.strip().split(" ")[1].strip(";")
+                elif "secret" in line:
+                    secret = line.strip().split(" ")[1].strip("\"").strip(";")
+    except IOError:
+        LOGGER.error("Error while opening the bind session key")
+        return None, None
+    else:
+        if key_name and key_algorithm and secret:
+            keyring = dns.tsigkeyring.from_text({
+                key_name: secret
+            })
+            return keyring, key_algorithm
+        else:
+            return None, None
+
+
+def update_tlsa_record(zone, tlsa_port, digest, keyring, keyalgorithm,
+                       subdomain="", ttl=300, protocol="tcp"):
+    """
+    Updates the tlsa record on the DNS server.
+
+    :param zone: Zone of the (sub) domain
+    :type zone: str
+    :param tlsa_port: port for the tlsa record
+    :type tlsa_port: str
+    :param digest: cryptographic hash of the certificate public key
+    :type digest: str
+    :param keyring: keyring object
+    :type keyring: dict
+    :param keyalgorithm: algorithm used for the tsig key
+    :type keyalgorithm: str
+    :param subdomain: subdomain to create the tlsa record for
+    :type subdomain: str
+    :param ttl: TTL to use for the TLSA record
+    :type ttl: int
+    :param protocol: protocol for the TLSA record
+    :type protocol: str
+    :returns: response of the operation
+    :rtype: dns.message.Message
+    """
+    update = dns.update.Update(zone, keyring=keyring,
+                               keyalgorithm=keyalgorithm)
+    tlsa_content = "3 1 1 %s" % digest
+    if subdomain:
+        tlsa_record = "_%s._%s.%s." % (tlsa_port, protocol, subdomain)
+    else:
+        tlsa_record = "_%s._%s.%s." % (tlsa_port, protocol, zone)
+    update.replace(tlsa_record, ttl, "tlsa", tlsa_content)
+    response = dns.query.tcp(update, 'localhost')
+    return response
+
+if __name__ == "__main__":
+    TSIG, KEYALGO = get_tsig_key()
+    for service in SERVICES:
+        try:
+            with open(SERVICES[service]["cert"], "r") as service_cert_file:
+                service_cert_text = service_cert_file.read()
+        except IOError:
+            LOGGER.error("Error while opening the postfix certificate")
+        else:
+            service_cert = OpenSSL.crypto.load_certificate(
+                OpenSSL.crypto.FILETYPE_PEM, service_cert_text
+            )
+            if check_renewal(service_cert):
+                newcert_path = os.path.join(ACME_STATE_DIR, "live", FQDN,
+                                            "fullchain")
+                try:
+                    with open(newcert_path, "r") as new_cert_file:
+                        new_cert_text = new_cert_file.read()
+                except IOError:
+                    LOGGER.error("Error while opening new %s certificate file",
+                                 service)
+                else:
+                    new_cert = OpenSSL.crypto.load_certificate(
+                        OpenSSL.crypto.FILETYPE_PEM, new_cert_text
+                    )
+                    hash_digest = create_tlsa_hash(new_cert)
+                    for port in SERVICES[service]["ports"]:
+                        update_tlsa_record(DOMAIN, port, hash_digest, TSIG,
+                                           KEYALGO, FQDN)
+                    if copy_file(newcert_path, SERVICES[service]["cert"]):
+                        newkey_path = os.path.join(ACME_STATE_DIR, "live",
+                                                   FQDN, "privkey")
+                        if copy_file(newkey_path, SERVICES[service]["key"]):
+                            LOGGER.info("Certificate for %s successfully "
+                                        "renewed, restarting service.",
+                                        service)
+                            subprocess.call(["/etc/init.d/%s"
+                                             % service, "restart"])
+                        else:
+                            LOGGER.error("Renewal of cert for %s failed, "
+                                         "please clean up manually and "
+                                         "check the backup files!", service)
+                    else:
+                        LOGGER.error("Renewal of cert for %s failed, "
+                                     "please clean up manually and "
+                                     "check the backup files!", service_cert)