From b433e063384b7a3804e507d8fb2fb3313c98f69a Mon Sep 17 00:00:00 2001 From: Jakub Buzuk <61548378+Baz00k@users.noreply.github.com> Date: Thu, 11 Sep 2025 02:08:56 +0200 Subject: [PATCH] feat: add remove_completed job and new download client job type --- config/config_example.yaml | 5 + main.py | 31 ++- src/job_manager.py | 67 ++++++- src/jobs/download_client_removal_job.py | 152 +++++++++++++++ src/jobs/remove_completed.py | 87 +++++++++ src/settings/_jobs.py | 4 +- tests/jobs/test_remove_completed.py | 249 ++++++++++++++++++++++++ 7 files changed, 580 insertions(+), 15 deletions(-) create mode 100644 src/jobs/download_client_removal_job.py create mode 100644 src/jobs/remove_completed.py create mode 100644 tests/jobs/test_remove_completed.py diff --git a/config/config_example.yaml b/config/config_example.yaml index 59bcb15..e1893f3 100644 --- a/config/config_example.yaml +++ b/config/config_example.yaml @@ -42,6 +42,11 @@ jobs: search_missing: # min_days_between_searches: 7 # max_concurrent_searches: 3 + remove_completed: + target_tags: + - "Obsolete" + # target_categories: + # - "autobrr" instances: sonarr: diff --git a/main.py b/main.py index 5f64dd5..0ce0f3f 100644 --- a/main.py +++ b/main.py @@ -1,22 +1,26 @@ import asyncio -import signal -import types import datetime +import signal import sys +import types +from src.deletion_handler.deletion_handler import WatcherManager from src.job_manager import JobManager from src.settings.settings import Settings from src.utils.log_setup import logger from src.utils.startup import launch_steps -from src.deletion_handler.deletion_handler import WatcherManager settings = Settings() job_manager = JobManager(settings) watch_manager = WatcherManager(settings) -def terminate(sigterm: signal.SIGTERM, frame: types.FrameType) -> None: # noqa: ARG001, pylint: disable=unused-argument - """Terminate cleanly. Needed for respecting 'docker stop'. +def terminate( + sigterm: signal.SIGTERM, + frame: types.FrameType, +) -> None: + """ + Terminate cleanly. Needed for respecting 'docker stop'. Args: ---- @@ -24,14 +28,18 @@ def terminate(sigterm: signal.SIGTERM, frame: types.FrameType) -> None: # noqa: frame: The execution frame. """ - - logger.info(f"Termination signal received at {datetime.datetime.now()}.") # noqa: DTZ005 + logger.info( + f"Termination signal received at {datetime.datetime.now()}.", + ) watch_manager.stop() sys.exit(0) + async def wait_next_run(): - # Calculate next run time dynamically (to display) - next_run = datetime.datetime.now() + datetime.timedelta(minutes=settings.general.timer) + # Calculate next run time dynamically (to display) + next_run = datetime.datetime.now() + datetime.timedelta( + minutes=settings.general.timer, + ) formatted_next_run = next_run.strftime("%Y-%m-%d %H:%M") logger.verbose(f"*** Done - Next run at {formatted_next_run} ****") @@ -39,6 +47,7 @@ async def wait_next_run(): # Wait for the next run await asyncio.sleep(settings.general.timer * 60) + # Main function async def main(): await launch_steps(settings) @@ -58,9 +67,11 @@ async def main(): await job_manager.run_jobs(arr) logger.verbose("") + # Run download client jobs (these run independently of *arr instances) + await job_manager.run_download_client_jobs() + # Wait for the next run await wait_next_run() - return if __name__ == "__main__": diff --git a/src/job_manager.py b/src/job_manager.py index 8de4fff..7a1906f 100644 --- a/src/job_manager.py +++ b/src/job_manager.py @@ -1,5 +1,6 @@ # Cleans the download queue from src.jobs.remove_bad_files import RemoveBadFiles +from src.jobs.remove_completed import RemoveCompleted from src.jobs.remove_failed_downloads import RemoveFailedDownloads from src.jobs.remove_failed_imports import RemoveFailedImports from src.jobs.remove_metadata_missing import RemoveMetadataMissing @@ -25,6 +26,41 @@ class JobManager: await self.removal_jobs() await self.search_jobs() + async def run_download_client_jobs(self): + """Run jobs that operate on download clients directly.""" + if not await self._download_clients_connected(): + return None + + items_detected = 0 + for download_client_type in ["qbittorrent", "sabnzbd"]: + download_clients = getattr( + self.settings.download_clients, + download_client_type, + [], + ) + + for client in download_clients: + logger.info( + f"*** Running jobs on {client.name} ({client.base_url}) ***", + ) + + # Get jobs for this client + download_client_jobs = self._get_download_client_jobs_for_client( + client, + download_client_type, + ) + + if not any(job.job.enabled for job in download_client_jobs): + logger.verbose( + "Download Client Jobs: None triggered (No jobs active)", + ) + continue + + for download_client_job in download_client_jobs: + items_detected += await download_client_job.run() + + return items_detected + async def removal_jobs(self): # Check removal jobs removal_jobs = self._get_removal_jobs() @@ -72,7 +108,7 @@ class JobManager: async def _queue_has_items(self): logger.debug( - f"job_manager.py/_queue_has_items (Before any removal jobs): Checking if any items in full queue" + "job_manager.py/_queue_has_items (Before any removal jobs): Checking if any items in full queue", ) queue_manager = QueueManager(self.arr, self.settings) full_queue = await queue_manager.get_queue_items("full") @@ -99,10 +135,12 @@ class JobManager: async def _check_client_connection_status(self, clients): for client in clients: logger.debug( - f"job_manager.py/_check_client_connection_status: Checking if {client.name} is connected" + f"job_manager.py/_check_client_connection_status: Checking if {client.name} is connected", ) if not await client.check_connected(): - logger.warning(f">>> {client.name} is disconnected. Skipping queue cleaning on {self.arr.name}.") + logger.warning( + f">>> {client.name} is disconnected. Skipping queue cleaning on {self.arr.name}.", + ) return False return True @@ -131,3 +169,26 @@ class JobManager: removal_job_class(self.arr, self.settings, removal_job_name), ) return jobs + + def _get_download_client_jobs_for_client(self, client, client_type): + """ + Return a list of download client job instances for a specific download client. + + Each job is included if the corresponding attribute exists and is truthy in settings.jobs. + """ + download_client_job_classes = { + "remove_completed": RemoveCompleted, + } + + jobs = [] + for job_name, job_class in download_client_job_classes.items(): + if getattr(self.settings.jobs, job_name, False): + jobs.append( + job_class( + client, + client_type, + self.settings, + job_name, + ), + ) + return jobs diff --git a/src/jobs/download_client_removal_job.py b/src/jobs/download_client_removal_job.py new file mode 100644 index 0000000..6b95958 --- /dev/null +++ b/src/jobs/download_client_removal_job.py @@ -0,0 +1,152 @@ +from abc import ABC, abstractmethod + +from src.utils.common import make_request +from src.utils.log_setup import logger + + +class DownloadClientRemovalJob(ABC): + """Base class for removal jobs that run on download clients directly.""" + + job_name = None + + def __init__( + self, + download_client: object, + download_client_type: str, + settings: object, + job_name: str, + ) -> None: + self.download_client = download_client + self.download_client_type = download_client_type + self.settings = settings + self.job_name = job_name + self.job = getattr(self.settings.jobs, self.job_name) + + async def run(self) -> int: + """Run the download client job.""" + if not self.job.enabled: + return 0 + + logger.debug( + f"download_client_job.py/run: Launching job '{self.job_name}' on {self.download_client.name} " + f"({self.download_client_type})", + ) + + all_items = await self._get_all_items() + if not all_items: + return 0 + + items_to_remove = await self._get_items_to_remove(all_items) + + # Filter out protected items + items_to_remove = self._filter_protected_items(items_to_remove) + + if not items_to_remove: + logger.debug(f"No items to remove for job '{self.job_name}'.") + return 0 + + # Remove the affected items + await self._remove_items(items_to_remove) + + return len(items_to_remove) + + async def _get_all_items(self) -> list: + """Get all items from the download client.""" + try: + if self.download_client_type == "qbittorrent": + return await self.download_client.get_qbit_items() + if self.download_client_type == "sabnzbd": + return await self.download_client.get_history_items() + except Exception as e: + logger.error( + f"Error fetching items from {self.download_client.name}: {e}", + ) + return [] + + def _filter_protected_items(self, items: list) -> list: + """Filter out items that are protected by tags or categories.""" + protected_tag = getattr(self.settings.general, "protected_tag", None) + if not protected_tag: + return items + + filtered_items = [] + for item in items: + is_protected = False + item_name = item.get("name", "unknown") + if self.download_client_type == "qbittorrent": + tags = item.get("tags", "").split(",") + tags = [tag.strip() for tag in tags if tag.strip()] + category = item.get("category", "") + if protected_tag in tags or protected_tag == category: + is_protected = True + elif self.download_client_type == "sabnzbd": + category = item.get("category", "") + if protected_tag == category: + is_protected = True + + if is_protected: + logger.debug(f"Ignoring protected item: {item_name}") + else: + filtered_items.append(item) + + return filtered_items + + @abstractmethod + async def _get_items_to_remove(self, items: list) -> list: + """Return a list of items to remove from the download client.""" + + async def _remove_items(self, items: list) -> None: + """Remove the affected items from the download client.""" + if self.settings.general.test_run: + logger.info("Test run is enabled. Skipping actual removal.") + for item in items: + item_name = item.get("name", "unknown") + logger.info(f"Would have removed download: {item_name}") + return + + for item in items: + item_name = item.get("name", "unknown") + try: + if self.download_client_type == "qbittorrent": + await self._remove_qbittorrent_item(item) + elif self.download_client_type == "sabnzbd": + await self._remove_sabnzbd_item(item) + + logger.info( + f"Removed download: {item_name}", + ) + + except Exception as e: + logger.error(f"Failed to remove {item_name}: {e}") + + async def _remove_qbittorrent_item(self, item: dict) -> None: + """Remove a torrent from qBittorrent.""" + download_id = item["hash"].lower() + data = { + "hashes": download_id, + "deleteFiles": "true", + } + await make_request( + "post", + f"{self.download_client.api_url}/torrents/delete", + self.settings, + data=data, + cookies=self.download_client.cookie, + ) + + async def _remove_sabnzbd_item(self, item: dict) -> None: + """Remove a download from SABnzbd history.""" + download_id = item["nzo_id"] + params = { + "mode": "history", + "name": "delete", + "value": download_id, + "apikey": self.download_client.api_key, + "output": "json", + } + await make_request( + "get", + self.download_client.api_url, + self.settings, + params=params, + ) diff --git a/src/jobs/remove_completed.py b/src/jobs/remove_completed.py new file mode 100644 index 0000000..b4a7c3e --- /dev/null +++ b/src/jobs/remove_completed.py @@ -0,0 +1,87 @@ +"""Removes completed torrents that have specific tags/categories.""" + +from src.jobs.download_client_removal_job import DownloadClientRemovalJob +from src.utils.log_setup import logger + +COMPLETED_STATES = [ + "stoppedUP", + "pausedUP", # Older qBittorrent versions +] + + +class RemoveCompleted(DownloadClientRemovalJob): + """Job to remove completed torrents that match specific tags or categories.""" + + async def run(self) -> int: + if self.download_client_type == "sabnzbd": + logger.debug( + f"Skipping job '{self.job_name}' for Usenet client {self.download_client.name}.", + ) + return 0 + return await super().run() + + async def _get_items_to_remove(self, items: list) -> list: + """ + Filters a list of items from a download client and returns those + that should be removed based on completion status and other criteria. + """ + target_tags, target_categories = self._get_targets() + + if not target_tags and not target_categories: + logger.debug( + "No target tags or categories specified for remove_completed job.", + ) + return [] + + items_to_remove = [ + item + for item in items + if self._is_completed(item) + and self._meets_target_criteria(item, target_tags, target_categories) + ] + + for item in items_to_remove: + logger.debug( + f"Found completed item to remove: {item.get('name', 'unknown')}", + ) + + return items_to_remove + + def _is_completed(self, item: dict) -> bool: + """Check if an item has met its seeding goals.""" + state = item.get("state", "") + if state not in COMPLETED_STATES: + return False + + # Additional sanity checks for ratio and seeding time + ratio = item.get("ratio", 0) + ratio_limit = item.get("ratio_limit", -1) + seeding_time = item.get("seeding_time", 0) + seeding_time_limit = item.get("seeding_time_limit", -1) + + ratio_limit_met = ratio >= ratio_limit > 0 + seeding_time_limit_met = seeding_time >= seeding_time_limit > 0 + + return ratio_limit_met or seeding_time_limit_met + + def _meets_target_criteria( + self, + item: dict, + target_tags: list, + target_categories: list, + ) -> bool: + """Check if an item has the required tags or categories for removal.""" + item_category = item.get("category", "") + if item_category in target_categories: + return True + + tags = item.get("tags", "").split(",") + item_tags = {tag.strip() for tag in tags if tag.strip()} + + return bool(item_tags.intersection(target_tags)) + + def _get_targets(self) -> tuple[list, list]: + """Get the list of tags and categories to look for from job settings.""" + tags = getattr(self.job, "target_tags", []) + categories = getattr(self.job, "target_categories", []) + return tags, categories diff --git a/src/settings/_jobs.py b/src/settings/_jobs.py index 1e17bb8..c8da1b1 100644 --- a/src/settings/_jobs.py +++ b/src/settings/_jobs.py @@ -64,7 +64,7 @@ class JobDefaults: self.max_concurrent_searches = max_concurrent_searches else: logger.warning( - f"Job default 'max_concurrent_searches' must be an integer greater 0. Found: {str(max_concurrent_searches)}. Using default: {self.max_concurrent_searches}" + f"Job default 'max_concurrent_searches' must be an integer greater 0. Found: {max_concurrent_searches!s}. Using default: {self.max_concurrent_searches}", ) self.min_days_between_searches = job_defaults_config.get( "min_days_between_searches", @@ -84,6 +84,7 @@ class Jobs: def _set_job_defaults(self): self.remove_bad_files = JobParams(keep_archives=self.job_defaults.keep_archives) + self.remove_completed = JobParams() self.remove_failed_downloads = JobParams() self.remove_failed_imports = JobParams( message_patterns=self.job_defaults.message_patterns, @@ -109,7 +110,6 @@ class Jobs: ) self.detect_deletions = JobParams() - def _set_job_configs(self, config): # Populate jobs from YAML config for job_name in self.__dict__: diff --git a/tests/jobs/test_remove_completed.py b/tests/jobs/test_remove_completed.py new file mode 100644 index 0000000..32df3c0 --- /dev/null +++ b/tests/jobs/test_remove_completed.py @@ -0,0 +1,249 @@ +"""Tests for the remove_completed job.""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from src.jobs.remove_completed import COMPLETED_STATES, RemoveCompleted + + +def create_mock_settings(target_tags=None, target_categories=None): + """Create mock settings for testing.""" + settings = MagicMock() + settings.jobs = MagicMock() + settings.jobs.remove_completed.enabled = True + settings.jobs.remove_completed.target_tags = target_tags or [] + settings.jobs.remove_completed.target_categories = target_categories or [] + settings.general = MagicMock() + settings.general.protected_tag = "protected" + return settings + + +def create_mock_download_client(items: list): + """Create a mock download client.""" + client = MagicMock() + client.get_qbit_items = AsyncMock(return_value=items) + return client + + +# Default item properties for tests +ITEM_DEFAULTS = { + "progress": 1, + "ratio": 0, + "ratio_limit": -1, + "seeding_time": 0, + "seeding_time_limit": -1, + "tags": "", + "category": "movies", + "state": "stoppedUP", +} + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("item_properties", "target_tags", "target_categories", "should_be_removed"), + [ + # Ratio limit met, matching tag and category + ( + {"ratio": 2, "ratio_limit": 2, "tags": "tag1"}, + ["tag1"], + ["movies"], + True, + ), + # Seeding time limit met, matching tag and category + ( + {"seeding_time": 100, "seeding_time_limit": 100, "tags": "tag1"}, + ["tag1"], + ["movies"], + True, + ), + # Neither limit met + ({"ratio": 1, "ratio_limit": 2}, ["tag1"], ["movies"], False), + # Progress less than 1 (should not be considered completed) + ( + {"progress": 0.5, "state": "downloading"}, + ["tag1"], + ["movies"], + False, + ), + # No matching tags or categories + ( + {"ratio": 2, "ratio_limit": 2, "tags": "other", "category": "tv"}, + ["tag1"], + ["movies"], + False, + ), + # Matching category, but not completed + ({"category": "tv", "state": "downloading"}, [], ["tv"], False), + # Matching tag, but not completed + ({"tags": "tag2", "state": "downloading"}, ["tag2"], [], False), + # Matching category and completed (ratio) + ( + {"ratio": 2, "ratio_limit": 2, "category": "tv"}, + [], + ["tv"], + True, + ), + # Matching tag and completed (seeding time) + ( + {"seeding_time": 100, "seeding_time_limit": 100, "tags": "tag2"}, + ["tag2"], + [], + True, + ), + # No targets specified + ({"ratio": 2, "ratio_limit": 2}, [], [], False), + # Item with multiple tags, one is a target + ( + {"tags": "tag1,tag2", "ratio": 2, "ratio_limit": 2}, + ["tag2"], + [], + True, + ), + # Item with a tag that is a substring of a target tag (should not match) + ({"tags": "tag", "ratio": 2, "ratio_limit": 2}, ["tag1"], [], False), + # Item with a category that is a substring of a target (should not match) + ( + {"category": "movie", "ratio": 2, "ratio_limit": 2}, + [], + ["movies"], + False, + ), + # Test with another completed state + ( + {"ratio": 2, "ratio_limit": 2, "state": "pausedUP"}, + ["tag1"], + ["movies"], + True, + ), + ], +) +async def test_remove_completed_logic( + item_properties: dict, + target_tags: list, + target_categories: list, + should_be_removed: bool, +): + """Test the logic of the remove_completed job with various scenarios.""" + item = {**ITEM_DEFAULTS, **item_properties, "name": "test_item"} + + settings = create_mock_settings(target_tags, target_categories) + client = create_mock_download_client([item]) + + job = RemoveCompleted(client, "qbittorrent", settings, "remove_completed") + + items_to_remove = await job._get_items_to_remove(await client.get_qbit_items()) + + if should_be_removed: + assert len(items_to_remove) == 1 + assert items_to_remove[0]["name"] == "test_item" + else: + assert len(items_to_remove) == 0 + + +@pytest.mark.asyncio +async def test_remove_completed_skipped_for_sabnzbd(): + """Test that the remove_completed job is skipped for SABnzbd clients.""" + settings = create_mock_settings() + client = create_mock_download_client([]) + job = RemoveCompleted(client, "sabnzbd", settings, "remove_completed") + + # We check the log message instead of mocking the super run + with patch.object(job.logger, "debug") as mock_log: + result = await job.run() + assert result == 0 + mock_log.assert_called_with( + "Skipping job 'remove_completed' for Usenet client mock_client_name.", + ) + + +@pytest.mark.asyncio +async def test_remove_completed_test_run_enabled(): + """Test that no items are removed when test_run is enabled.""" + item = { + **ITEM_DEFAULTS, + "ratio": 2, + "ratio_limit": 2, + "name": "test_item", + "tags": "tag1", + } + settings = create_mock_settings(target_tags=["tag1"]) + settings.general.test_run = True + client = create_mock_download_client([item]) + job = RemoveCompleted(client, "qbittorrent", settings, "remove_completed") + + with patch.object( + job, + "_remove_qbittorrent_item", + new_callable=AsyncMock, + ) as mock_remove: + result = await job.run() + + assert ( + result == 1 + ) # The job should still report the number of items it would have removed + mock_remove.assert_not_called() + + +@pytest.mark.asyncio +@pytest.mark.parametrize("protected_on", ["tag", "category"]) +async def test_remove_completed_with_protected_item(protected_on): + """Test that items with a protected tag or category are not removed.""" + item_properties = {"ratio": 2, "ratio_limit": 2, "name": "protected_item"} + target_tags = ["tag1"] + target_categories = ["movies"] + + if protected_on == "tag": + item_properties["tags"] = "protected" + # Also add a targetable tag to ensure it's the protection that stops it + item_properties["tags"] += ",tag1" + else: + item_properties["category"] = "protected" + + item = {**ITEM_DEFAULTS, **item_properties} + + settings = create_mock_settings( + target_tags=target_tags, + target_categories=target_categories, + ) + client = create_mock_download_client([item]) + job = RemoveCompleted(client, "qbittorrent", settings, "remove_completed") + + with patch.object( + job, + "_remove_qbittorrent_item", + new_callable=AsyncMock, + ) as mock_remove: + result = await job.run() + assert result == 0 # No items should be removed + mock_remove.assert_not_called() + + +@pytest.mark.asyncio +async def test_is_completed_logic(): + """Test the internal _is_completed logic with different states and limits.""" + job = RemoveCompleted(MagicMock(), "qbittorrent", MagicMock(), "remove_completed") + + # Completed states + for state in COMPLETED_STATES: + # Ratio met + assert job._is_completed( + {"state": state, "ratio": 2, "ratio_limit": 2}, + ), f"Failed for state {state} with ratio met" + # Seeding time met + assert job._is_completed( + {"state": state, "seeding_time": 100, "seeding_time_limit": 100}, + ), f"Failed for state {state} with seeding time met" + # Neither met + assert not job._is_completed( + {"state": state, "ratio": 1, "ratio_limit": 2}, + ), f"Failed for state {state} with neither limit met" + # Limits not set + assert not job._is_completed( + {"state": state, "ratio": 1, "ratio_limit": -1}, + ), f"Failed for state {state} with no ratio limit" + + # Non-completed states + assert not job._is_completed( + {"state": "downloading", "ratio": 2, "ratio_limit": 1}, + ), "Failed for non-completed state"