Added database cache functionality

This commit is contained in:
Roger Gonzalez 2025-03-28 20:18:36 -03:00
parent 518146097e
commit 6885a0c491
Signed by: rogs
GPG Key ID: C7ECE9C6C36EC2E6
2 changed files with 194 additions and 11 deletions

View File

@ -0,0 +1,3 @@
"""Subscleaner package."""
__version__ = "1.3.0"

View File

@ -18,14 +18,18 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>. along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
import argparse
import hashlib
import os import os
import pathlib import pathlib
import re import re
import sqlite3
import sys import sys
import time import time
import chardet import chardet
import pysrt import pysrt
from appdirs import user_data_dir
AD_PATTERNS = [ AD_PATTERNS = [
re.compile(r"\bnordvpn\b", re.IGNORECASE), re.compile(r"\bnordvpn\b", re.IGNORECASE),
@ -98,6 +102,118 @@ AD_PATTERNS = [
] ]
def get_db_path(debug: bool = False) -> pathlib.Path:
    """
    Get the path to the SQLite database.

    In debug mode the database lives in the current working directory and
    nothing is created on disk. Otherwise the per-user application data
    directory is used, and it is created (including parents) if missing.

    Args:
        debug (bool): If True, use the current directory for the database.

    Returns:
        pathlib.Path: The path to the database file.
    """
    if debug:
        return pathlib.Path.cwd() / "subscleaner.db"

    app_data_dir = pathlib.Path(user_data_dir("subscleaner", "subscleaner"))
    # The directory must exist before sqlite3 can create the file inside it.
    app_data_dir.mkdir(parents=True, exist_ok=True)
    return app_data_dir / "subscleaner.db"
def init_db(db_path):
    """
    Initialize the database if it doesn't exist.

    Creates the ``processed_files`` table when it is missing; calling this
    on an already-initialized database is a no-op.

    Args:
        db_path (pathlib.Path): The path to the database file.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS processed_files (
                file_path TEXT PRIMARY KEY,
                file_hash TEXT NOT NULL,
                processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """
        )
        conn.commit()
    finally:
        # Close even when the statement fails so the file handle is not leaked.
        conn.close()
def get_file_hash(file_path):
    """
    Generate an MD5 hash of the file content.

    The digest is only used as a change-detection key, not for security,
    so MD5 is requested with ``usedforsecurity=False`` to keep working on
    builds that restrict insecure hashes.

    Args:
        file_path (pathlib.Path): The path to the file.

    Returns:
        str: The MD5 hash of the file content, or None if the file could
        not be read (the error is printed).
    """
    try:
        hash_md5 = hashlib.md5(usedforsecurity=False)
        with open(file_path, "rb") as f:
            # Read in chunks so large files are never loaded whole.
            while chunk := f.read(65536):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    except Exception as e:
        # Best effort by design: callers treat None as "skip this file".
        print(f"Error generating hash for {file_path}: {e}")
        return None
def is_file_processed(db_path, file_path, file_hash):
    """
    Check if the file has been processed before.

    A file counts as processed only when a row exists for its path AND the
    stored hash equals ``file_hash``; a different hash means the file was
    modified since it was recorded.

    Args:
        db_path (pathlib.Path): The path to the database file.
        file_path (str): The path to the file.
        file_hash (str): The MD5 hash of the file content.

    Returns:
        bool: True if the file has been processed before, False otherwise.
    """
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute(
            "SELECT file_hash FROM processed_files WHERE file_path = ?",
            (str(file_path),),
        ).fetchone()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()

    return row is not None and row[0] == file_hash
def mark_file_processed(db_path, file_path, file_hash):
    """
    Mark the file as processed in the database.

    Inserts a row for ``file_path`` or replaces the existing one, so the
    stored hash always reflects the most recent call.

    Args:
        db_path (pathlib.Path): The path to the database file.
        file_path (str): The path to the file.
        file_hash (str): The MD5 hash of the file content.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "INSERT OR REPLACE INTO processed_files (file_path, file_hash) VALUES (?, ?)",
            (str(file_path), file_hash),
        )
        conn.commit()
    finally:
        # Close even when the insert fails so the connection is not leaked.
        conn.close()
def contains_ad(subtitle_line: str) -> bool: def contains_ad(subtitle_line: str) -> bool:
""" """
Check if the given subtitle line contains an ad. Check if the given subtitle line contains an ad.
@ -175,62 +291,118 @@ def remove_ad_lines(subtitle_data: pysrt.SubRipFile) -> bool:
return modified return modified
def is_already_processed(subtitle_file, db_path, file_hash, force=False):
    """
    Check if the subtitle file has already been processed.

    This function checks both the database and the timestamp to determine
    if a file has already been processed. When only the timestamp check
    matches, the file is also recorded in the database.

    Args:
        subtitle_file (pathlib.Path): The path to the subtitle file.
        db_path (pathlib.Path): The path to the database file.
        file_hash (str): The MD5 hash of the file content.
        force (bool): If True, ignore previous processing status.

    Returns:
        bool: True if the file has already been processed, False otherwise.
    """
    if force:
        return False

    # Check if the file is in the database with the same hash
    if is_file_processed(db_path, str(subtitle_file), file_hash):
        print(f"Already processed {subtitle_file} (hash match)")
        return True

    # Check based on timestamp
    # NOTE(review): is_processed_before is defined elsewhere in the module;
    # presumably a legacy timestamp-based check — confirm its semantics.
    if is_processed_before(subtitle_file):
        print(f"Already processed {subtitle_file} (timestamp check)")
        # Still mark it in the database
        mark_file_processed(db_path, str(subtitle_file), file_hash)
        return True

    return False
def process_subtitle_file(subtitle_file_path: str, db_path, force=False) -> bool:
    """
    Process a subtitle file to remove ad lines.

    Args:
        subtitle_file_path (str): The path to the subtitle file.
        db_path (pathlib.Path): The path to the database file.
        force (bool): If True, process the file even if it has been processed before.

    Returns:
        bool: True if the subtitle file was modified, False otherwise.
    """
    try:
        subtitle_file = pathlib.Path(subtitle_file_path)
        print(f"Analyzing: {subtitle_file}")

        # Early validation checks
        if not subtitle_file.exists():
            print(f"File not found: {subtitle_file}")
            return False

        # Get file hash and check if already processed
        file_hash = get_file_hash(subtitle_file)
        if file_hash is None or is_already_processed(subtitle_file, db_path, file_hash, force):
            return False

        # Process the subtitle file
        encoding = get_encoding(subtitle_file)

        # Try to open the subtitle file
        subtitle_data = None
        try:
            subtitle_data = pysrt.open(subtitle_file, encoding=encoding)
        except UnicodeDecodeError:
            print(f"Failed to open with detected encoding {encoding}, trying utf-8")
            try:
                subtitle_data = pysrt.open(subtitle_file, encoding="utf-8")
            except Exception as e:
                print(f"Error opening subtitle file with pysrt: {e}")
                return False
        except Exception as e:
            print(f"Error opening subtitle file with pysrt: {e}")
            return False

        # Remove ad lines and save if modified
        if subtitle_data and remove_ad_lines(subtitle_data):
            print(f"Saving {subtitle_file}")
            subtitle_data.save(subtitle_file)
            # Update the hash after modification
            new_hash = get_file_hash(subtitle_file)
            # If re-hashing failed, skip the cache write: storing None would
            # violate the NOT NULL column and the resulting error would make
            # this function report an already-rewritten file as unmodified.
            if new_hash is not None:
                mark_file_processed(db_path, str(subtitle_file), new_hash)
            return True

        # Mark as processed even if no changes were made
        mark_file_processed(db_path, str(subtitle_file), file_hash)
        return False
    except Exception as e:
        print(f"Error processing {subtitle_file_path}: {e}")
        return False
def process_subtitle_files(subtitle_files: list[str], db_path, force=False) -> list[str]:
    """
    Process multiple subtitle files to remove ad lines.

    Files are processed one at a time, in the order given.

    Args:
        subtitle_files (list[str]): A list of subtitle file paths.
        db_path (pathlib.Path): The path to the database file.
        force (bool): If True, process files even if they have been processed before.

    Returns:
        list[str]: A list of modified subtitle file paths.
    """
    return [
        subtitle_file
        for subtitle_file in subtitle_files
        if process_subtitle_file(subtitle_file, db_path, force)
    ]
@ -243,9 +415,17 @@ def main():
and print the result. Keep track of the modified files and print and print the result. Keep track of the modified files and print
a summary at the end. a summary at the end.
""" """
parser = argparse.ArgumentParser(description="Remove advertisements from subtitle files.")
parser.add_argument("--debug", action="store_true", help="Use current directory for database")
parser.add_argument("--force", action="store_true", help="Process files even if they have been processed before")
args = parser.parse_args()
db_path = get_db_path(args.debug)
init_db(db_path)
subtitle_files = [file_path.strip() for file_path in sys.stdin] subtitle_files = [file_path.strip() for file_path in sys.stdin]
print("Starting script") print("Starting script")
modified_files = process_subtitle_files(subtitle_files) modified_files = process_subtitle_files(subtitle_files, db_path, args.force)
if modified_files: if modified_files:
print(f"Modified {len(modified_files)} files") print(f"Modified {len(modified_files)} files")
print("Done") print("Done")