Browse Source

Refactor certificate path determination to allow support for tools other than acmetool

Helmut Pozimski 10 months ago
parent
commit
ea8300a16d
10 changed files with 128 additions and 50 deletions
  1. 1 0
      .gitignore
  2. 1 1
      README.md
  3. 12 12
      amulib/apache.py
  4. 40 0
      amulib/cert_path_provider.py
  5. 13 10
      amulib/dovecot.py
  6. 7 4
      amulib/ejabberd.py
  7. 36 7
      amulib/main.py
  8. 6 3
      amulib/postfix.py
  9. 11 13
      amulib/service.py
  10. 1 0
      example/config.json

+ 1 - 0
.gitignore

@@ -1,2 +1,3 @@
 .idea
 *.pyc
+*.egg-info

+ 1 - 1
README.md

@@ -7,7 +7,7 @@ acme-updater is a tool that takes care of automatically replacing Let's Encrypt
 * dovecot
 * ejabberd
 
-Current development is focused on Debian 8 jessie with sysvinit because that's what I use mysel. It might be extended to support other distributions or init systems in the future.
+Current development is focused on current Debian and Gentoo versions because that's what I use myself. It might be extended to support other distributions in the future.
 
 ## Usage
 

+ 12 - 12
amulib/apache.py

@@ -9,27 +9,29 @@ Contains the apache module which takes care of maintaining the apache 2 SSL
 certificates.
 """
 
+import datetime
 import logging
 import os
-import datetime
 import subprocess
 
-from amulib import helpers
 import OpenSSL
 
+from amulib import helpers
+from amulib.cert_path_provider import CertPathProvider
+
 LOGGER = logging.getLogger("acme-updater")
 
 
-def run(config=None, acme_dir="/var/lib/acme",
+def run(cert_path_provider: CertPathProvider, config=None,
         named_key_path="/run/named/session.key", dns_server="localhost"):
     """
     Main method of the apache module, actually replaces the certificates,
     manages the service and writes TLSA records if necessary.
 
+    :param cert_path_provider: provider of the certificate path
+    :type cert_path_provider: CertPathProvider
     :param config: configuration for the module.
     :type config: dict
-    :param acme_dir: path to the acme state dir
-    :type acme_dir: str
     :param named_key_path: path to the named session.key
     :type named_key_path: str
     :param dns_server: DNS server to use to create TLSA records
@@ -65,10 +67,10 @@ def run(config=None, acme_dir="/var/lib/acme",
             x509_current_cert = OpenSSL.crypto.load_certificate(
                 OpenSSL.crypto.FILETYPE_PEM, cert_text)
             if "Let's Encrypt" in x509_current_cert.get_issuer().__str__():
-                acme_cert_path = os.path.join(acme_dir, "live", entry[0],
-                                              "fullchain")
+                cert_path = cert_path_provider.provide_cert_path(entry[0])
+                fullchain_path = cert_path_provider.provide_fullchain_path(entry[0])
                 try:
-                    with open(acme_cert_path, "r") as acme_cert_file:
+                    with open(cert_path, "r") as acme_cert_file:
                         acme_cert_text = acme_cert_file.read()
                 except IOError:
                     LOGGER.error("Could not open certificate for %s in acme "
@@ -98,10 +100,8 @@ def run(config=None, acme_dir="/var/lib/acme",
                                         helpers.create_tlsa_records(
                                             domain, "443", x509_acme_cert,
                                             named_key_path, dns_server)
-                            if helpers.copy_file(acme_cert_path, entry[1]):
-                                acme_key_path = os.path.join(acme_dir,
-                                                             "live", entry[0],
-                                                             "privkey")
+                            if helpers.copy_file(fullchain_path, entry[1]):
+                                acme_key_path = cert_path_provider.provide_key_path(entry[0])
                                 if helpers.copy_file(acme_key_path, entry[2]):
                                     LOGGER.info(
                                         "Successfully renewed cert for %s",

+ 40 - 0
amulib/cert_path_provider.py

@@ -0,0 +1,40 @@
+# SPDX-FileCopyrightText: 2023 Helmut Pozimski <helmut@pozimski.eu>
+#
+# SPDX-License-Identifier: GPL-2.0-only
+
+import ntpath
+import os
+from abc import ABC, abstractmethod
+
+# -*- coding: utf8 -*-
+
+
+class CertPathProvider(ABC):
+    @abstractmethod
+    def provide_cert_path(self, fqdn: str) -> ntpath:
+        pass
+
+    @abstractmethod
+    def provide_fullchain_path(self, fqdn: str) -> ntpath:
+        pass
+
+    @abstractmethod
+    def provide_key_path(self, fqdn: str) -> ntpath:
+        pass
+
+
+class AcmeToolCertPathProvider(CertPathProvider):
+    def __init__(self, acme_dir: str):
+        self._acme_dir = acme_dir
+
+    def provide_key_path(self, fqdn: str) -> ntpath:
+        return self._join_paths(fqdn, "privkey")
+
+    def _join_paths(self, fqdn: str, file_name: str) -> ntpath:
+        return os.path.join(self._acme_dir, "live", fqdn, file_name)
+
+    def provide_cert_path(self, fqdn: str) -> ntpath:
+        return self._join_paths(fqdn, "cert")
+
+    def provide_fullchain_path(self, fqdn: str) -> ntpath:
+        return self._join_paths(fqdn, "fullchain")

+ 13 - 10
amulib/dovecot.py

@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: 2016-2017 Helmut Pozimski <helmut@pozimski.eu>
+# SPDX-FileCopyrightText: 2016-2023 Helmut Pozimski <helmut@pozimski.eu>
 #
 # SPDX-License-Identifier: GPL-2.0-only
 
@@ -9,19 +9,22 @@ mail server.
 """
 
 from amulib import service
+from amulib.cert_path_provider import CertPathProvider
 
 
-def run(config=None, acme_dir="/var/lib/acme",
+def run(cert_path_provider: CertPathProvider, config=None,
         named_key_path="/run/named/session.key", dns_server="localhost"):
     """ manages the certificates for dovecot
 
-       :param config: configuration for the service
-       :type config: dict
-       :param acme_dir: path to the acme state dir
-       :type acme_dir: str
-       :param named_key_path: path to the named session.key
-       :type named_key_path: str
-       """
+        :param cert_path_provider: provider for certificate paths
+        :type cert_path_provider: CertPathProvider
+        :param config: configuration for the service
+        :type config: dict
+        :param named_key_path: path to the named session.key
+        :type named_key_path: str
+        :param dns_server: dns server to use
+        :type dns_server: str
+    """
     if not config:
         config = {
             "certificate_path": "/usr/share/ssl/certs/dovecot.pem",
@@ -29,4 +32,4 @@ def run(config=None, acme_dir="/var/lib/acme",
             "tlsa": True,
             "tlsa_ports": [993]
         }
-    service.run("dovecot", config, acme_dir, named_key_path, dns_server)
+    service.run(cert_path_provider, "dovecot", config, named_key_path, dns_server)

+ 7 - 4
amulib/ejabberd.py

@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: 2016-2017 Helmut Pozimski <helmut@pozimski.eu>
+# SPDX-FileCopyrightText: 2016-2023 Helmut Pozimski <helmut@pozimski.eu>
 #
 # SPDX-License-Identifier: GPL-2.0-only
 
@@ -10,12 +10,15 @@
 """
 
 from amulib import service
+from amulib.cert_path_provider import CertPathProvider
 
 
-def run(config=None, acme_dir="/var/lib/acme",
-        named_key_path="/run/named/session.key", dns_server="localhost"):
+def run(cert_path_provider: CertPathProvider, config=None, named_key_path="/run/named/session.key",
+        dns_server="localhost"):
     """ manages the certificates for dovecot
 
+        :param cert_path_provider: provider for certificate paths
+        :type cert_path_provider: CertPathProvider
         :param config: configuration for the service
         :type config: dict
         :param acme_dir: path to the acme state dir
@@ -32,4 +35,4 @@ def run(config=None, acme_dir="/var/lib/acme",
             "tlsa": True,
             "tlsa_ports": [5222, 5269]
         }
-    service.run("ejabberd", config, acme_dir, named_key_path, dns_server)
+    service.run(cert_path_provider, config, named_key_path, dns_server)

+ 36 - 7
amulib/main.py

@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: 2016-2017 Helmut Pozimski <helmut@pozimski.eu>
+# SPDX-FileCopyrightText: 2016-2023 Helmut Pozimski <helmut@pozimski.eu>
 #
 # SPDX-License-Identifier: GPL-2.0-only
 
@@ -14,6 +14,7 @@ import logging
 import json
 import sys
 
+from amulib.cert_path_provider import AcmeToolCertPathProvider
 from amulib.helpers import get_log_level
 from amulib import apache
 from amulib import postfix
@@ -26,6 +27,7 @@ def main():
     Main function of acme-updater.
     """
     config = None
+    cert_path_provider = None
     parser = argparse.ArgumentParser()
     parser.add_argument("--apache", help="use the apache module",
                         action="store_true")
@@ -43,6 +45,14 @@ def main():
     args = parser.parse_args()
     logger = logging.getLogger("acme-updater")
     logger.addHandler(logging.StreamHandler())
+    config = read_config(args, config, logger)
+    set_log_level(args, config, logger)
+    cert_path_provider = create_cert_path_provider(cert_path_provider, config, logger)
+
+    execute_services(args, cert_path_provider, config)
+
+
+def read_config(args, config, logger):
     if args.config:
         try:
             with open(args.config, "r") as fobj:
@@ -53,33 +63,52 @@ def main():
         except json.JSONDecodeError:
             logger.error("Error: Could not parse configuration file")
             sys.exit(1)
+    else:
+        logger.error("A configuration file has to be provided with --config")
+        sys.exit(1)
+    return config
+
+
+def set_log_level(args, config, logger):
     if args.verbose:
         logger.setLevel(logging.DEBUG)
     elif config:
         logger.setLevel(get_log_level(config["loglevel"]))
     else:
         logger.setLevel(logging.INFO)
+
+
+def create_cert_path_provider(cert_path_provider, config, logger):
+    if "acme_tool" in config and config["acme_tool"] == "acmetool":
+        cert_path_provider = AcmeToolCertPathProvider(config["acme_dir"])
+    else:
+        logger.error("Invalid acme tooling specified")
+        sys.exit(1)
+    return cert_path_provider
+
+
+def execute_services(args, cert_path_provider, config):
     if args.apache:
         if config:
-            apache.run(config["apache"], config["acme_dir"],
+            apache.run(cert_path_provider, config["apache"],
                        config["named_key_path"], config["dns_server"])
         else:
-            apache.run()
+            apache.run(cert_path_provider)
     if args.postfix:
         if config:
             postfix.run(config["postfix"], config["acme_dir"],
                         config["named_key_path"], config["dns_server"])
         else:
-            postfix.run()
+            postfix.run(cert_path_provider)
     if args.dovecot:
         if config:
             dovecot.run(config["dovecot"], config["acme_dir"],
                         config["named_key_path"], config["dns_server"])
         else:
-            dovecot.run()
+            dovecot.run(cert_path_provider)
     if args.ejabberd:
         if config:
             ejabberd.run(config["ejabberd"], config["acme_dir"],
-                         config["named_key_path"], config["dns_server"])
+                         config["dns_server"])
         else:
-            ejabberd.run()
+            ejabberd.run(cert_path_provider)

+ 6 - 3
amulib/postfix.py

@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: 2016-2017 Helmut Pozimski <helmut@pozimski.eu>
+# SPDX-FileCopyrightText: 2016-2023 Helmut Pozimski <helmut@pozimski.eu>
 #
 # SPDX-License-Identifier: GPL-2.0-only
 
@@ -12,13 +12,16 @@ mail server.
 import socket
 
 from amulib import service
+from amulib.cert_path_provider import CertPathProvider
 
 
-def run(config=None, acme_dir="/var/lib/acme",
+def run(cert_path_provider: CertPathProvider, config=None,
         named_key_path="/run/named/session.key",
         dns_server="localhost"):
     """ manages the certificates for postfix
 
+    :param cert_path_provider: provider for certificate paths
+    :type cert_path_provider: CertPathProvider
     :param config: configuration for the service
     :type config: dict
     :param acme_dir: path to the acme state dir
@@ -36,4 +39,4 @@ def run(config=None, acme_dir="/var/lib/acme",
             "tlsa": True,
             "tlsa_ports": [25, 465, 587]
         }
-    service.run("postfix", config, acme_dir, named_key_path, dns_server)
+    service.run(cert_path_provider, "postfix", config, named_key_path, dns_server)

+ 11 - 13
amulib/service.py

@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: 2016-2017 Helmut Pozimski <helmut@pozimski.eu>
+# SPDX-FileCopyrightText: 2016-2023 Helmut Pozimski <helmut@pozimski.eu>
 #
 # SPDX-License-Identifier: GPL-2.0-only
 
@@ -12,26 +12,27 @@ needs to at least contain the keys "certificate_path", "key_path",
 """
 
 import logging
-import os
-import subprocess
 import shutil
+import subprocess
 
-from amulib import helpers
 import OpenSSL
 
+from amulib import helpers
+from amulib.cert_path_provider import CertPathProvider
+
 LOGGER = logging.getLogger("acme-updater")
 
 
-def run(service_name, config, acme_dir="/var/lib/acme",
+def run(cert_path_provider: CertPathProvider, service_name, config,
         named_key_path="/run/named/session.key", dns_server="localhost"):
     """
 
+    :param cert_path_provider: provider for certifcate paths
+    :type cert_path_provider: CertPathProvider
     :param service_name: name of the service
     :type service_name: str
     :param config: configuration for the service
     :type config: dict
-    :param acme_dir: path to the acme state dir
-    :type acme_dir: str
     :param named_key_path: path to the named session.key
     :type named_key_path: str
     :param dns_server: DNS server to use to create TLSA records
@@ -53,10 +54,8 @@ def run(service_name, config, acme_dir="/var/lib/acme",
         )
         cert_alt_names = helpers.get_subject_alt_name(current_cert)
         fqdn = cert_alt_names[0]
-        acme_cert_path = os.path.join(acme_dir, "live", fqdn,
-                                      "cert")
-        acme_fullchain_path = os.path.join(acme_dir, "live", fqdn,
-                                           "fullchain")
+        acme_cert_path = cert_path_provider.provide_cert_path(fqdn)
+        acme_fullchain_path = cert_path_provider.provide_fullchain_path(fqdn)
         if helpers.check_renewal(current_cert, acme_cert_path):
             try:
                 with open(acme_cert_path, "r") as acme_cert_file:
@@ -74,8 +73,7 @@ def run(service_name, config, acme_dir="/var/lib/acme",
                             helpers.create_tlsa_records(name, port, acme_cert,
                                                         named_key_path,
                                                         dns_server)
-                newkey_path = os.path.join(acme_dir, "live",
-                                           fqdn, "privkey")
+                newkey_path = cert_path_provider.provide_key_path(fqdn)
                 if certificate_path == key_path:
                     if helpers.create_backup_copy(certificate_path):
                         try:

+ 1 - 0
example/config.json

@@ -3,6 +3,7 @@
   "acme_dir": "/var/lib/acme",
   "named_key_path": "/run/named/session.key",
   "dns_server": "localhost",
+  "acme_tool": "acmetool",
   "apache": {
     "vhosts_dir": "/etc/apache2/sites-enabled",
     "tlsa": false,