cleanmedia/tests/test_cleanmedia.py

419 lines
14 KiB
Python

"""Tests for cleanmedia module."""
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Tuple
from unittest.mock import MagicMock
import pytest
from pytest_mock import MockerFixture
from cleanmedia import File, MediaRepository, parse_options, process_single_media, process_user_media, read_config
@pytest.fixture
def mock_db_conn(mocker: MockerFixture) -> Tuple[Any, Any]:
"""Create mock database connection and cursor.
Args:
mocker: pytest-mock fixture
Returns:
Tuple of mock connection and cursor
"""
conn_mock = mocker.Mock()
cursor_mock = mocker.Mock()
ctx_manager = mocker.Mock()
ctx_manager.__enter__ = mocker.Mock(return_value=cursor_mock)
ctx_manager.__exit__ = mocker.Mock(return_value=None)
conn_mock.cursor.return_value = ctx_manager
return conn_mock, cursor_mock
@pytest.fixture
def media_repo(tmp_path: Any, mock_db_conn: Tuple[Any, Any], mocker: MockerFixture) -> MediaRepository:
"""Create MediaRepository instance with mocked database connection.
Args:
tmp_path: pytest temporary directory fixture
mock_db_conn: Mock database connection fixture
mocker: pytest-mock fixture
Returns:
Configured MediaRepository instance
"""
conn_mock, _ = mock_db_conn
media_path = tmp_path / "media"
media_path.mkdir()
mocker.patch("cleanmedia.MediaRepository.connect_db", return_value=conn_mock)
return MediaRepository(media_path, "postgresql://fake")
def test_file_init(mocker: MockerFixture) -> None:
"""Test File class initialization."""
repo = mocker.Mock()
file = File(repo, "mxid123", 1600000000, "base64hash123")
assert file.media_id == "mxid123"
assert file.create_date == datetime.fromtimestamp(1600000000)
assert file.base64hash == "base64hash123"
assert file.repo == repo
def test_file_fullpath(media_repo: MediaRepository) -> None:
"""Test File.fullpath property returns correct path."""
file = File(media_repo, "mxid123", 1600000000, "abc123")
expected_path = media_repo.media_path / "a" / "b" / "c123"
assert file.fullpath == expected_path
def test_file_exists_no_path(media_repo: MediaRepository) -> None:
"""Test File.exists returns False when fullpath is None."""
file = File(media_repo, "mxid123", 1600000000, "") # Empty hash ensures fullpath is None
assert file.exists() is False
def test_file_delete_no_path(media_repo: MediaRepository) -> None:
"""Test File._delete_files when file path is None."""
file = File(media_repo, "mxid123", 1600000000, "")
assert file._delete_files() is False
def test_file_delete_oserror(media_repo: MediaRepository, mocker: MockerFixture, caplog: Any) -> None:
"""Test File._delete_files when OSError occurs."""
file = File(media_repo, "mxid123", 1600000000, "abc123")
# Create directory structure
file_path = media_repo.media_path / "a" / "b" / "c123"
file_path.mkdir(parents=True)
(file_path / "file").touch()
# Mock Path.glob to raise OSError
mocker.patch.object(Path, "glob", side_effect=OSError("Permission denied"))
assert file._delete_files() is False
assert "Failed to delete files for mxid123: Permission denied" in caplog.text
def test_file_fullpath_none_if_no_hash(media_repo: MediaRepository) -> None:
"""Test File.fullpath returns None when hash is empty."""
file = File(media_repo, "mxid123", 1600000000, "")
assert file.fullpath is None
def test_file_exists(media_repo: MediaRepository) -> None:
"""Test File.exists returns True when file exists."""
file = File(media_repo, "mxid123", 1600000000, "abc123")
file_path = media_repo.media_path / "a" / "b" / "c123"
file_path.mkdir(parents=True)
(file_path / "file").touch()
assert file.exists() is True
def test_file_not_exists(media_repo: MediaRepository) -> None:
"""Test File.exists returns False when file doesn't exist."""
file = File(media_repo, "mxid123", 1600000000, "abc123")
assert file.exists() is False
def test_file_delete(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any]) -> None:
"""Test File.delete removes files and database entries."""
_, cursor_mock = mock_db_conn
file = File(media_repo, "mxid123", 1600000000, "abc123")
file_path = media_repo.media_path / "a" / "b" / "c123"
file_path.mkdir(parents=True)
(file_path / "file").touch()
(file_path / "thumb").touch()
assert file.delete() is True
assert not file_path.exists()
cursor_mock.execute.assert_any_call("DELETE from mediaapi_thumbnail WHERE media_id=%s;", ("mxid123",))
cursor_mock.execute.assert_any_call("DELETE from mediaapi_media_repository WHERE media_id=%s;", ("mxid123",))
def test_get_single_media(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any]) -> None:
"""Test MediaRepository.get_single_media returns correct File object."""
_, cursor_mock = mock_db_conn
cursor_mock.fetchone.return_value = ("mxid123", 1600000000000, "abc123")
file = media_repo.get_single_media("mxid123")
assert file is not None
assert file.media_id == "mxid123"
assert file.base64hash == "abc123"
cursor_mock.execute.assert_called_with(
"SELECT media_id, creation_ts, base64hash from mediaapi_media_repository WHERE media_id = %s;",
("mxid123",),
)
def test_get_single_media_not_found(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any]) -> None:
"""Test MediaRepository.get_single_media returns None when media not found."""
_, cursor_mock = mock_db_conn
cursor_mock.fetchone.return_value = None
file = media_repo.get_single_media("mxid123")
assert file is None
def test_clean_media_files(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any]) -> None:
"""Test MediaRepository.clean_media_files deletes old files."""
_, cursor_mock = mock_db_conn
old_date = int((datetime.now() - timedelta(days=31)).timestamp())
new_date = int((datetime.now() - timedelta(days=1)).timestamp())
cursor_mock.fetchall.return_value = [
("old_file", old_date * 1000, "abc123"),
("new_file", new_date * 1000, "def456"),
]
media_repo._avatar_media_ids = []
num_deleted = media_repo.clean_media_files(30, False, False)
assert num_deleted == 1
def test_clean_media_files_dryrun(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any]) -> None:
"""Test MediaRepository.clean_media_files in dry run mode."""
_, cursor_mock = mock_db_conn
old_date = int((datetime.now() - timedelta(days=31)).timestamp())
cursor_mock.fetchall.return_value = [
("old_file", old_date * 1000, "abc123"),
]
media_repo._avatar_media_ids = []
num_deleted = media_repo.clean_media_files(30, False, True)
assert num_deleted == 1
def test_clean_media_files_local(media_repo: MediaRepository, mocker: MockerFixture) -> None:
"""Test clean_media_files fetches avatar images when local=True."""
mock_get_avatars = mocker.patch.object(media_repo, "get_avatar_images")
media_repo.get_all_media = mocker.Mock(return_value=[]) # type: ignore
media_repo.clean_media_files(30, local=True)
mock_get_avatars.assert_called_once()
def test_sanity_check_thumbnails(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any], caplog: Any) -> None:
"""Test MediaRepository.sanity_check_thumbnails logs correct message."""
_, cursor_mock = mock_db_conn
cursor_mock.fetchone.return_value = (5,)
media_repo.sanity_check_thumbnails()
assert "You have 5 thumbnails in your db that do not refer to media" in caplog.text
def test_get_avatar_images(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any]) -> None:
"""Test MediaRepository.get_avatar_images returns correct list."""
_, cursor_mock = mock_db_conn
cursor_mock.fetchall.return_value = [
("mxc://matrix.org/abc123",),
("mxc://matrix.org/def456",),
]
avatar_ids = media_repo.get_avatar_images()
assert avatar_ids == ["abc123", "def456"]
def test_get_avatar_images_invalid_url(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any], caplog: Any) -> None:
"""Test get_avatar_images with invalid URL."""
_, cursor_mock = mock_db_conn
cursor_mock.fetchall.return_value = [("invalid_url",)]
avatar_ids = media_repo.get_avatar_images()
assert avatar_ids == []
assert "Invalid avatar URL: invalid_url" in caplog.text
def test_validate_media_path_absolute(tmp_path: Path) -> None:
"""Test _validate_media_path with absolute path."""
MediaRepository._validate_media_path(tmp_path)
def test_validate_media_path_not_exists(tmp_path: Path) -> None:
"""Test _validate_media_path with non-existent directory."""
invalid_path = tmp_path / "nonexistent"
with pytest.raises(ValueError, match="Media directory not found"):
MediaRepository._validate_media_path(invalid_path)
def test_validate_media_path_relative(caplog: Any) -> None:
"""Test _validate_media_path with relative path."""
relative_path = Path(".")
MediaRepository._validate_media_path(relative_path)
assert "Media path is relative" in caplog.text
def test_connect_db_success(mocker: MockerFixture) -> None:
"""Test successful database connection."""
mock_connect = mocker.patch("psycopg2.connect")
repo = MediaRepository(Path("/tmp"), "postgresql://fake")
repo.connect_db()
mock_connect.assert_called_with("postgresql://fake")
def test_connect_db_invalid_string(tmp_path: Path) -> None:
"""Test connect_db with invalid connection string."""
with pytest.raises(ValueError, match="Invalid PostgreSQL connection string"):
repo = MediaRepository(tmp_path, "invalid")
repo.connect_db()
def test_get_local_user_media(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any]) -> None:
"""Test get_local_user_media returns correct files."""
_, cursor_mock = mock_db_conn
cursor_mock.fetchall.return_value = [
("media1", 1600000000000, "hash1"),
("media2", 1600000000000, "hash2"),
]
files = media_repo.get_local_user_media("@user:domain.com")
assert len(files) == 2 # noqa PLR2004
assert files[0].media_id == "media1"
assert files[1].media_id == "media2"
cursor_mock.execute.assert_called_with(
"SELECT media_id, creation_ts, base64hash FROM mediaapi_media_repository WHERE user_id = %s;",
("@user:domain.com",),
)
def test_process_single_media(media_repo: MediaRepository) -> None:
"""Test process_single_media deletes file."""
args = MagicMock()
args.mxid = "test_media"
args.dryrun = False
file_mock = MagicMock()
media_repo.get_single_media = MagicMock(return_value=file_mock) # type: ignore
process_single_media(media_repo, args)
media_repo.get_single_media.assert_called_once_with("test_media")
file_mock.delete.assert_called_once()
def test_process_user_media(media_repo: MediaRepository) -> None:
"""Test process_user_media deletes all user files."""
args = MagicMock()
args.userid = "@test:domain.com"
args.dryrun = False
file1, file2 = MagicMock(), MagicMock()
media_repo.get_local_user_media = MagicMock(return_value=[file1, file2]) # type: ignore
process_user_media(media_repo, args)
media_repo.get_local_user_media.assert_called_once_with("@test:domain.com")
file1.delete.assert_called_once()
file2.delete.assert_called_once()
def test_read_config_missing_file() -> None:
"""Test read_config with missing config file."""
with pytest.raises(SystemExit):
read_config("nonexistent.yaml")
def test_read_config_invalid_content(tmp_path: Path) -> None:
"""Test read_config with invalid config content."""
config_file = tmp_path / "config.yaml"
config_file.write_text("invalid: true")
with pytest.raises(SystemExit):
read_config(config_file)
def test_read_config_valid(tmp_path: Path) -> None:
"""Test read_config with valid config."""
config_file = tmp_path / "config.yaml"
config_file.write_text("""
media_api:
base_path: /media/path
database:
connection_string: postgresql://user:pass@localhost/db
""")
path, conn_string = read_config(config_file)
assert path == Path("/media/path")
assert conn_string == "postgresql://user:pass@localhost/db"
def test_read_config_global_database(tmp_path: Path) -> None:
"""Test read_config gets connection string from global database section."""
config_file = tmp_path / "config.yaml"
config_file.write_text("""
media_api:
base_path: /media/path
global:
database:
connection_string: postgresql://global/db
""")
path, conn_string = read_config(config_file)
assert conn_string == "postgresql://global/db"
def test_read_config_missing_conn_string(tmp_path: Path) -> None:
"""Test read_config exits when connection string is missing."""
config_file = tmp_path / "config.yaml"
config_file.write_text("""
media_api:
base_path: /media/path
database: {}
""")
with pytest.raises(SystemExit):
read_config(config_file)
def test_read_config_missing_base_path(tmp_path: Path) -> None:
"""Test read_config exits when base_path is missing."""
config_file = tmp_path / "config.yaml"
config_file.write_text("""
media_api:
database:
connection_string: postgresql://fake/db
""")
with pytest.raises(SystemExit):
read_config(config_file)
def test_parse_options_defaults(mocker: MockerFixture) -> None:
"""Test parse_options default values."""
mocker.patch("sys.argv", ["cleanmedia"])
args = parse_options()
assert args.config == "config.yaml"
assert args.days == 30 # noqa PLR2004
assert not args.local
assert not args.dryrun
def test_file_has_thumbnails(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any]) -> None:
"""Test File.has_thumbnail returns correct count."""
_, cursor_mock = mock_db_conn
cursor_mock.fetchone.return_value = (3,)
file = File(media_repo, "mxid123", 1600000000, "abc123")
assert file.has_thumbnail() == 3 # noqa PLR2004
cursor_mock.execute.assert_called_with(
"SELECT COUNT(media_id) FROM mediaapi_thumbnail WHERE media_id = %s;",
("mxid123",),
)
def test_file_has_no_thumbnails(media_repo: MediaRepository, mock_db_conn: Tuple[Any, Any]) -> None:
"""Test File.has_thumbnail returns 0 when no thumbnails exist."""
_, cursor_mock = mock_db_conn
cursor_mock.fetchone.return_value = None
file = File(media_repo, "mxid123", 1600000000, "abc123")
assert file.has_thumbnail() == 0