diff --git a/docker/requirements.txt b/docker/requirements.txt index 823cd6b..05a4555 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -11,4 +11,5 @@ autoflake==2.3.1 isort==5.13.2 envyaml==1.10.211231 demjson3==3.0.6 -ruff==0.11.11 \ No newline at end of file +ruff==0.11.11 +watchdog==6.0.0 \ No newline at end of file diff --git a/main.py b/main.py index b5dd868..5f64dd5 100644 --- a/main.py +++ b/main.py @@ -8,9 +8,11 @@ from src.job_manager import JobManager from src.settings.settings import Settings from src.utils.log_setup import logger from src.utils.startup import launch_steps +from src.deletion_handler.deletion_handler import WatcherManager settings = Settings() job_manager = JobManager(settings) +watch_manager = WatcherManager(settings) def terminate(sigterm: signal.SIGTERM, frame: types.FrameType) -> None: # noqa: ARG001, pylint: disable=unused-argument @@ -24,6 +26,7 @@ def terminate(sigterm: signal.SIGTERM, frame: types.FrameType) -> None: # noqa: """ logger.info(f"Termination signal received at {datetime.datetime.now()}.") # noqa: DTZ005 + watch_manager.stop() sys.exit(0) async def wait_next_run(): @@ -40,6 +43,8 @@ async def wait_next_run(): async def main(): await launch_steps(settings) + if settings.jobs.detect_deletions: + await watch_manager.setup() # Start Cleaning while True: logger.info("-" * 50) diff --git a/src/deletion_handler/__init__.py b/src/deletion_handler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/deletion_handler/deletion_handler.py b/src/deletion_handler/deletion_handler.py new file mode 100644 index 0000000..83d70cd --- /dev/null +++ b/src/deletion_handler/deletion_handler.py @@ -0,0 +1,143 @@ +import asyncio +from pathlib import Path +from collections import defaultdict + +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +from src.utils.log_setup import logger + + +class DeletionHandler(FileSystemEventHandler): + def 
__init__(self, arr, loop): + super().__init__() + self.arr = arr + self.deleted_files = set() # store deleted file paths + self._process_task = None + self._lock = asyncio.Lock() + self.delay = 5 # Group deletes into 5 second batches + self.loop = loop + + def on_deleted(self, event): + if event.is_directory: + return + + deleted_file = event.src_path + asyncio.run_coroutine_threadsafe(self._queue_delete(deleted_file), self.loop) + + + async def _queue_delete(self, deleted_file): + async with self._lock: + self.deleted_files.add(deleted_file) + if self._process_task is None or self._process_task.done(): + # Schedule batch processing 5 seconds from now + self._process_task = asyncio.create_task( + self._process_deletes_after_delay() + ) + + async def _process_deletes_after_delay(self): + """Retrieve all files that were deleted, wait a few seconds, and then handle the parent folders""" + await asyncio.sleep(self.delay) + async with self._lock: + # Copy and clear the deleted files set + files_to_process = self.deleted_files.copy() + logger.debug(f"deletion_handler.py/_process_deletes_after_delay: Deleted files: {' '.join(files_to_process)}") + for handler in logger.handlers: + handler.flush() + self.deleted_files.clear() + + # Extract parent folder paths, deduplicate them + deletions = self._group_deletions_by_folder(files_to_process) + logger.debug(f"deletion_handler.py/_process_deletes_after_delay: Folders with deletes: {' '.join(deletions.keys())}") + + await self._handle_folders(deletions) + + @staticmethod + def _group_deletions_by_folder(files_to_process): + deletions = defaultdict(list) + for f in files_to_process: + deletions[str(Path(f).parent)].append(Path(f).name) + return dict(deletions) + + async def _handle_folders(self, deletions): + """Async handle folder paths: lookup item IDs and log results.""" + for folder_path, files in deletions.items(): + refresh_item = await self.arr.get_refresh_item_by_path(folder_path) + if refresh_item: + logger.info( + f"Job 
'detect_deletions' triggered media refresh on {self.arr.name} ({self.arr.base_url}): {refresh_item['title']}" + ) + await self.arr.refresh_item(refresh_item['id']) + else: + logger.verbose( + f"Job 'detect_deletions' detected a deleted file, but couldn't find a corresponding media item on {self.arr.name} ({self.arr.base_url})" + ) + logger.verbose(f"Deleted Files:") + for file in files: + logger.verbose(f"- {Path(folder_path) / file}") + + async def await_completion(self): + # For pytests to know when background task has finsished + if self._process_task: + await self._process_task + +class WatcherManager: + # Checks which folders are set up on arr and sets a watcher on them for deletes + def __init__(self, settings): + self.settings = settings + self.observers = [] + self.handlers = [] + self.loop = None + + async def setup(self): + self.loop = asyncio.get_running_loop() + folders_to_watch = await self.get_folders_to_watch() + for arr, folder_path in folders_to_watch: + self.set_watcher(arr, folder_path) + + async def get_folders_to_watch(self): + """Gets from all arrs the root folders and lists those that are accessible for the arr, and have present for decluttarr.""" + folders_to_watch = [] + logger.verbose("") + logger.verbose("*** Setting up monitoring for deletions ***") + for arr in self.settings.instances: + if arr.arr_type not in ( + "sonarr", + "radarr", + ): # only working for sonarr / radarr for now + continue + root_folders = await arr.get_root_folders() + + for folder in root_folders: + if folder.get("accessible") and "path" in folder: + path = Path(folder["path"]) + if path.exists(): + folders_to_watch.append((arr, folder["path"])) + else: + logger.warning( + f"Job 'detect_deletions' on {arr.name} ({arr.base_url}) does not have access to this path and will not monitor it: '{path}'" + ) + logger.info( + '>>> 💡 Tip: Make sure that the paths in decluttarr and in your arr instance are identical.' 
+ ) + if self.settings.envs.in_docker: + logger.info( + '>>> 💡 Tip: Make sure decluttarr and your arr instance have the same mount points' + ) + + return folders_to_watch + + def set_watcher(self, arr, folder_to_watch): + """Adds a file deletion watcher for the specified folder and arr instance, creating an event handler to process deletion events and an observer to monitor the filesystem; starts the observer and stores both the handler and observer for later management + """ + event_handler = DeletionHandler(arr, self.loop) + observer = Observer() + observer.schedule(event_handler, folder_to_watch, recursive=True) + observer.start() + self.handlers.append(event_handler) + logger.verbose(f"Job 'detect_deletions' started monitoring folder on {arr.name} ({arr.base_url}): {folder_to_watch}") + self.observers.append(observer) + + def stop(self): + for observer in self.observers: + observer.stop() + observer.join() diff --git a/src/jobs/search_handler.py b/src/jobs/search_handler.py index b16be97..a4bf75d 100644 --- a/src/jobs/search_handler.py +++ b/src/jobs/search_handler.py @@ -135,7 +135,3 @@ class SearchHandler: season = item.get("seasonNumber", "00") season_numbering = f"S{int(season):02}/E{int(episode):02}" logger.verbose(f"- {series_title} ({season_numbering})") - - async def _get_series_dict(self): - series = await self.arr.rest_get("series") - return {s["id"]: s for s in series} diff --git a/src/settings/_constants.py b/src/settings/_constants.py index 6e38b89..d9c59f0 100644 --- a/src/settings/_constants.py +++ b/src/settings/_constants.py @@ -60,3 +60,13 @@ class DetailItemSearchCommand: lidarr = "AlbumSearch" readarr = "BookSearch" whisparr = None + + +class RefreshItemKey: + radarr = "movie" + sonarr = "series" + + +class RefreshItemCommand: + radarr = "RefreshMovie" + sonarr = "RefreshSeries" diff --git a/src/settings/_instances.py b/src/settings/_instances.py index 28d7d83..6af7dcd 100644 --- a/src/settings/_instances.py +++ b/src/settings/_instances.py 
@@ -1,3 +1,4 @@ +from pathlib import Path import requests from packaging import version @@ -8,6 +9,8 @@ from src.settings._constants import ( DetailItemSearchCommand, FullQueueParameter, MinVersions, + RefreshItemKey, + RefreshItemCommand, ) from src.utils.common import make_request, wait_and_exit, extract_json_from_response from src.utils.log_setup import logger @@ -50,7 +53,6 @@ class ArrError(Exception): pass - class ArrInstances(list): """Represents all Arr clients (Sonarr, Radarr, etc.).""" @@ -152,6 +154,9 @@ class ArrInstance: self.detail_item_id_key = self.detail_item_key + "Id" self.detail_item_ids_key = self.detail_item_key + "Ids" self.detail_item_search_command = getattr(DetailItemSearchCommand, arr_type) + self.refresh_item_key = getattr(RefreshItemKey, arr_type, None) + self.refresh_item_id_key = self.refresh_item_key + "Id" if self.refresh_item_key else None + self.refresh_item_command = getattr(RefreshItemCommand, arr_type, None) async def _check_ui_language(self): """Check if the UI language is set to English.""" @@ -296,7 +301,6 @@ class ArrInstance: logger.info(tip) return - async def remove_queue_item(self, queue_id, *, blocklist=False): """ Remove a specific queue item from the queue by its queue id. 
@@ -345,3 +349,39 @@ class ArrInstance: headers = {"X-Api-Key": self.api_key} response = await make_request("get", endpoint, self.settings, headers=headers) return response.json() + + async def get_root_folders(self): + """Fetch Root folders.""" + endpoint = self.api_url + "/rootFolder" + headers = {"X-Api-Key": self.api_key} + response = await make_request("get", endpoint, self.settings, headers=headers) + return response.json() + + async def get_refresh_item(self): + endpoint = self.api_url + "/" + self.refresh_item_key + headers = {"X-Api-Key": self.api_key} + response = await make_request("get", endpoint, self.settings, headers=headers) + return response.json() + + async def get_refresh_item_by_path(self, folder_path): + """Returns the movie/series item that matches a given folder""" + items = await self.get_refresh_item() + for item in items: + if item.get("path") and Path(folder_path).is_relative_to(Path(item["path"])): + return item + return None + + async def refresh_item(self, refresh_item_id): + # Trigger a media item refresh by making the POST request using an external make_request function + logger.debug("_instances.py/_refresh_item: Refreshing Item") + await make_request( + method="POST", + endpoint=f"{self.api_url}/command", + settings=self.settings, + json={ + "name": self.refresh_item_command, + self.refresh_item_id_key: refresh_item_id, + }, + headers={"X-Api-Key": self.api_key}, + ) + return diff --git a/src/settings/_jobs.py b/src/settings/_jobs.py index 3cd2076..1e17bb8 100644 --- a/src/settings/_jobs.py +++ b/src/settings/_jobs.py @@ -107,6 +107,8 @@ class Jobs: max_concurrent_searches=self.job_defaults.max_concurrent_searches, min_days_between_searches=self.job_defaults.min_days_between_searches, ) + self.detect_deletions = JobParams() + def _set_job_configs(self, config): # Populate jobs from YAML config diff --git a/src/settings/_user_config.py b/src/settings/_user_config.py index 19b43ba..3400de4 100644 --- a/src/settings/_user_config.py +++ 
b/src/settings/_user_config.py @@ -15,6 +15,7 @@ CONFIG_MAPPING = { "PUBLIC_TRACKER_HANDLING", "OBSOLETE_TAG", "PROTECTED_TAG", + "DETECT_DELETIONS", ], "job_defaults": [ "MAX_STRIKES", diff --git a/tests/deletion_handler/test_deletion_handler.py b/tests/deletion_handler/test_deletion_handler.py new file mode 100644 index 0000000..bbbd6bf --- /dev/null +++ b/tests/deletion_handler/test_deletion_handler.py @@ -0,0 +1,212 @@ +# pylint: disable=W0212 +import asyncio +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch +import pytest + +from src.deletion_handler.deletion_handler import WatcherManager, DeletionHandler + + +@pytest.mark.asyncio +async def test_get_folders_to_watch(caplog): + arr_mock = MagicMock() + arr_mock.name = "Sonarr" + arr_mock.base_url = "http://sonarr:8989" + arr_mock.arr_type = "sonarr" + + arr_mock.get_root_folders = AsyncMock( + return_value=[ + {"accessible": True, "path": "/valid/path"}, + {"accessible": True, "path": "/missing/path"}, + {"accessible": False, "path": "/ignored/path"}, + {"path": "/no_access_field"}, + {"accessible": True}, # Missing "path" + ] + ) + + settings = MagicMock() + settings.instances = [arr_mock] + watcher_manager = WatcherManager(settings) + + # Patch Path.exists to simulate filesystem behavior + def fake_exists(self): + return str(self) == "/valid/path" + + with patch("pathlib.Path.exists", new=fake_exists): + with caplog.at_level("WARNING"): + folders = await watcher_manager.get_folders_to_watch() + + assert folders == [(arr_mock, "/valid/path")] + + assert any( + " does not have access to this path" in record.message + and "/missing/path" in record.message + for record in caplog.records + ) + + +class FakeEvent: + def __init__(self, src_path, is_directory=False): + self.src_path = src_path + self.is_directory = is_directory + + +@pytest.mark.asyncio +async def test_deletion_handler_batch_processing(): + """This test verifies that DeletionHandler batches multiple file deletions, 
processes their parent folder once after a delay, and correctly calls the arr API with the expected folder path.""" + arr_mock = AsyncMock() + arr_mock.name = "Sonarr" + arr_mock.arr_type = "sonarr" + arr_mock.get_refresh_item_by_path = AsyncMock( + side_effect=lambda path: {"id": f"id_for_{path}", "title": "abc"} + ) + arr_mock.refresh_item = AsyncMock() + loop = asyncio.get_running_loop() + handler = DeletionHandler(arr_mock, loop) + handler.delay = 0 # immediate execution for tests + + # Trigger deletions + handler.on_deleted(FakeEvent("/folder/file1.txt")) + handler.on_deleted(FakeEvent("/folder/file2.txt")) + # Let the event loop process scheduled task + await asyncio.sleep(0.01) + # Await batch completion + await handler.await_completion() + + # Validate the call + expected_calls = {"/folder"} + actual_calls = { + call.args[0] for call in arr_mock.get_refresh_item_by_path.call_args_list + } + assert actual_calls == expected_calls + + arr_mock.refresh_item.assert_called_once_with("id_for_/folder") + + +def test_group_deletions_by_folder(): + """Check that files are grouped by their parent folder correctly""" + files = { + "/tmp/folder1/file1.txt", + "/tmp/folder1/file2.txt", + "/tmp/folder2/file3.txt", + } + expected = { + str(Path("/tmp/folder1")): ["file1.txt", "file2.txt"], + str(Path("/tmp/folder2")): ["file3.txt"], + } + deletions = DeletionHandler._group_deletions_by_folder(files) + + # Since the value lists could be in any order due to set input, compare after sorting + for folder, files in expected.items(): + assert sorted(deletions.get(folder, [])) == sorted(files) + + # Also check no extra keys + assert set(deletions.keys()) == set(expected.keys()) + +@pytest.mark.asyncio +async def test_process_deletes_after_delay_clears_deleted_files(monkeypatch): + """Tests that _process_deletes_after_delay clears deleted files and correctly processes their parent folders asynchronously.""" + + class DummyArr: + def __init__(self): + self.called = [] + self.name = 
"DummyArr" # add this attribute + + async def get_refresh_item_id_by_path(self, path): + self.called.append(path) + return "id" + + arr = DummyArr() + loop = asyncio.get_running_loop() + handler = DeletionHandler(arr, loop) + handler.delay = 0 # no delay for test + + handler.deleted_files = { + "/tmp/folder1/file1.txt", + "/tmp/folder2/file2.txt", + } + + async def no_sleep(_): + return + + monkeypatch.setattr(asyncio, "sleep", no_sleep) + + # Patch _handle_folders to actually call dummy arr method and record calls + async def fake_handle_folders(folders): + for folder_path in folders: + await arr.get_refresh_item_id_by_path(folder_path) + + handler._handle_folders = fake_handle_folders + + await handler._process_deletes_after_delay() + + assert not handler.deleted_files + + expected_folders = { + str(Path(f).parent) + for f in ["/tmp/folder1/file1.txt", "/tmp/folder2/file2.txt"] + } + assert set(arr.called) == expected_folders + + + + +@pytest.mark.asyncio +async def test_file_deletion_triggers_handler_with_watchermanager(tmp_path): + """Tests that when a file is deleted in a watched directory, + the WatcherManager’s DeletionHandler receives the event and + calls the appropriate methods on the arr instance with the correct folder path.""" + + folder_to_watch = tmp_path / "watched" + folder_to_watch.mkdir() + + class TestArr: + def __init__(self): + self.name = "Test" + self.arr_type = "sonarr" + self.base_url = "http://localhost" + self.called_paths = [] + self.refreshed_ids = [] + + async def get_root_folders(self): + return [{"accessible": True, "path": str(folder_to_watch)}] + + async def get_refresh_item_by_path(self, path): + self.called_paths.append(path) + # Return a dict with a 'title' key (and any other keys needed) + return {"id": f"id_for_{path}", "title": f"Title for {path}"} + + async def refresh_item(self, item_id): + self.refreshed_ids.append(item_id) + + settings = MagicMock() + test_arr_instance = TestArr() + settings.instances = 
[test_arr_instance] + + watcher = WatcherManager(settings) + await watcher.setup() + + # Reduce delay for faster test execution + for handler in watcher.handlers: + handler.delay = 0.1 + + try: + test_file = folder_to_watch / "file1.txt" + test_file.write_text("hello") + test_file.unlink() # delete the file to trigger the handler + + # Wait enough time for deletion event and async processing to complete + await asyncio.sleep(0.3) + + # Await completion for all handlers to ensure background tasks done + for handler in watcher.handlers: + await handler.await_completion() + + # Assert the folder path was passed to get_refresh_item_id_by_path + assert str(folder_to_watch) in test_arr_instance.called_paths + # Assert that refresh_item was called with the expected IDs + expected_id = f"id_for_{str(folder_to_watch)}" + assert expected_id in test_arr_instance.refreshed_ids + + finally: + watcher.stop() diff --git a/tests/settings/test_instances.py b/tests/settings/test_instances.py new file mode 100644 index 0000000..c69fb96 --- /dev/null +++ b/tests/settings/test_instances.py @@ -0,0 +1,60 @@ +from unittest.mock import AsyncMock, patch, MagicMock +import pytest +from src.settings._instances import ArrInstance + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "arr_type, expected_key", + [ + ("radarr", "movie"), + ("sonarr", "series"), + ], +) +async def test_get_refresh_item_calls_make_request_with_correct_params( + arr_type, expected_key +): + base_url = f"http://{arr_type}/" + api_key = "test_key" + settings = {} + + arr = ArrInstance(settings, arr_type, base_url, api_key) + + # Fake response data your get_refresh_item expects + fake_json = [{"id": 1, "path": "/media/example"}] + + # Patch make_request to return an object whose .json() coroutine returns fake_json + with patch( + "src.settings._instances.make_request", new_callable=AsyncMock + ) as mock_make_request: + mock_response = AsyncMock() + mock_response.json = MagicMock(return_value=fake_json) + 
mock_make_request.return_value = mock_response + + result = await arr.get_refresh_item() + + mock_make_request.assert_awaited_once_with( + "get", + arr.api_url + "/" + expected_key, + settings, + headers={"X-Api-Key": api_key}, + ) + assert result == fake_json + + +@pytest.mark.asyncio +async def test_get_refresh_item_by_path_returns_correct_item(): + arr = ArrInstance({}, "radarr", "http://radarr/", "test_key") + + mock_items = [ + {"id": 123, "path": "/media/folder1"}, + {"id": 456, "path": "/media/folder2"}, + ] + + with patch.object( + arr, "get_refresh_item", AsyncMock(return_value=mock_items) + ) as mock_method: + result = await arr.get_refresh_item_by_path("/media/folder2/some_subfolder") + + mock_method.assert_awaited_once() + assert result == {"id": 456, "path": "/media/folder2"} diff --git a/tests/settings/test__user_config_from_env.py b/tests/settings/test_user_config_from_env.py similarity index 100% rename from tests/settings/test__user_config_from_env.py rename to tests/settings/test_user_config_from_env.py