# This file is part of stov, written by Helmut Pozimski 2012-2021. # # stov is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 2 of the License. # # stov is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with stov. If not, see . # -*- coding: utf8 -*- """This module is responsible for all database related operations.""" import logging import sqlite3 from lib_stov import generic_video from lib_stov import stov_exceptions from lib_stov import subscription LOGGER = logging.getLogger("stov") class Db: """This class is used to construct the module which will take care of all database related operations like opening the database, reading from and writing to it. """ _instance = None def __init__(self, path, version): """Constructor of the db class, populates the object with the relevant attributes, connects to the database and creates it if asked to. """ self.__path = path self.__version = version try: self.__connection = sqlite3.connect(self.__path) except sqlite3.OperationalError as exc: raise stov_exceptions.DBConnectionFailedException() from exc else: self.__cursor = self.__connection.cursor() def __del__(self): """Destructor, closes the connection to the database.""" self.__connection.close() def __new__(cls, *args, **kwargs): if not Db._instance: Db._instance = super(Db, cls).__new__(cls) return Db._instance def _execute_statement(self, statement, argument=None): """Executes a statement, works as a wrapper around cursor execute.""" try: if not argument: LOGGER.debug(_("Executing STATEMENT: %s"), statement) result = self.__cursor.execute(statement) else: LOGGER.debug(_("Executing STATEMENT: %s with arguments %s"), statement, argument) result = self.__cursor.execute(statement, argument) except sqlite3.OperationalError as error: LOGGER.debug(error) raise stov_exceptions.DBWriteAccessFailedException() else: self.__connection.commit() return result @staticmethod def get_instance(): """ Return the singleton instance of Db""" return Db._instance def populate(self): """Populates the database with the initial structure.""" self._execute_statement("""CREATE TABLE subscriptions ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, title TEXT, name TEXT, type TEXT, searchstring TEXT, directory TEXT, disabled INTEGER DEFAULT 0, site INTEGER NOT NULL );""") self._execute_statement("""CREATE TABLE videos ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, title TEXT, ytid TEXT, subscription_id INTEGER, downloaded INT, failcnt INTEGER DEFAULT 0 );""") self._execute_statement("""CREATE TABLE sites( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, title TEXT);""") def update(self): """Updates the database structure to match the current version""" if int(self.__version) == 1: # Changes between version 1 and 2 self._execute_statement("ALTER TABLE videos ADD COLUMN failcnt \ INTEGER DEFAULT 0;") self.__version = 2 if int(self.__version) == 2: # Changes between version 2 and 3 self._execute_statement("ALTER TABLE subscriptions ADD COLUMN" " disabled INTEGER DEFAULT 0;") self._execute_statement("UPDATE subscriptions SET disabled=0;") self.__version = 3 if int(self.__version) == 3: # Pseudo version without changes to the database structure, # converts existing channel subscriptions into user ones. self._execute_statement("UPDATE subscriptions SET type='user' " "WHERE type='channel'") self.__version = 4 if int(self.__version) == 4: # Changes between version 4 and 5 self._execute_statement("""CREATE TABLE videos_backup ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, title TEXT, ytid TEXT, subscription_id INTEGER, downloaded INT, failcnt INTEGER DEFAULT 0 );""") self._execute_statement("INSERT INTO videos_backup SELECT" " id, title, ytid, subscription_id, " "downloaded, failcnt FROM videos;") self._execute_statement("DROP TABLE videos;") self._execute_statement("ALTER TABLE videos_backup RENAME TO " "videos;") self.__version = 5 if int(self.__version) == 5: self._execute_statement("""CREATE TABLE sites( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, title TEXT);""") self._execute_statement("""ALTER TABLE subscriptions ADD COLUMN site INTEGER;""") self.add_site("youtube") self._execute_statement("""UPDATE subscriptions SET site=1;""") self._execute_statement("""CREATE TABLE subscriptions_backup ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, title TEXT, name TEXT, type TEXT, searchstring TEXT, directory TEXT, disabled INTEGER DEFAULT 0, site INTEGER NOT NULL );""") self._execute_statement("INSERT INTO subscriptions_backup SELECT" " id, title, name, type, " "searchstring, directory, disabled, " "site FROM subscriptions;") self._execute_statement("DROP TABLE subscriptions;") self._execute_statement("ALTER TABLE subscriptions_backup " "RENAME TO subscriptions;") self.__version = 6 def get_version(self): """Simple getter method, returns the DB version""" return self.__version def delete_subscription(self, sub_id): """Deletes a subscription and all associated videos from the database """ checkquery = "SELECT * FROM subscriptions WHERE id=?" checkresult = self._execute_statement(checkquery, (sub_id,)) if not checkresult.fetchall(): raise stov_exceptions.SubscriptionNotFoundException() delete_videos = "DELETE FROM videos WHERE subscription_id=?" self._execute_statement(delete_videos, (sub_id,)) delete_subscription = "DELETE FROM subscriptions WHERE id=?" self._execute_statement(delete_subscription, (sub_id,)) def get_videos(self, subscription_id): """Gets all videos of a given subscription id from the database and returns them """ videos_list = [] video_query = "SELECT id, title, ytid, downloaded, " \ "failcnt FROM videos WHERE subscription_id=?" cursor = self._execute_statement(video_query, (subscription_id,)) result = cursor.fetchall() for video in result: videos_list.append( generic_video.Video(title=video[1], site_id=video[2], downloaded=video[3], failcount=video[4], video_id=video[0])) return videos_list def video_in_database(self, ytid): """Checks if the video with a given youtube id already exists in the database """ video_query = "SELECT id FROM videos WHERE ytid=?" cursor = self._execute_statement(video_query, (ytid,)) result = cursor.fetchall() if not result: return False return True def insert_video(self, video, subscription_id): """Inserts a video with the given data into the database""" video_insert = "INSERT INTO videos (title, ytid, \ subscription_id, downloaded) VALUES \ (?, ?, ?, ?)" insert_data = (video.title, video.video_id, subscription_id, 0) self._execute_statement(video_insert, insert_data) def insert_subscription(self, data): """Inserts a subscription with the given data into the database""" subscription_insert = """INSERT INTO subscriptions (title, type, searchstring, directory, name, disabled, site) VALUES (?, ?, ?, ?, ?, ?, ?)""" self._execute_statement(subscription_insert, data) subscription_getid = "SELECT id from subscriptions where title=?" query_cursor = self._execute_statement(subscription_getid, (data[0],)) subscription_id = query_cursor.fetchone()[0] return subscription_id def update_video_download_status(self, video_id, status): """Updates the download status of a video in the database""" update_statement = "UPDATE videos SET downloaded = ? WHERE id = ?" self._execute_statement(update_statement, (status, video_id)) def disable_failed_video(self, video_id): """Disables a video in the database""" update_statement = "UPDATE videos SET downloaded = -1 WHERE id = ?" self._execute_statement(update_statement, (video_id,)) def update_video_fail_count(self, count, video_id): """Updates the fail count of a video in the database""" update_statement = "UPDATE videos SET failcnt = ? WHERE id = ?" self._execute_statement(update_statement, (count, video_id)) def delete_video(self, video_id): """Deletes a video from the database""" delete_statement = "DELETE FROM videos where id = ?" self._execute_statement(delete_statement, (video_id,)) def get_subscription(self, sub_id): """Retrieves a specific subscription from the database""" sub_query = """SELECT subscriptions.id, subscriptions.title, type, name,searchstring, directory,disabled, sites.title FROM subscriptions INNER JOIN sites ON subscriptions.site=sites.id WHERE subscriptions.id=?""" result_cursor = self._execute_statement(sub_query, (sub_id,)) result = result_cursor.fetchall() return result def get_subscription_title(self, sub_id): """Retrieves only the title of a specified subscription from the database """ title_query = "SELECT title from subscriptions where id=?" result_cursor = self._execute_statement(title_query, (sub_id,)) result = result_cursor.fetchall() return result def mark_video_downloaded(self, video_id): """ Marks all videos in a specified subscription as downloaded""" update_statement = "UPDATE videos SET downloaded = 1 " \ "WHERE subscription_id =?" self._execute_statement(update_statement, (video_id,)) def get_subscriptions(self): """Retrieves all subscriptions from the database""" subscriptions_list = [] subscriptions_query = """SELECT subscriptions.id, subscriptions.title, type, name,searchstring, directory,disabled, sites.title FROM subscriptions INNER JOIN sites ON subscriptions.site=sites.id;""" result_cursor = self._execute_statement(subscriptions_query) result = result_cursor.fetchall() for sub in result: subscriptions_list.append( subscription.Sub(subscription_type=sub[2], name=sub[3], site=sub[7], search=sub[4], subscription_id=sub[0], title=sub[1], directory=sub[5], disabled=sub[6])) return subscriptions_list def vacuum(self): """Vacuums the database, shrinking it in size""" self._execute_statement("VACUUM") def change_subscription_state(self, sub_id, state): """Enables or disables a subscription depending on the parameter state """ update_statement = "UPDATE subscriptions SET disabled=? WHERE id=?" self._execute_statement(update_statement, (state, sub_id)) def add_site(self, name): """ Adds a site with the specified name to the database. :param name: name of the new site :type name: str """ insert_statement = "INSERT INTO sites (title) VALUES (?)" self._execute_statement(insert_statement, (name,)) def remove_site(self, name): """ Removes a site with the specified name to the database. :param name: name of the new site :type name: str """ site_id = self.get_site_id(name) subscriptions = self._get_subscriptions_by_site_id(site_id) delete_videos = "DELETE FROM videos WHERE subscription_id = ?" delete_subscription = "DELETE FROM SUBSCRIPTIONS WHERE id=?" for sub in subscriptions: self._execute_statement(delete_videos, (sub,)) self._execute_statement(delete_subscription, (sub,)) delete_site = "DELETE FROM sites WHERE id=?" self._execute_statement(delete_site, (site_id,)) def get_site_id(self, name): """ Get the ID of a specific site :param name: name of the new site :type name: str :return: the site ID :rtype: int """ query = "SELECT id FROM sites WHERE title=?" cursor = self._execute_statement(query, (name,)) result = cursor.fetchone()[0] return result def get_sites(self): """ Retrieves all sites from the database. :return: list of sites with their respective IDs :rtype: tuple """ query = "SELECT id,title FROM sites" cursor = self._execute_statement(query) result = cursor.fetchall() return result def _get_subscriptions_by_site_id(self, site_id): """ Retrieves all subscriptions associated with the specified site_id from the database. :param site_id: ID of the site :type site_id: int :return: list of subscriptions associated with the site_id :rtype: tuple """ query = "SELECT id FROM subscriptions WHERE site=?" cursor = self._execute_statement(query, (site_id,)) return cursor.fetchall()