Refactored main function
This commit is contained in:
parent
1848e7a2d9
commit
9e8394f9ad
@ -18,7 +18,7 @@ lint:
|
||||
test:
|
||||
stage: test
|
||||
script:
|
||||
- poetry run pytest --cov=. --cov-report=xml
|
||||
- poetry run pytest -v --cov=. --cov-report=xml
|
||||
after_script:
|
||||
- bash <(curl -s https://codecov.io/bash)
|
||||
artifacts:
|
||||
|
@ -1,4 +1,3 @@
|
||||
#!/usr/bin/python3
|
||||
"""Main Subscleaner module."""
|
||||
|
||||
"""
|
||||
@ -84,7 +83,57 @@ def ads_in_line(line: str) -> bool:
|
||||
return any(re.match(ad, line, re.DOTALL) for ad in ADS)
|
||||
|
||||
|
||||
def process_file(filename):
|
||||
def is_already_processed(filename: str) -> bool:
|
||||
"""
|
||||
Check if the file has already been processed.
|
||||
|
||||
Args:
|
||||
filename (str): The path to the subtitle file.
|
||||
|
||||
Returns:
|
||||
bool: True if the file has already been processed, False otherwise.
|
||||
"""
|
||||
created = os.path.getctime(filename)
|
||||
already_processed = time.mktime(
|
||||
time.strptime("2021-05-13 00:00:00", "%Y-%m-%d %H:%M:%S"),
|
||||
)
|
||||
return created < already_processed
|
||||
|
||||
|
||||
def detect_encoding(filename: str) -> str:
|
||||
"""
|
||||
Detect the encoding of the subtitle file.
|
||||
|
||||
Args:
|
||||
filename (str): The path to the subtitle file.
|
||||
|
||||
Returns:
|
||||
str: The detected encoding of the file.
|
||||
"""
|
||||
with open(filename, "rb") as f:
|
||||
return chardet.detect(f.read())["encoding"]
|
||||
|
||||
|
||||
def remove_ads(subs: pysrt.SubRipFile) -> bool:
|
||||
"""
|
||||
Remove ads from the subtitle file.
|
||||
|
||||
Args:
|
||||
subs (pysrt.SubRipFile): The subtitle file object.
|
||||
|
||||
Returns:
|
||||
bool: True if the file was modified, False otherwise.
|
||||
"""
|
||||
modified = False
|
||||
for i, line in enumerate(subs):
|
||||
if ads_in_line(line.text):
|
||||
print(f"Removing: {line}\n")
|
||||
del subs[i]
|
||||
modified = True
|
||||
return modified
|
||||
|
||||
|
||||
def process_file(filename: str) -> bool:
|
||||
"""
|
||||
Process a subtitle file to remove ads.
|
||||
|
||||
@ -95,28 +144,16 @@ def process_file(filename):
|
||||
bool: True if the file was modified, False otherwise.
|
||||
"""
|
||||
try:
|
||||
created = os.path.getctime(filename)
|
||||
already_processed = time.mktime(
|
||||
time.strptime("2021-05-13 00:00:00", "%Y-%m-%d %H:%M:%S"),
|
||||
)
|
||||
if created < already_processed:
|
||||
if is_already_processed(filename):
|
||||
print(f"Already processed {filename}")
|
||||
return False
|
||||
|
||||
print(f"Analyzing: {filename}")
|
||||
|
||||
with open(filename, "rb") as f:
|
||||
encoding = chardet.detect(f.read())["encoding"]
|
||||
|
||||
encoding = detect_encoding(filename)
|
||||
subs = pysrt.open(filename, encoding=encoding)
|
||||
modified = False
|
||||
for i, line in enumerate(subs):
|
||||
if ads_in_line(line.text):
|
||||
print(f"Removing: {line}\n")
|
||||
del subs[i]
|
||||
modified = True
|
||||
|
||||
if modified:
|
||||
if remove_ads(subs):
|
||||
print(f"Saving {filename}")
|
||||
subs.save(filename)
|
||||
return True
|
||||
@ -126,6 +163,23 @@ def process_file(filename):
|
||||
return False
|
||||
|
||||
|
||||
def process_files(filenames: list[str]) -> list[str]:
|
||||
"""
|
||||
Process multiple subtitle files to remove ads.
|
||||
|
||||
Args:
|
||||
filenames (list[str]): A list of subtitle file paths.
|
||||
|
||||
Returns:
|
||||
list[str]: A list of modified file paths.
|
||||
"""
|
||||
modified_files = []
|
||||
for filename in filenames:
|
||||
if process_file(filename):
|
||||
modified_files.append(filename)
|
||||
return modified_files
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Process subtitle files to remove ads.
|
||||
@ -134,13 +188,9 @@ def main():
|
||||
and print the result. Keep track of the modified files and print
|
||||
a summary at the end.
|
||||
"""
|
||||
modified_files = []
|
||||
filenames = [filename.strip() for filename in sys.stdin]
|
||||
print("Starting script")
|
||||
for filename in sys.stdin:
|
||||
filename = filename.strip()
|
||||
if process_file(filename):
|
||||
modified_files.append(filename)
|
||||
|
||||
modified_files = process_files(filenames)
|
||||
if modified_files:
|
||||
print(f"Modified {len(modified_files)} files")
|
||||
print("Done")
|
||||
|
@ -3,9 +3,18 @@
|
||||
from io import StringIO
|
||||
from unittest.mock import patch
|
||||
|
||||
import pysrt
|
||||
import pytest
|
||||
|
||||
from src.subscleaner.subscleaner import ads_in_line, main, process_file
|
||||
from src.subscleaner.subscleaner import (
|
||||
ads_in_line,
|
||||
detect_encoding,
|
||||
is_already_processed,
|
||||
main,
|
||||
process_file,
|
||||
process_files,
|
||||
remove_ads,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -51,6 +60,50 @@ def test_ads_in_line(line, expected):
|
||||
assert ads_in_line(line) is expected
|
||||
|
||||
|
||||
def test_is_already_processed(tmpdir):
|
||||
"""
|
||||
Test the is_already_processed function.
|
||||
|
||||
Args:
|
||||
tmpdir (pytest.fixture): A temporary directory for creating the sample SRT file.
|
||||
"""
|
||||
file_path = create_sample_srt_file(tmpdir, "")
|
||||
with patch("src.subscleaner.subscleaner.os.path.getctime", return_value=0):
|
||||
assert is_already_processed(file_path) is True
|
||||
|
||||
with patch("src.subscleaner.subscleaner.os.path.getctime", return_value=9999999999):
|
||||
assert is_already_processed(file_path) is False
|
||||
|
||||
|
||||
def test_detect_encoding(tmpdir, sample_srt_content):
|
||||
"""
|
||||
Test the detect_encoding function.
|
||||
|
||||
Args:
|
||||
tmpdir (pytest.fixture): A temporary directory for creating the sample SRT file.
|
||||
sample_srt_content (str): The sample SRT content.
|
||||
"""
|
||||
file_path = create_sample_srt_file(tmpdir, sample_srt_content)
|
||||
assert detect_encoding(file_path) == "ascii"
|
||||
|
||||
|
||||
def test_remove_ads(sample_srt_content):
|
||||
"""
|
||||
Test the remove_ads function.
|
||||
|
||||
Args:
|
||||
sample_srt_content (str): The sample SRT content.
|
||||
"""
|
||||
subs = pysrt.from_string(sample_srt_content)
|
||||
subs_expected_ammount = 2
|
||||
assert remove_ads(subs) is True
|
||||
assert len(subs) == subs_expected_ammount
|
||||
|
||||
subs = pysrt.from_string("1\n00:00:01,000 --> 00:00:03,000\nThis is a sample subtitle.")
|
||||
assert remove_ads(subs) is False
|
||||
assert len(subs) == 1
|
||||
|
||||
|
||||
def test_process_file_no_modification(tmpdir, sample_srt_content):
|
||||
"""
|
||||
Test the process_file function when the file does not require modification.
|
||||
@ -60,7 +113,7 @@ def test_process_file_no_modification(tmpdir, sample_srt_content):
|
||||
sample_srt_content (str): The sample SRT content.
|
||||
"""
|
||||
file_path = create_sample_srt_file(tmpdir, sample_srt_content)
|
||||
with patch("src.subscleaner.subscleaner.os.path.getctime", return_value=0):
|
||||
with patch("src.subscleaner.subscleaner.is_already_processed", return_value=True):
|
||||
assert process_file(file_path) is False
|
||||
|
||||
|
||||
@ -73,7 +126,7 @@ def test_process_file_with_modification(tmpdir, sample_srt_content):
|
||||
sample_srt_content (str): The sample SRT content.
|
||||
"""
|
||||
file_path = create_sample_srt_file(tmpdir, sample_srt_content)
|
||||
with patch("src.subscleaner.subscleaner.os.path.getctime", return_value=9999999999):
|
||||
with patch("src.subscleaner.subscleaner.is_already_processed", return_value=False):
|
||||
assert process_file(file_path) is True
|
||||
|
||||
|
||||
@ -88,6 +141,21 @@ def test_process_file_error(tmpdir):
|
||||
assert process_file(str(file_path)) is False
|
||||
|
||||
|
||||
def test_process_files(tmpdir, sample_srt_content):
|
||||
"""
|
||||
Test the process_files function.
|
||||
|
||||
Args:
|
||||
tmpdir (pytest.fixture): A temporary directory for creating the sample SRT files.
|
||||
sample_srt_content (str): The sample SRT content.
|
||||
"""
|
||||
file_path1 = create_sample_srt_file(tmpdir, sample_srt_content)
|
||||
file_path2 = create_sample_srt_file(tmpdir, "1\n00:00:01,000 --> 00:00:03,000\nThis is a sample subtitle.")
|
||||
with patch("src.subscleaner.subscleaner.process_file", side_effect=[True, False]):
|
||||
modified_files = process_files([file_path1, file_path2])
|
||||
assert modified_files == [file_path1]
|
||||
|
||||
|
||||
def test_main_no_modification(tmpdir, sample_srt_content):
|
||||
"""
|
||||
Test the main function when no files require modification.
|
||||
@ -99,10 +167,10 @@ def test_main_no_modification(tmpdir, sample_srt_content):
|
||||
file_path = create_sample_srt_file(tmpdir, sample_srt_content)
|
||||
with (
|
||||
patch("sys.stdin", StringIO(file_path)),
|
||||
patch("src.subscleaner.subscleaner.process_file", return_value=False) as mock_process_file,
|
||||
patch("src.subscleaner.subscleaner.process_files", return_value=[]) as mock_process_files,
|
||||
):
|
||||
main()
|
||||
mock_process_file.assert_called_once_with(file_path)
|
||||
mock_process_files.assert_called_once_with([file_path])
|
||||
|
||||
|
||||
def test_main_with_modification(tmpdir, sample_srt_content):
|
||||
@ -116,7 +184,7 @@ def test_main_with_modification(tmpdir, sample_srt_content):
|
||||
file_path = create_sample_srt_file(tmpdir, sample_srt_content)
|
||||
with (
|
||||
patch("sys.stdin", StringIO(file_path)),
|
||||
patch("src.subscleaner.subscleaner.process_file", return_value=True) as mock_process_file,
|
||||
patch("src.subscleaner.subscleaner.process_files", return_value=[file_path]) as mock_process_files,
|
||||
):
|
||||
main()
|
||||
mock_process_file.assert_called_once_with(file_path)
|
||||
mock_process_files.assert_called_once_with([file_path])
|
||||
|
Loading…
x
Reference in New Issue
Block a user