123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- #! /usr/bin/env python3
- # -*- coding: utf8 -*-
"""
Takes care of updating the Let's Encrypt certificate for Dovecot and Postfix.
It also automatically updates the TLSA records that are relevant to the
mail server.

Requirements:
  * Python 3.x
  * module OpenSSL (pyOpenSSL)
  * module dnspython
"""
- import socket
- import logging
- import datetime
- import os
- import hashlib
- import shutil
- import subprocess
- import OpenSSL
- import dns.tsigkeyring
- import dns.update
- import dns.query
# Host name and fully qualified domain name of the machine running the script.
HOSTNAME = socket.gethostname()
FQDN = socket.getfqdn()

# Per-service settings: where the installed certificate and key live and
# which TCP ports need a TLSA record.
SERVICES = {
    "postfix": {
        "cert": "/etc/postfix/%s.crt" % HOSTNAME,
        "key": "/etc/postfix/%s.key" % HOSTNAME,
        "ports": [25, 465, 587],
    },
    "dovecot": {
        "cert": "/usr/share/ssl/certs/dovecot.pem",
        "key": "/usr/share/ssl/private/dovecot.pem",
        "ports": [993],
    },
}

# DNS zone used for the updates: the last two labels of the FQDN.
# Joining a slice instead of indexing [-2] keeps the module importable when
# the FQDN contains no dot (e.g. "localhost"); DOMAIN is then the FQDN itself.
# NOTE(review): this assumes the zone is always the last two labels, which
# does not hold for names below e.g. "co.uk" -- confirm for your setup.
DOMAIN = ".".join(FQDN.split(".")[-2:])

# State directory for acmetool, default is /var/lib/acme
ACME_STATE_DIR = "/var/lib/acme"

# Log level, default is info
LOG_LEVEL = logging.INFO
LOGGER = logging.getLogger("acme_tlsa_mail")
LOGGER.setLevel(LOG_LEVEL)
CONSOLE_HANDLER = logging.StreamHandler()
LOGGER.addHandler(CONSOLE_HANDLER)

# Path to the bind9 session key
NAMED_SESSION_KEY = "/run/named/session.key"

# Address of the DNS server, defaults to localhost
DNS_SERVER = "localhost"
def check_renewal(cert):
    """
    Checks if the certificate has been renewed.

    The certificate stored by acmetool under
    ``<ACME_STATE_DIR>/live/<FQDN>/cert`` is compared with the certificate
    that is currently installed for a service.

    :param cert: the currently installed certificate that needs to be checked
    :type cert: OpenSSL.crypto.X509
    :return: True if the acme state directory holds a certificate that is
        not expired and whose serial number differs from ``cert``; False
        otherwise (also when the acme certificate cannot be read)
    :rtype: bool
    """
    acme_cert_path = os.path.join(ACME_STATE_DIR, "live", FQDN, "cert")
    try:
        with open(acme_cert_path, "r") as acme_cert_file:
            acme_cert_text = acme_cert_file.read()
    except IOError:
        LOGGER.error("Could not open certificate for %s in acme "
                     "state directory", FQDN)
        # Without a readable acme certificate nothing can be renewed; return
        # an explicit bool instead of falling through with None.
        return False

    x509_acme_cert = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_PEM, acme_cert_text
    )

    # get_notAfter() yields bytes; parse_asn1_time wants text.
    expiry_date = x509_acme_cert.get_notAfter().decode("utf-8")
    expiry_datetime = parse_asn1_time(expiry_date)
    # Both sides are naive datetimes that are interpreted as UTC.
    if expiry_datetime < datetime.datetime.utcnow():
        LOGGER.warning("Certificate for %s is expired and no newer "
                       "one is available, bailing out!", FQDN)
        return False

    # Identical serial numbers mean the acme certificate is the one that is
    # already installed.
    if cert.get_serial_number() == x509_acme_cert.get_serial_number():
        LOGGER.debug("Cert for %s matches with the one "
                     "installed, nothing to do.", FQDN)
        return False

    return True
def parse_asn1_time(timestamp):
    """
    Parses an ASN1 timestamp as returned by OpenSSL (``YYYYMMDDhhmmssZ``)
    and turns it into a python datetime object.

    The time of day is honoured when the timestamp carries six digits after
    the date; otherwise midnight is assumed. Ignoring the time of day would
    make a certificate look expired from 00:00 of its last day of validity.

    :param timestamp: ASN1 timestamp; bytes are decoded as ASCII first
    :type timestamp: str
    :return: timestamp as naive datetime object (no tzinfo attached)
    :rtype: datetime
    :raises ValueError: if the date part is not numeric or not a valid date
    """
    if isinstance(timestamp, bytes):
        timestamp = timestamp.decode("ascii")

    year = int(timestamp[:4])
    month = int(timestamp[4:6])
    day = int(timestamp[6:8])

    # Optional hhmmss part directly behind the date.
    clock = timestamp[8:14]
    if len(clock) == 6 and clock.isdigit():
        hour = int(clock[:2])
        minute = int(clock[2:4])
        second = int(clock[4:6])
    else:
        hour = minute = second = 0

    return datetime.datetime(year, month, day, hour, minute, second)
def create_tlsa_hash(cert):
    """
    Computes the digest used in a "3 1 1" TLSA record for a certificate:
    the SHA-256 hash over the DER (ASN.1) encoded public key.

    :param cert: certificate to be used
    :type cert: OpenSSL.crypto.X509
    :return: hexadecimal sha256 hash of the public key
    :rtype: str
    """
    der_encoded_key = OpenSSL.crypto.dump_publickey(
        OpenSSL.crypto.FILETYPE_ASN1, cert.get_pubkey()
    )
    return hashlib.sha256(der_encoded_key).hexdigest()
def copy_file(source, destination, backup=True, mode=0o0644):
    """
    Copies a file from the given source path to the given destination,
    optionally keeping a timestamped backup of the previous destination
    file (``<destination>.bak_<YYYYmmddHHMMSS>``).

    After a successful copy the destination is chmod'ed to ``mode`` and
    chown'ed to uid 0 / gid 0.

    :param source: source file path
    :type source: str
    :param destination: destination file path
    :type destination: str
    :param backup: whether to take a backup of the destination file before \
    overwriting it; skipped when the destination does not exist yet
    :type backup: bool
    :param mode: permission bits applied to the destination after copying
    :type mode: int
    :return: True on success, False if the backup or the copy failed
    :rtype: bool
    :raises OSError: if changing mode or owner of the destination fails
    """
    # A missing destination (first installation) has nothing to back up and
    # must not block the copy.
    if backup and os.path.exists(destination):
        stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        backup_file = destination + ".bak_%s" % stamp
        try:
            shutil.copy(destination, backup_file)
        except IOError:
            LOGGER.error("Creating of backup file for %s failed!", destination)
            return False

    try:
        shutil.copy(source, destination)
    except IOError:
        LOGGER.error("Copying of file %s to %s failed!",
                     source, destination)
        # Report the failure explicitly instead of returning None.
        return False

    os.chmod(destination, mode)
    os.chown(destination, 0, 0)
    return True
def get_tsig_key():
    """
    Reads the named session key and generates a keyring object for it.

    The file at NAMED_SESSION_KEY is expected to hold statements of the form
    ``key "<name>" {``, ``algorithm <algo>;`` and ``secret "<base64>";``.
    Lines are matched on their first word, so a secret or name that merely
    contains the text "key" cannot be mistaken for the key statement.

    :return: keyring, algorithm -- or (None, None) if the file cannot be
        read or does not contain name, algorithm and secret
    :rtype: tuple
    """
    key_name = None
    key_algorithm = None
    secret = None
    try:
        with open(NAMED_SESSION_KEY, "r") as bind_key:
            for line in bind_key:
                tokens = line.split()
                if len(tokens) < 2:
                    # Blank lines and the closing "};" carry no value.
                    continue
                keyword = tokens[0]
                # Drop the surrounding quotes and the trailing semicolon in
                # one go; stripping them one after the other leaves a stray
                # quote behind on '"<secret>";'.
                value = tokens[1].strip("\";")
                if keyword == "key":
                    key_name = value
                elif keyword == "algorithm":
                    key_algorithm = value
                elif keyword == "secret":
                    secret = value
    except IOError:
        LOGGER.error("Error while opening the bind session key")
        return None, None

    if not (key_name and key_algorithm and secret):
        LOGGER.error("Bind session key %s is incomplete", NAMED_SESSION_KEY)
        return None, None

    keyring = dns.tsigkeyring.from_text({
        key_name: secret
    })
    return keyring, key_algorithm
def update_tlsa_record(zone, tlsa_port, digest, keyring, keyalgorithm,
                       subdomain="", ttl=300, protocol="tcp"):
    """
    Updates the tlsa record on the DNS server configured in DNS_SERVER.

    The record ``_<port>._<protocol>.<subdomain or zone>.`` is replaced with
    a "3 1 1" TLSA entry carrying ``digest``. The update is sent over TCP.

    :param zone: Zone of the (sub) domain
    :type zone: str
    :param tlsa_port: port for the tlsa record
    :type tlsa_port: int or str
    :param digest: cryptographic hash of the certificate public key
    :type digest: str
    :param keyring: keyring object
    :type keyring: dict
    :param keyalgorithm: algorithm used for the tsig key
    :type keyalgorithm: str
    :param subdomain: subdomain to create the tlsa record for
    :type subdomain: str
    :param ttl: TTL to use for the TLSA record
    :type ttl: int
    :param protocol: protocol for the TLSA record
    :type protocol: str
    :returns: response of the operation
    :rtype: dns.message.Message
    :raises socket.gaierror: if DNS_SERVER cannot be resolved
    """
    update = dns.update.Update(zone, keyring=keyring,
                               keyalgorithm=keyalgorithm)
    tlsa_content = "3 1 1 %s" % digest
    # Fall back to the zone apex when no subdomain is given.
    owner = subdomain if subdomain else zone
    tlsa_record = "_%s._%s.%s." % (tlsa_port, protocol, owner)
    update.replace(tlsa_record, ttl, "tlsa", tlsa_content)

    # dns.query.tcp documents its target as an IP address, so a host name
    # such as "localhost" is resolved first. IPv4 is preferred so that
    # "localhost" keeps mapping to 127.0.0.1 on dual-stack hosts.
    candidates = socket.getaddrinfo(DNS_SERVER, 53, type=socket.SOCK_STREAM)
    server_address = candidates[0][4][0]
    for family, _, _, _, sockaddr in candidates:
        if family == socket.AF_INET:
            server_address = sockaddr[0]
            break

    response = dns.query.tcp(update, server_address)
    return response
if __name__ == "__main__":
    # Script entry point: for every configured service, install a renewed
    # certificate/key pair from the acme state directory, publish matching
    # TLSA records and restart the service.
    TSIG, KEYALGO = get_tsig_key()
    for service, settings in SERVICES.items():
        # Load the certificate that is currently installed for the service.
        try:
            with open(settings["cert"], "r") as service_cert_file:
                service_cert_text = service_cert_file.read()
        except IOError:
            # Name the service that actually failed, not always postfix.
            LOGGER.error("Error while opening the %s certificate", service)
            continue

        service_cert = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, service_cert_text
        )
        if not check_renewal(service_cert):
            continue

        newcert_path = os.path.join(ACME_STATE_DIR, "live", FQDN,
                                    "fullchain")
        try:
            with open(newcert_path, "r") as new_cert_file:
                new_cert_text = new_cert_file.read()
        except IOError:
            LOGGER.error("Error while opening new %s certificate file",
                         service)
            continue

        new_cert = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, new_cert_text
        )

        # Publish the TLSA records for the new public key on every port of
        # the service before the certificate is swapped.
        hash_digest = create_tlsa_hash(new_cert)
        for port in settings["ports"]:
            response = update_tlsa_record(DOMAIN, port, hash_digest, TSIG,
                                          KEYALGO, FQDN)
            # rcode 0 is NOERROR; anything else means the server did not
            # apply the update.
            if response.rcode() != 0:
                LOGGER.error("TLSA update for port %s of %s failed with "
                             "DNS rcode %s", port, service, response.rcode())

        # NOTE(review): copy_file applies mode 0644 by default, which also
        # ends up on the private key copied below -- confirm this is wanted.
        newkey_path = os.path.join(ACME_STATE_DIR, "live", FQDN, "privkey")
        # The key is only copied when the certificate copy succeeded.
        if (copy_file(newcert_path, settings["cert"])
                and copy_file(newkey_path, settings["key"])):
            LOGGER.info("Certificate for %s successfully "
                        "renewed, restarting service.",
                        service)
            exit_status = subprocess.call(["/etc/init.d/%s"
                                           % service, "restart"])
            if exit_status != 0:
                LOGGER.error("Restart of %s exited with status %s",
                             service, exit_status)
        else:
            LOGGER.error("Renewal of cert for %s failed, "
                         "please clean up manually and "
                         "check the backup files!", service)
|