voixicron.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #! /usr/bin/env python3
  2. # -*- coding: utf8 -*-
  3. """
  4. This script checks for available updates on Void Linux using xbps and notifies
  5. the configured administrator account about them. It currently requires
  6. python 3.x and no additional packages and is supposed to be run with cron or
  7. another scheduled execution mechanism.
  8. """
  9. import subprocess
  10. import smtplib
  11. import socket
  12. import re
  13. import logging
  14. import sys
  15. import json
  16. import argparse
  17. from email.mime.multipart import MIMEMultipart
  18. from email.mime.text import MIMEText
  19. __version__ = "0.3.1"
  20. # Configuration parameters used to send notifications and change the logging
  21. # behaviour. The option configuration file may contain any of the keys used
  22. # below in a json object.
  23. CONFIG = {
  24. "admin_email": "root",
  25. "smtp_server": "localhost",
  26. "smtp_port": 25,
  27. "smtp_user": "",
  28. "smtp_password": "",
  29. "log_level": logging.INFO
  30. }
  31. LOGGER = logging.getLogger("voixicron")
  32. LOGGER.setLevel(CONFIG["log_level"])
  33. CONSOLE_HANDLER = logging.StreamHandler()
  34. LOGGER.addHandler(CONSOLE_HANDLER)
  35. PARSER = argparse.ArgumentParser(description="checks for available updates on "
  36. "Void Linux systems.")
  37. PARSER.add_argument("-c", "--config", help="path to json configuration file")
  38. ARGS = PARSER.parse_args()
  39. if ARGS.config:
  40. try:
  41. with open(ARGS.config, "r") as conf_file:
  42. try:
  43. PARSED_JSON = json.load(conf_file)
  44. except ValueError:
  45. LOGGER.warn("The configuration is not a valid json document, "
  46. "it will be ignored.")
  47. else:
  48. CONFIG.update(PARSED_JSON)
  49. LOGGER.setLevel(CONFIG["log_level"])
  50. except IOError as error:
  51. LOGGER.warn("Configuration file could not be opened, error: %s",
  52. error.strerror)
  53. AVAILABLE_UPDATES = []
  54. DEVNULL = open("/dev/null", "w")
  55. try:
  56. subprocess.call(["xbps-install", "-S"], stdout=DEVNULL, stderr=DEVNULL)
  57. except subprocess.CalledProcessError:
  58. LOGGER.warn("Repository information could not be updated, voixicron will "
  59. "report outdated data!")
  60. try:
  61. XBPS_INSTALL_RESULT = subprocess.check_output(["xbps-install", "-Sun"],
  62. stderr=subprocess.STDOUT)
  63. except subprocess.CalledProcessError as error:
  64. LOGGER.error("Could not get list of available updates, "
  65. "xbps-install error:"
  66. " %s", error.output)
  67. sys.exit(1)
  68. else:
  69. XBPS_INSTALL_RESULT = XBPS_INSTALL_RESULT.decode("utf-8").split("\n")
  70. for line in XBPS_INSTALL_RESULT:
  71. if "update" in line:
  72. try:
  73. package = line.split(" ")[0]
  74. package_regex = "^([A-Za-z0-9-._+]*)-([0-9].*_[0-9]*)$"
  75. match_object = re.match(package_regex, package)
  76. if match_object:
  77. package_name = match_object.groups()[0]
  78. new_package_version = match_object.groups()[1]
  79. else:
  80. LOGGER.error("Not match for package %s", package)
  81. continue
  82. except IndexError:
  83. continue
  84. else:
  85. try:
  86. xbps_query_result = subprocess.check_output(
  87. ["xbps-query", "--show", package_name], stderr=DEVNULL)
  88. except subprocess.CalledProcessError as error:
  89. LOGGER.debug("Querying the installed version of package %s"
  90. " failed with error message: %s",
  91. package_name, error.output)
  92. else:
  93. xbps_query_result = xbps_query_result.decode(
  94. "utf-8").split("\n")
  95. for line in xbps_query_result:
  96. if "pkgver" in line:
  97. installed_package = line.split(
  98. ":")[1].strip()
  99. match = re.match(package_regex,
  100. installed_package)
  101. if match:
  102. installed_package_version = match.groups()[1]
  103. package_update = {
  104. "name": package_name,
  105. "new_version": new_package_version,
  106. "old_version": installed_package_version,
  107. }
  108. else:
  109. LOGGER.error("Unable to detect installed "
  110. "version of package %s",
  111. package_name)
  112. package_update = {
  113. "name": package_name,
  114. "new_version": new_package_version,
  115. "old_version": "unknown",
  116. }
  117. AVAILABLE_UPDATES.append(package_update)
  118. break
  119. if AVAILABLE_UPDATES:
  120. HOSTNAME = socket.getfqdn()
  121. MSG = MIMEMultipart()
  122. MAIL_TEXT = "The following package updates are available on host %s:\n\n"\
  123. % HOSTNAME
  124. MSG["Subject"] = "%s updates available on host %s"\
  125. % (len(AVAILABLE_UPDATES), HOSTNAME)
  126. MSG["From"] = "voixicron@%s" % HOSTNAME
  127. MSG["To"] = CONFIG["admin_email"]
  128. for update in AVAILABLE_UPDATES:
  129. MAIL_TEXT += "%s (%s -> %s)\n"\
  130. % (update["name"],
  131. update["old_version"],
  132. update["new_version"])
  133. MAIL_TEXT += "\n\nYou can install the updates by issuing the command:" \
  134. "\n\n\txbps-install -Su\n\nas root on %s\n\n--\nvoixicron"\
  135. % HOSTNAME
  136. MSG_TEXT = MIMEText(MAIL_TEXT.encode("utf-8"), _charset="utf-8")
  137. MSG.attach(MSG_TEXT)
  138. try:
  139. SERVER_CONNECTION = smtplib.SMTP(CONFIG["smtp_server"],
  140. CONFIG["smtp_port"])
  141. except (smtplib.SMTPConnectError, smtplib.SMTPServerDisconnected,
  142. socket.error):
  143. LOGGER.error("Failed to establish a connection to the SMTP server")
  144. else:
  145. try:
  146. SERVER_CONNECTION.starttls()
  147. except smtplib.SMTPException:
  148. pass
  149. if CONFIG["smtp_user"]:
  150. try:
  151. SERVER_CONNECTION.login(CONFIG["smtp_user"],
  152. CONFIG["smtp_password"])
  153. except smtplib.SMTPAuthenticationError:
  154. LOGGER.error("Authentication on the mail server with user %s "
  155. "failed.", CONFIG["smtp_user"])
  156. except smtplib.SMTPException:
  157. LOGGER.error("Error during authentication on the mail server.")
  158. try:
  159. SERVER_CONNECTION.sendmail("voixicron@%s" % HOSTNAME,
  160. CONFIG["admin_email"],
  161. MSG.as_string())
  162. except smtplib.SMTPRecipientsRefused:
  163. LOGGER.error("The mail server refused the recipient.")
  164. except smtplib.SMTPSenderRefused:
  165. LOGGER.error("The mail server refused the sender.")
  166. else:
  167. SERVER_CONNECTION.quit()
  168. DEVNULL.close()