#! /usr/bin/env python3 # -*- coding: utf8 -*- """ Takes care of updating the Let's encrypt certificate for dovecot and postfix. It also updates the TLSA records for the entries relevant to the mail server automatically. Requirements: * python 3.x * module OpenSSL * module dnspython """ import socket import logging import datetime import os import hashlib import shutil import subprocess import OpenSSL import dns.tsigkeyring import dns.update import dns.query HOSTNAME = socket.gethostname() FQDN = socket.getfqdn() SERVICES = { "postfix": {"cert": "/etc/postfix/%s.crt" % HOSTNAME, "key": "/etc/postfix/%s.key" % HOSTNAME, "ports": [25, 465, 587]}, "dovecot": {"cert": "/usr/share/ssl/certs/dovecot.pem", "key": "/usr/share/ssl/private/dovecot.pem", "ports": [993]} } DOMAIN = "%s.%s" % (FQDN.split(".")[-2], FQDN.split(".")[-1]) # State directory for acmetool, default is /var/lib/acme ACME_STATE_DIR = "/var/lib/acme" # Log level, default is info LOG_LEVEL = logging.INFO LOGGER = logging.getLogger("acme_tlsa_mail") LOGGER.setLevel(LOG_LEVEL) CONSOLE_HANDLER = logging.StreamHandler() LOGGER.addHandler(CONSOLE_HANDLER) # Path to the bind9 session key NAMED_SESSION_KEY = "/run/named/session.key" # Address of the DNS server, defaults to localhost DNS_SERVER = "localhost" def check_renewal(cert): """ Checks if the certificate has been renewed. :param cert: the certificate that needs to be checked :type cert: OpenSSL.crypto.X509 :return: renewal status :rtype: bool """ acme_cert_path = os.path.join(ACME_STATE_DIR, "live", FQDN, "cert") try: with open(acme_cert_path, "r") as acme_cert_file: acme_cert_text = acme_cert_file.read() except IOError: LOGGER.error("Could not open certificate for %s in acme " "state directory", FQDN) else: x509_acme_cert = OpenSSL.crypto.load_certificate( OpenSSL.crypto.FILETYPE_PEM, acme_cert_text ) expiry_date = x509_acme_cert.get_notAfter().decode("utf-8") expiry_datetime = parse_asn1_time(expiry_date) if expiry_datetime < datetime.datetime.utcnow(): LOGGER.warning("Certificate for %s is expired and no newer " "one is available, bailing out!", FQDN) return False else: serial_current_cert = cert.get_serial_number() serial_acme_cert = x509_acme_cert.get_serial_number() if serial_current_cert == serial_acme_cert: LOGGER.debug("Cert for %s matches with the one " "installed, nothing to do.", FQDN) return False else: return True def parse_asn1_time(timestamp): """ parses an ANS1 timestamp as returned by OpenSSL and turns it into a python datetime object. :param timestamp: ASN1 timestamp :type timestamp: str :return: timestamp as datetime object :rtype: datetime """ year = int(timestamp[:4]) month = int(timestamp[4:6]) day = int(timestamp[6:8]) date = datetime.datetime(year, month, day) return date def create_tlsa_hash(cert): """ Creates an tlsa 3 1 1 hash to create TLSA records for a given certificate :param cert: certificate to be used :type cert: OpenSSL.crypto.X509 :return: sha256 has of the public key :rtype: str """ pubkey = cert.get_pubkey() pubkey_der = OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_ASN1, pubkey) sha256 = hashlib.sha256() sha256.update(pubkey_der) hexdigest = sha256.hexdigest() return hexdigest def copy_file(source, destination, backup=True): """ Copies a file from the given source file to the given destionation and creates a copy of the source file. :param source: source file path :type source: str :param destination: destination file path :type destination: str :param backup: whether to take a backup of the destination file before \ overwriting it :type backup: bool :return: success :rtype: bool """ backup_file = destination + ".bak_%s" % datetime.datetime.now().strftime( "%Y%m%d%H%M%S") if backup: try: shutil.copy(destination, backup_file) except IOError: LOGGER.error("Creating of backup file for %s failed!", destination) return False try: shutil.copy(source, destination) except IOError: LOGGER.error("Copying of file %s to %s failed!", source, destination) else: os.chmod(destination, 0o0644) os.chown(destination, 0, 0) return True def get_tsig_key(): """ Reads the named session key and generates a keyring object for it. :return: keyring, algorithm :rtype: tuple """ key_name = None key_algorithm = None secret = None try: with open(NAMED_SESSION_KEY, "r") as bind_key: for line in bind_key: if "key" in line: key_name = line.split(" ")[1].strip("\"") elif "algorithm" in line: key_algorithm = line.strip().split(" ")[1].strip(";") elif "secret" in line: secret = line.strip().split(" ")[1].strip("\"").strip(";") except IOError: LOGGER.error("Error while opening the bind session key") return None, None else: if key_name and key_algorithm and secret: keyring = dns.tsigkeyring.from_text({ key_name: secret }) return keyring, key_algorithm else: return None, None def update_tlsa_record(zone, tlsa_port, digest, keyring, keyalgorithm, subdomain="", ttl=300, protocol="tcp"): """ Updates the tlsa record on the DNS server. :param zone: Zone of the (sub) domain :type zone: str :param tlsa_port: port for the tlsa record :type tlsa_port: str :param digest: cryptographic hash of the certificate public key :type digest: str :param keyring: keyring object :type keyring: dict :param keyalgorithm: algorithm used for the tsig key :type keyalgorithm: str :param subdomain: subdomain to create the tlsa record for :type subdomain: str :param ttl: TTL to use for the TLSA record :type ttl: int :param protocol: protocol for the TLSA record :type protocol: str :returns: response of the operation :rtype: dns.message.Message """ update = dns.update.Update(zone, keyring=keyring, keyalgorithm=keyalgorithm) tlsa_content = "3 1 1 %s" % digest if subdomain: tlsa_record = "_%s._%s.%s." % (tlsa_port, protocol, subdomain) else: tlsa_record = "_%s._%s.%s." % (tlsa_port, protocol, zone) update.replace(tlsa_record, ttl, "tlsa", tlsa_content) response = dns.query.tcp(update, 'localhost') return response if __name__ == "__main__": TSIG, KEYALGO = get_tsig_key() for service in SERVICES: try: with open(SERVICES[service]["cert"], "r") as service_cert_file: service_cert_text = service_cert_file.read() except IOError: LOGGER.error("Error while opening the postfix certificate") else: service_cert = OpenSSL.crypto.load_certificate( OpenSSL.crypto.FILETYPE_PEM, service_cert_text ) if check_renewal(service_cert): newcert_path = os.path.join(ACME_STATE_DIR, "live", FQDN, "fullchain") try: with open(newcert_path, "r") as new_cert_file: new_cert_text = new_cert_file.read() except IOError: LOGGER.error("Error while opening new %s certificate file", service) else: new_cert = OpenSSL.crypto.load_certificate( OpenSSL.crypto.FILETYPE_PEM, new_cert_text ) hash_digest = create_tlsa_hash(new_cert) for port in SERVICES[service]["ports"]: update_tlsa_record(DOMAIN, port, hash_digest, TSIG, KEYALGO, FQDN) if copy_file(newcert_path, SERVICES[service]["cert"]): newkey_path = os.path.join(ACME_STATE_DIR, "live", FQDN, "privkey") if copy_file(newkey_path, SERVICES[service]["key"]): LOGGER.info("Certificate for %s successfully " "renewed, restarting service.", service) subprocess.call(["/etc/init.d/%s" % service, "restart"]) else: LOGGER.error("Renewal of cert for %s failed, " "please clean up manually and " "check the backup files!", service) else: LOGGER.error("Renewal of cert for %s failed, " "please clean up manually and " "check the backup files!", service)