feat: add remove_completed job and new download client job type

This commit is contained in:
Jakub Buzuk
2025-09-11 02:08:56 +02:00
parent b2cb1ebf86
commit b433e06338
7 changed files with 580 additions and 15 deletions

View File

@@ -42,6 +42,11 @@ jobs:
search_missing:
# min_days_between_searches: 7
# max_concurrent_searches: 3
remove_completed:
target_tags:
- "Obsolete"
# target_categories:
# - "autobrr"
instances:
sonarr:

31
main.py
View File

@@ -1,22 +1,26 @@
import asyncio
import signal
import types
import datetime
import signal
import sys
import types
from src.deletion_handler.deletion_handler import WatcherManager
from src.job_manager import JobManager
from src.settings.settings import Settings
from src.utils.log_setup import logger
from src.utils.startup import launch_steps
from src.deletion_handler.deletion_handler import WatcherManager
settings = Settings()
job_manager = JobManager(settings)
watch_manager = WatcherManager(settings)
def terminate(sigterm: signal.SIGTERM, frame: types.FrameType) -> None: # noqa: ARG001, pylint: disable=unused-argument
"""Terminate cleanly. Needed for respecting 'docker stop'.
def terminate(
sigterm: signal.SIGTERM,
frame: types.FrameType,
) -> None:
"""
Terminate cleanly. Needed for respecting 'docker stop'.
Args:
----
@@ -24,14 +28,18 @@ def terminate(sigterm: signal.SIGTERM, frame: types.FrameType) -> None: # noqa:
frame: The execution frame.
"""
logger.info(f"Termination signal received at {datetime.datetime.now()}.") # noqa: DTZ005
logger.info(
f"Termination signal received at {datetime.datetime.now()}.",
)
watch_manager.stop()
sys.exit(0)
async def wait_next_run():
# Calculate next run time dynamically (to display)
next_run = datetime.datetime.now() + datetime.timedelta(minutes=settings.general.timer)
# Calculate next run time dynamically (to display)
next_run = datetime.datetime.now() + datetime.timedelta(
minutes=settings.general.timer,
)
formatted_next_run = next_run.strftime("%Y-%m-%d %H:%M")
logger.verbose(f"*** Done - Next run at {formatted_next_run} ****")
@@ -39,6 +47,7 @@ async def wait_next_run():
# Wait for the next run
await asyncio.sleep(settings.general.timer * 60)
# Main function
async def main():
await launch_steps(settings)
@@ -58,9 +67,11 @@ async def main():
await job_manager.run_jobs(arr)
logger.verbose("")
# Run download client jobs (these run independently of *arr instances)
await job_manager.run_download_client_jobs()
# Wait for the next run
await wait_next_run()
return
if __name__ == "__main__":

View File

@@ -1,5 +1,6 @@
# Cleans the download queue
from src.jobs.remove_bad_files import RemoveBadFiles
from src.jobs.remove_completed import RemoveCompleted
from src.jobs.remove_failed_downloads import RemoveFailedDownloads
from src.jobs.remove_failed_imports import RemoveFailedImports
from src.jobs.remove_metadata_missing import RemoveMetadataMissing
@@ -25,6 +26,41 @@ class JobManager:
await self.removal_jobs()
await self.search_jobs()
async def run_download_client_jobs(self):
    """Run jobs that operate on download clients directly.

    Iterates every configured qBittorrent and SABnzbd client and runs each
    enabled download-client job against it.

    Returns:
    -------
        int: Total number of items detected across all clients. Returns 0
        (not None, for a consistent return type) when no download client
        is connected.

    """
    if not await self._download_clients_connected():
        # Fix: previously returned None here while the normal path returns
        # an int; always return a count so callers can rely on the type.
        return 0

    items_detected = 0
    for download_client_type in ["qbittorrent", "sabnzbd"]:
        # settings.download_clients.<type> holds the configured clients of
        # that type; default to an empty list when the type is absent.
        download_clients = getattr(
            self.settings.download_clients,
            download_client_type,
            [],
        )
        for client in download_clients:
            logger.info(
                f"*** Running jobs on {client.name} ({client.base_url}) ***",
            )
            # Get jobs for this client
            download_client_jobs = self._get_download_client_jobs_for_client(
                client,
                download_client_type,
            )
            if not any(job.job.enabled for job in download_client_jobs):
                logger.verbose(
                    "Download Client Jobs: None triggered (No jobs active)",
                )
                continue
            for download_client_job in download_client_jobs:
                items_detected += await download_client_job.run()
    return items_detected
async def removal_jobs(self):
# Check removal jobs
removal_jobs = self._get_removal_jobs()
@@ -72,7 +108,7 @@ class JobManager:
async def _queue_has_items(self):
logger.debug(
f"job_manager.py/_queue_has_items (Before any removal jobs): Checking if any items in full queue"
"job_manager.py/_queue_has_items (Before any removal jobs): Checking if any items in full queue",
)
queue_manager = QueueManager(self.arr, self.settings)
full_queue = await queue_manager.get_queue_items("full")
@@ -99,10 +135,12 @@ class JobManager:
async def _check_client_connection_status(self, clients):
for client in clients:
logger.debug(
f"job_manager.py/_check_client_connection_status: Checking if {client.name} is connected"
f"job_manager.py/_check_client_connection_status: Checking if {client.name} is connected",
)
if not await client.check_connected():
logger.warning(f">>> {client.name} is disconnected. Skipping queue cleaning on {self.arr.name}.")
logger.warning(
f">>> {client.name} is disconnected. Skipping queue cleaning on {self.arr.name}.",
)
return False
return True
@@ -131,3 +169,26 @@ class JobManager:
removal_job_class(self.arr, self.settings, removal_job_name),
)
return jobs
def _get_download_client_jobs_for_client(self, client, client_type):
    """
    Build the download-client job instances that apply to one client.

    A job is instantiated only when a truthy attribute with the job's name
    exists on ``settings.jobs``.
    """
    # Mapping of job name -> implementing class for client-level jobs.
    job_classes = {
        "remove_completed": RemoveCompleted,
    }
    return [
        job_cls(client, client_type, self.settings, name)
        for name, job_cls in job_classes.items()
        if getattr(self.settings.jobs, name, False)
    ]

View File

@@ -0,0 +1,152 @@
from abc import ABC, abstractmethod
from src.utils.common import make_request
from src.utils.log_setup import logger
class DownloadClientRemovalJob(ABC):
    """Base class for removal jobs that run on download clients directly.

    Subclasses implement ``_get_items_to_remove`` to select which of the
    client's items should be deleted; this base class handles fetching,
    protection filtering and the actual removal calls.
    """

    # Set per instance in __init__; class-level default for introspection.
    job_name = None

    def __init__(
        self,
        download_client: object,
        download_client_type: str,
        settings: object,
        job_name: str,
    ) -> None:
        self.download_client = download_client
        # Expected to be "qbittorrent" or "sabnzbd" (see _get_all_items).
        self.download_client_type = download_client_type
        self.settings = settings
        self.job_name = job_name
        # Per-job configuration object (enabled flag, job-specific options).
        self.job = getattr(self.settings.jobs, self.job_name)

    async def run(self) -> int:
        """Run the download client job.

        Returns:
        -------
            int: Number of items removed (or that would have been removed
            in a test run). 0 when the job is disabled or nothing matched.

        """
        if not self.job.enabled:
            return 0
        logger.debug(
            f"download_client_job.py/run: Launching job '{self.job_name}' on {self.download_client.name} "
            f"({self.download_client_type})",
        )
        all_items = await self._get_all_items()
        if not all_items:
            return 0
        items_to_remove = await self._get_items_to_remove(all_items)
        # Filter out protected items
        items_to_remove = self._filter_protected_items(items_to_remove)
        if not items_to_remove:
            logger.debug(f"No items to remove for job '{self.job_name}'.")
            return 0
        # Remove the affected items
        await self._remove_items(items_to_remove)
        return len(items_to_remove)

    async def _get_all_items(self) -> list:
        """Get all items from the download client.

        Returns an empty list on fetch errors and for unsupported client
        types, so callers can always iterate the result safely.
        """
        try:
            if self.download_client_type == "qbittorrent":
                return await self.download_client.get_qbit_items()
            if self.download_client_type == "sabnzbd":
                return await self.download_client.get_history_items()
        except Exception as e:
            logger.error(
                f"Error fetching items from {self.download_client.name}: {e}",
            )
            return []
        # Explicitly return a list for unknown client types as well, so the
        # declared ``-> list`` contract holds on every path (never None).
        return []

    def _filter_protected_items(self, items: list) -> list:
        """Filter out items that are protected by tags or categories.

        An item is protected when ``general.protected_tag`` matches one of
        its tags (qBittorrent only) or its category.
        """
        protected_tag = getattr(self.settings.general, "protected_tag", None)
        if not protected_tag:
            return items
        filtered_items = []
        for item in items:
            is_protected = False
            item_name = item.get("name", "unknown")
            if self.download_client_type == "qbittorrent":
                # qBittorrent reports tags as one comma-separated string.
                tags = item.get("tags", "").split(",")
                tags = [tag.strip() for tag in tags if tag.strip()]
                category = item.get("category", "")
                if protected_tag in tags or protected_tag == category:
                    is_protected = True
            elif self.download_client_type == "sabnzbd":
                # SABnzbd has no tags; only the category can protect an item.
                category = item.get("category", "")
                if protected_tag == category:
                    is_protected = True
            if is_protected:
                logger.debug(f"Ignoring protected item: {item_name}")
            else:
                filtered_items.append(item)
        return filtered_items

    @abstractmethod
    async def _get_items_to_remove(self, items: list) -> list:
        """Return a list of items to remove from the download client."""

    async def _remove_items(self, items: list) -> None:
        """Remove the affected items from the download client.

        Honors ``general.test_run``: when enabled, only logs what would
        have been removed without touching the client.
        """
        if self.settings.general.test_run:
            logger.info("Test run is enabled. Skipping actual removal.")
            for item in items:
                item_name = item.get("name", "unknown")
                logger.info(f"Would have removed download: {item_name}")
            return
        for item in items:
            item_name = item.get("name", "unknown")
            try:
                if self.download_client_type == "qbittorrent":
                    await self._remove_qbittorrent_item(item)
                elif self.download_client_type == "sabnzbd":
                    await self._remove_sabnzbd_item(item)
                logger.info(
                    f"Removed download: {item_name}",
                )
            except Exception as e:
                # Keep going: one failed deletion must not abort the batch.
                logger.error(f"Failed to remove {item_name}: {e}")

    async def _remove_qbittorrent_item(self, item: dict) -> None:
        """Remove a torrent (and its files) from qBittorrent."""
        download_id = item["hash"].lower()
        data = {
            "hashes": download_id,
            "deleteFiles": "true",
        }
        await make_request(
            "post",
            f"{self.download_client.api_url}/torrents/delete",
            self.settings,
            data=data,
            cookies=self.download_client.cookie,
        )

    async def _remove_sabnzbd_item(self, item: dict) -> None:
        """Remove a download from SABnzbd history."""
        download_id = item["nzo_id"]
        params = {
            "mode": "history",
            "name": "delete",
            "value": download_id,
            "apikey": self.download_client.api_key,
            "output": "json",
        }
        await make_request(
            "get",
            self.download_client.api_url,
            self.settings,
            params=params,
        )

View File

@@ -0,0 +1,87 @@
"""Removes completed torrents that have specific tags/categories."""
from src.jobs.download_client_removal_job import DownloadClientRemovalJob
from src.utils.log_setup import logger
COMPLETED_STATES = [
    "stoppedUP",
    "pausedUP",  # Older qBittorrent versions
]


class RemoveCompleted(DownloadClientRemovalJob):
    """Job to remove completed torrents that match specific tags or categories."""

    async def run(self) -> int:
        """Run the job; Usenet (SABnzbd) clients are skipped entirely."""
        if self.download_client_type == "sabnzbd":
            logger.debug(
                f"Skipping job '{self.job_name}' for Usenet client {self.download_client.name}.",
            )
            return 0
        return await super().run()

    async def _get_items_to_remove(self, items: list) -> list:
        """
        Select the completed items that match the configured target tags
        or categories for removal.
        """
        target_tags, target_categories = self._get_targets()
        if not (target_tags or target_categories):
            logger.debug(
                "No target tags or categories specified for remove_completed job.",
            )
            return []
        items_to_remove = []
        for candidate in items:
            if not self._is_completed(candidate):
                continue
            if not self._meets_target_criteria(
                candidate,
                target_tags,
                target_categories,
            ):
                continue
            logger.debug(
                f"Found completed item to remove: {candidate.get('name', 'unknown')}",
            )
            items_to_remove.append(candidate)
        return items_to_remove

    def _is_completed(self, item: dict) -> bool:
        """Check if an item has met its seeding goals."""
        if item.get("state", "") not in COMPLETED_STATES:
            return False
        # Additional sanity checks for ratio and seeding time; a limit must
        # be positive to count as "set" (chained comparison: met AND set).
        ratio_met = item.get("ratio", 0) >= item.get("ratio_limit", -1) > 0
        seeding_met = (
            item.get("seeding_time", 0) >= item.get("seeding_time_limit", -1) > 0
        )
        return ratio_met or seeding_met

    def _meets_target_criteria(
        self,
        item: dict,
        target_tags: list,
        target_categories: list,
    ) -> bool:
        """Check if an item has the required tags or categories for removal."""
        if item.get("category", "") in target_categories:
            return True
        raw_tags = item.get("tags", "").split(",")
        cleaned_tags = {tag.strip() for tag in raw_tags if tag.strip()}
        # Exact-tag overlap, never substring matching.
        return not cleaned_tags.isdisjoint(target_tags)

    def _get_targets(self) -> tuple[list, list]:
        """Get the list of tags and categories to look for from job settings."""
        return (
            getattr(self.job, "target_tags", []),
            getattr(self.job, "target_categories", []),
        )

View File

@@ -64,7 +64,7 @@ class JobDefaults:
self.max_concurrent_searches = max_concurrent_searches
else:
logger.warning(
f"Job default 'max_concurrent_searches' must be an integer greater 0. Found: {str(max_concurrent_searches)}. Using default: {self.max_concurrent_searches}"
f"Job default 'max_concurrent_searches' must be an integer greater 0. Found: {max_concurrent_searches!s}. Using default: {self.max_concurrent_searches}",
)
self.min_days_between_searches = job_defaults_config.get(
"min_days_between_searches",
@@ -84,6 +84,7 @@ class Jobs:
def _set_job_defaults(self):
self.remove_bad_files = JobParams(keep_archives=self.job_defaults.keep_archives)
self.remove_completed = JobParams()
self.remove_failed_downloads = JobParams()
self.remove_failed_imports = JobParams(
message_patterns=self.job_defaults.message_patterns,
@@ -109,7 +110,6 @@ class Jobs:
)
self.detect_deletions = JobParams()
def _set_job_configs(self, config):
# Populate jobs from YAML config
for job_name in self.__dict__:

View File

@@ -0,0 +1,249 @@
"""Tests for the remove_completed job."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.jobs.remove_completed import COMPLETED_STATES, RemoveCompleted
def create_mock_settings(target_tags=None, target_categories=None):
    """Create mock settings for testing."""
    mock_settings = MagicMock()
    mock_settings.jobs = MagicMock()
    # Configure the remove_completed job the way the code under test reads it.
    job_cfg = mock_settings.jobs.remove_completed
    job_cfg.enabled = True
    job_cfg.target_tags = target_tags or []
    job_cfg.target_categories = target_categories or []
    mock_settings.general = MagicMock()
    mock_settings.general.protected_tag = "protected"
    return mock_settings
def create_mock_download_client(items: list):
    """Create a mock download client."""
    # get_qbit_items must be awaitable, hence AsyncMock.
    fetch = AsyncMock(return_value=items)
    mock_client = MagicMock()
    mock_client.get_qbit_items = fetch
    return mock_client
# Default item properties for tests.
# Shape mirrors a qBittorrent torrent entry as consumed by RemoveCompleted.
ITEM_DEFAULTS = {
    "progress": 1,
    "ratio": 0,
    "ratio_limit": -1,  # non-positive limits are treated as "not set" by _is_completed
    "seeding_time": 0,
    "seeding_time_limit": -1,  # same convention as ratio_limit
    "tags": "",  # comma-separated string, as qBittorrent reports tags
    "category": "movies",
    "state": "stoppedUP",  # a COMPLETED_STATES member, so items default to "completed"
}
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("item_properties", "target_tags", "target_categories", "should_be_removed"),
    [
        # Ratio limit met, matching tag and category
        (
            {"ratio": 2, "ratio_limit": 2, "tags": "tag1"},
            ["tag1"],
            ["movies"],
            True,
        ),
        # Seeding time limit met, matching tag and category
        (
            {"seeding_time": 100, "seeding_time_limit": 100, "tags": "tag1"},
            ["tag1"],
            ["movies"],
            True,
        ),
        # Neither limit met
        ({"ratio": 1, "ratio_limit": 2}, ["tag1"], ["movies"], False),
        # Progress less than 1 (should not be considered completed)
        (
            {"progress": 0.5, "state": "downloading"},
            ["tag1"],
            ["movies"],
            False,
        ),
        # No matching tags or categories
        (
            {"ratio": 2, "ratio_limit": 2, "tags": "other", "category": "tv"},
            ["tag1"],
            ["movies"],
            False,
        ),
        # Matching category, but not completed
        ({"category": "tv", "state": "downloading"}, [], ["tv"], False),
        # Matching tag, but not completed
        ({"tags": "tag2", "state": "downloading"}, ["tag2"], [], False),
        # Matching category and completed (ratio)
        (
            {"ratio": 2, "ratio_limit": 2, "category": "tv"},
            [],
            ["tv"],
            True,
        ),
        # Matching tag and completed (seeding time)
        (
            {"seeding_time": 100, "seeding_time_limit": 100, "tags": "tag2"},
            ["tag2"],
            [],
            True,
        ),
        # No targets specified
        ({"ratio": 2, "ratio_limit": 2}, [], [], False),
        # Item with multiple tags, one is a target
        (
            {"tags": "tag1,tag2", "ratio": 2, "ratio_limit": 2},
            ["tag2"],
            [],
            True,
        ),
        # Item with a tag that is a substring of a target tag (should not match)
        ({"tags": "tag", "ratio": 2, "ratio_limit": 2}, ["tag1"], [], False),
        # Item with a category that is a substring of a target (should not match)
        (
            {"category": "movie", "ratio": 2, "ratio_limit": 2},
            [],
            ["movies"],
            False,
        ),
        # Test with another completed state
        (
            {"ratio": 2, "ratio_limit": 2, "state": "pausedUP"},
            ["tag1"],
            ["movies"],
            True,
        ),
    ],
)
async def test_remove_completed_logic(
    item_properties: dict,
    target_tags: list,
    target_categories: list,
    should_be_removed: bool,
):
    """Test the logic of the remove_completed job with various scenarios.

    Each case overlays ``item_properties`` on ITEM_DEFAULTS and checks
    whether ``_get_items_to_remove`` selects the single resulting item.
    """
    item = {**ITEM_DEFAULTS, **item_properties, "name": "test_item"}
    settings = create_mock_settings(target_tags, target_categories)
    client = create_mock_download_client([item])
    job = RemoveCompleted(client, "qbittorrent", settings, "remove_completed")
    # Call the selection helper directly; removal itself is covered elsewhere.
    items_to_remove = await job._get_items_to_remove(await client.get_qbit_items())
    if should_be_removed:
        assert len(items_to_remove) == 1
        assert items_to_remove[0]["name"] == "test_item"
    else:
        assert len(items_to_remove) == 0
@pytest.mark.asyncio
async def test_remove_completed_skipped_for_sabnzbd():
    """Test that the remove_completed job is skipped for SABnzbd clients."""
    settings = create_mock_settings()
    client = create_mock_download_client([])
    # Give the mock an explicit name: a MagicMock's auto-generated ``name``
    # attribute would never equal the string asserted in the log message.
    client.name = "mock_client_name"
    job = RemoveCompleted(client, "sabnzbd", settings, "remove_completed")
    # Patch the module-level logger used by remove_completed: the job
    # instance itself has no ``logger`` attribute, so the previous
    # ``patch.object(job.logger, "debug")`` raised AttributeError.
    with patch("src.jobs.remove_completed.logger") as mock_log:
        result = await job.run()
    assert result == 0
    mock_log.debug.assert_called_with(
        "Skipping job 'remove_completed' for Usenet client mock_client_name.",
    )
@pytest.mark.asyncio
async def test_remove_completed_test_run_enabled():
    """Test that no items are removed when test_run is enabled."""
    completed_item = {
        **ITEM_DEFAULTS,
        "name": "test_item",
        "tags": "tag1",
        "ratio": 2,
        "ratio_limit": 2,
    }
    settings = create_mock_settings(target_tags=["tag1"])
    settings.general.test_run = True
    client = create_mock_download_client([completed_item])
    job = RemoveCompleted(client, "qbittorrent", settings, "remove_completed")
    with patch.object(
        job,
        "_remove_qbittorrent_item",
        new_callable=AsyncMock,
    ) as mock_remove:
        result = await job.run()
    # The job still reports how many items it *would* have removed ...
    assert result == 1
    # ... but must never call the actual removal endpoint.
    mock_remove.assert_not_called()
@pytest.mark.asyncio
@pytest.mark.parametrize("protected_on", ["tag", "category"])
async def test_remove_completed_with_protected_item(protected_on):
    """Test that items with a protected tag or category are not removed."""
    item_properties = {"ratio": 2, "ratio_limit": 2, "name": "protected_item"}
    if protected_on == "tag":
        # Include a targetable tag too, so protection is what stops removal.
        item_properties["tags"] = "protected,tag1"
    else:
        item_properties["category"] = "protected"
    settings = create_mock_settings(
        target_tags=["tag1"],
        target_categories=["movies"],
    )
    client = create_mock_download_client([{**ITEM_DEFAULTS, **item_properties}])
    job = RemoveCompleted(client, "qbittorrent", settings, "remove_completed")
    with patch.object(
        job,
        "_remove_qbittorrent_item",
        new_callable=AsyncMock,
    ) as mock_remove:
        result = await job.run()
    # No items should be removed
    assert result == 0
    mock_remove.assert_not_called()
@pytest.mark.asyncio
async def test_is_completed_logic():
    """Test the internal _is_completed logic with different states and limits."""
    # _is_completed only reads the item dict, so MagicMock stand-ins suffice
    # for the client and settings arguments here.
    job = RemoveCompleted(MagicMock(), "qbittorrent", MagicMock(), "remove_completed")
    # Completed states
    for state in COMPLETED_STATES:
        # Ratio met
        assert job._is_completed(
            {"state": state, "ratio": 2, "ratio_limit": 2},
        ), f"Failed for state {state} with ratio met"
        # Seeding time met
        assert job._is_completed(
            {"state": state, "seeding_time": 100, "seeding_time_limit": 100},
        ), f"Failed for state {state} with seeding time met"
        # Neither met
        assert not job._is_completed(
            {"state": state, "ratio": 1, "ratio_limit": 2},
        ), f"Failed for state {state} with neither limit met"
        # Limits not set
        assert not job._is_completed(
            {"state": state, "ratio": 1, "ratio_limit": -1},
        ), f"Failed for state {state} with no ratio limit"
    # Non-completed states
    assert not job._is_completed(
        {"state": "downloading", "ratio": 2, "ratio_limit": 1},
    ), "Failed for non-completed state"