Periodic Rescans Added (New Feature)

Benjamin Harder
2024-09-14 13:32:54 +02:00
parent c7e5c0e374
commit 34fcff4de8
21 changed files with 472 additions and 253 deletions

View File

@@ -11,6 +11,7 @@ from src.jobs.remove_orphans import remove_orphans
 from src.jobs.remove_slow import remove_slow
 from src.jobs.remove_stalled import remove_stalled
 from src.jobs.remove_unmonitored import remove_unmonitored
+from src.jobs.run_periodic_rescans import run_periodic_rescans
 from src.utils.trackers import Deleted_Downloads
@@ -56,119 +57,127 @@ async def queueCleaner(
     # Cleans up the downloads queue
     logger.verbose("Cleaning queue on %s:", NAME)
     # Refresh queue:
-    full_queue = await get_queue(BASE_URL, API_KEY, params={full_queue_param: True})
-    if not full_queue:
-        logger.verbose(">>> Queue is empty.")
-        return
-    else:
-        logger.debug("queueCleaner/full_queue at start:")
-        logger.debug(full_queue)
-    deleted_downloads = Deleted_Downloads([])
-    items_detected = 0
     try:
-        if settingsDict["REMOVE_FAILED"]:
-            items_detected += await remove_failed(
-                settingsDict,
-                BASE_URL,
-                API_KEY,
-                NAME,
-                deleted_downloads,
-                defective_tracker,
-                protectedDownloadIDs,
-                privateDowloadIDs,
-            )
-        if settingsDict["REMOVE_FAILED_IMPORTS"]:
-            items_detected += await remove_failed_imports(
-                settingsDict,
-                BASE_URL,
-                API_KEY,
-                NAME,
-                deleted_downloads,
-                defective_tracker,
-                protectedDownloadIDs,
-                privateDowloadIDs,
-            )
-        if settingsDict["REMOVE_METADATA_MISSING"]:
-            items_detected += await remove_metadata_missing(
-                settingsDict,
-                BASE_URL,
-                API_KEY,
-                NAME,
-                deleted_downloads,
-                defective_tracker,
-                protectedDownloadIDs,
-                privateDowloadIDs,
-            )
-        if settingsDict["REMOVE_MISSING_FILES"]:
-            items_detected += await remove_missing_files(
-                settingsDict,
-                BASE_URL,
-                API_KEY,
-                NAME,
-                deleted_downloads,
-                defective_tracker,
-                protectedDownloadIDs,
-                privateDowloadIDs,
-            )
-        if settingsDict["REMOVE_ORPHANS"]:
-            items_detected += await remove_orphans(
-                settingsDict,
-                BASE_URL,
-                API_KEY,
-                NAME,
-                deleted_downloads,
-                defective_tracker,
-                protectedDownloadIDs,
-                privateDowloadIDs,
-                full_queue_param,
-            )
-        if settingsDict["REMOVE_SLOW"]:
-            items_detected += await remove_slow(
-                settingsDict,
-                BASE_URL,
-                API_KEY,
-                NAME,
-                deleted_downloads,
-                defective_tracker,
-                protectedDownloadIDs,
-                privateDowloadIDs,
-                download_sizes_tracker,
-            )
-        if settingsDict["REMOVE_STALLED"]:
-            items_detected += await remove_stalled(
-                settingsDict,
-                BASE_URL,
-                API_KEY,
-                NAME,
-                deleted_downloads,
-                defective_tracker,
-                protectedDownloadIDs,
-                privateDowloadIDs,
-            )
-        if settingsDict["REMOVE_UNMONITORED"]:
-            items_detected += await remove_unmonitored(
-                settingsDict,
-                BASE_URL,
-                API_KEY,
-                NAME,
-                deleted_downloads,
-                defective_tracker,
-                protectedDownloadIDs,
-                privateDowloadIDs,
-                arr_type,
-            )
-        if items_detected == 0:
-            logger.verbose(">>> Queue is clean.")
+        full_queue = await get_queue(BASE_URL, API_KEY, params={full_queue_param: True})
+        if full_queue:
+            logger.debug("queueCleaner/full_queue at start:")
+            logger.debug(full_queue)
+            deleted_downloads = Deleted_Downloads([])
+            items_detected = 0
+            if settingsDict["REMOVE_FAILED"]:
+                items_detected += await remove_failed(
+                    settingsDict,
+                    BASE_URL,
+                    API_KEY,
+                    NAME,
+                    deleted_downloads,
+                    defective_tracker,
+                    protectedDownloadIDs,
+                    privateDowloadIDs,
+                )
+            if settingsDict["REMOVE_FAILED_IMPORTS"]:
+                items_detected += await remove_failed_imports(
+                    settingsDict,
+                    BASE_URL,
+                    API_KEY,
+                    NAME,
+                    deleted_downloads,
+                    defective_tracker,
+                    protectedDownloadIDs,
+                    privateDowloadIDs,
+                )
+            if settingsDict["REMOVE_METADATA_MISSING"]:
+                items_detected += await remove_metadata_missing(
+                    settingsDict,
+                    BASE_URL,
+                    API_KEY,
+                    NAME,
+                    deleted_downloads,
+                    defective_tracker,
+                    protectedDownloadIDs,
+                    privateDowloadIDs,
+                )
+            if settingsDict["REMOVE_MISSING_FILES"]:
+                items_detected += await remove_missing_files(
+                    settingsDict,
+                    BASE_URL,
+                    API_KEY,
+                    NAME,
+                    deleted_downloads,
+                    defective_tracker,
+                    protectedDownloadIDs,
+                    privateDowloadIDs,
+                )
+            if settingsDict["REMOVE_ORPHANS"]:
+                items_detected += await remove_orphans(
+                    settingsDict,
+                    BASE_URL,
+                    API_KEY,
+                    NAME,
+                    deleted_downloads,
+                    defective_tracker,
+                    protectedDownloadIDs,
+                    privateDowloadIDs,
+                    full_queue_param,
+                )
+            if settingsDict["REMOVE_SLOW"]:
+                items_detected += await remove_slow(
+                    settingsDict,
+                    BASE_URL,
+                    API_KEY,
+                    NAME,
+                    deleted_downloads,
+                    defective_tracker,
+                    protectedDownloadIDs,
+                    privateDowloadIDs,
+                    download_sizes_tracker,
+                )
+            if settingsDict["REMOVE_STALLED"]:
+                items_detected += await remove_stalled(
+                    settingsDict,
+                    BASE_URL,
+                    API_KEY,
+                    NAME,
+                    deleted_downloads,
+                    defective_tracker,
+                    protectedDownloadIDs,
+                    privateDowloadIDs,
+                )
+            if settingsDict["REMOVE_UNMONITORED"]:
+                items_detected += await remove_unmonitored(
+                    settingsDict,
+                    BASE_URL,
+                    API_KEY,
+                    NAME,
+                    deleted_downloads,
+                    defective_tracker,
+                    protectedDownloadIDs,
+                    privateDowloadIDs,
+                    arr_type,
+                )
+            if items_detected == 0:
+                logger.verbose(">>> Queue is clean.")
+        else:
+            logger.verbose(">>> Queue is empty.")
+        if settingsDict["RUN_PERIODIC_RESCANS"]:
+            await run_periodic_rescans(
+                settingsDict,
+                BASE_URL,
+                API_KEY,
+                NAME,
+                arr_type,
+            )
     except Exception as error:
         errorDetails(NAME, error)
     return
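
Read as a whole, the hunk inverts the original guard: the cleaning jobs now run only when the refreshed queue has records, while the new RUN_PERIODIC_RESCANS step is evaluated on every cycle, even when the queue is empty. A condensed, runnable sketch of the resulting flow (the job plumbing is elided; jobs and rescan are illustrative stand-ins, not the project's functions):

import asyncio

async def clean_cycle(full_queue, settings, jobs, rescan):
    # Condensed model of the new queueCleaner flow: cleaning is gated on a
    # non-empty queue; the periodic rescan is not.
    items_detected = 0
    if full_queue:
        for flag, job in jobs:  # e.g. ("REMOVE_FAILED", remove_failed), ...
            if settings.get(flag):
                items_detected += await job(full_queue)
        if items_detected == 0:
            print(">>> Queue is clean.")
    else:
        print(">>> Queue is empty.")
    if settings.get("RUN_PERIODIC_RESCANS"):
        await rescan()  # fires regardless of queue state

async def _demo():
    async def noop_job(queue):
        return 0

    async def noop_rescan():
        print(">>> Rescan triggered.")

    # Empty queue, yet the rescan still runs:
    await clean_cycle([], {"RUN_PERIODIC_RESCANS": {"SONARR": {}}}, [("REMOVE_FAILED", noop_job)], noop_rescan)

asyncio.run(_demo())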

View File

@@ -30,13 +30,15 @@ async def remove_failed(
     failType = "failed"
     queue = await get_queue(BASE_URL, API_KEY)
     logger.debug("remove_failed/queue IN: %s", formattedQueueInfo(queue))
     if not queue:
         return 0
     if await qBitOffline(settingsDict, failType, NAME):
         return 0
     # Find items affected
     affectedItems = []
-    for queueItem in queue["records"]:
+    for queueItem in queue:
         if "errorMessage" in queueItem and "status" in queueItem:
             if queueItem["status"] == "failed":
                 affectedItems.append(queueItem)
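
The queue["records"] → queue change here repeats across the following job modules and follows from the reworked get_queue shown near the end of this commit: it now returns the flat list of queue records rather than the raw paginated API payload. A before/after sketch with a made-up payload:

# Hypothetical payload in the shape the *arr queue API returns:
api_payload = {
    "page": 1,
    "pageSize": 2,
    "totalRecords": 2,
    "records": [
        {"id": 1, "title": "Show.S01E01", "status": "failed"},
        {"id": 2, "title": "Movie.2024", "status": "queued"},
    ],
}

# Before: every caller unwrapped the payload itself.
for queueItem in api_payload["records"]:
    print(queueItem["title"])

# After: get_queue() returns the records list directly.
queue = api_payload["records"]
for queueItem in queue:
    print(queueItem["title"])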

View File

@@ -30,8 +30,7 @@ async def remove_failed_imports(
     patterns = settingsDict.get("FAILED_IMPORT_MESSAGE_PATTERNS", [])
     if not patterns:  # If patterns is empty or not present
         patterns = None
-    for queueItem in queue["records"]:
+    for queueItem in queue:
         if (
             "status" in queueItem
             and "trackedDownloadStatus" in queueItem

View File

@@ -36,7 +36,7 @@ async def remove_metadata_missing(
         return 0
     # Find items affected
     affectedItems = []
-    for queueItem in queue["records"]:
+    for queueItem in queue:
         if "errorMessage" in queueItem and "status" in queueItem:
             if (
                 queueItem["status"] == "queued"

View File

@@ -36,7 +36,7 @@ async def remove_missing_files(
         return 0
     # Find items affected
     affectedItems = []
-    for queueItem in queue["records"]:
+    for queueItem in queue:
         if "status" in queueItem:
             # case to check for failed torrents
             if (

View File

@@ -37,10 +37,10 @@ async def remove_orphans(
     # Find items affected
     # 1. create a list of the "known" queue items
-    queueIDs = [queueItem["id"] for queueItem in queue["records"]] if queue else []
+    queueIDs = [queueItem["id"] for queueItem in queue] if queue else []
     affectedItems = []
     # 2. compare all queue items against the known ones, and those that are not found are the "unknown" or "orphan" ones
-    for queueItem in full_queue["records"]:
+    for queueItem in full_queue:
         if queueItem["id"] not in queueIDs:
             affectedItems.append(queueItem)
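
The orphan check compares the regular queue against the full queue (fetched with the full-queue parameter, so it also contains unknown items); anything whose id appears only in the full queue is treated as an orphan. A self-contained sketch with illustrative ids, using a set for the membership test (the source uses a list, which behaves the same but scales worse):

queue = [{"id": 1}, {"id": 2}]
full_queue = [{"id": 1}, {"id": 2}, {"id": 3}]

queueIDs = {queueItem["id"] for queueItem in queue} if queue else set()
affectedItems = [queueItem for queueItem in full_queue if queueItem["id"] not in queueIDs]
assert [queueItem["id"] for queueItem in affectedItems] == [3]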

View File

@@ -39,7 +39,7 @@ async def remove_slow(
     # Find items affected
     affectedItems = []
     alreadyCheckedDownloadIDs = []
-    for queueItem in queue["records"]:
+    for queueItem in queue:
         if (
             "downloadId" in queueItem
             and "size" in queueItem

View File

@@ -36,7 +36,7 @@ async def remove_stalled(
         return 0
     # Find items affected
     affectedItems = []
-    for queueItem in queue["records"]:
+    for queueItem in queue:
         if "errorMessage" in queueItem and "status" in queueItem:
             if (
                 queueItem["status"] == "warning"

View File

@@ -35,7 +35,7 @@ async def remove_unmonitored(
         return 0
     # Find items affected
     monitoredDownloadIDs = []
-    for queueItem in queue["records"]:
+    for queueItem in queue:
         if arr_type == "SONARR":
             isMonitored = (
                 await rest_get(
@@ -70,7 +70,7 @@ async def remove_unmonitored(
             monitoredDownloadIDs.append(queueItem["downloadId"])
     affectedItems = []
-    for queueItem in queue["records"]:
+    for queueItem in queue:
         if queueItem["downloadId"] not in monitoredDownloadIDs:
             affectedItems.append(
                 queueItem

View File

@@ -0,0 +1,129 @@
+from src.utils.shared import (
+    errorDetails,
+    rest_get,
+    rest_post,
+    get_queue,
+    get_arr_records,
+)
+import logging, verboselogs
+from datetime import datetime, timedelta, timezone
+import dateutil.parser
+
+logger = verboselogs.VerboseLogger(__name__)
+
+
+async def run_periodic_rescans(
+    settingsDict,
+    BASE_URL,
+    API_KEY,
+    NAME,
+    arr_type,
+):
+    # Checks the wanted items and runs scans
+    if arr_type not in settingsDict["RUN_PERIODIC_RESCANS"]:
+        return
+    try:
+        queue = await get_queue(BASE_URL, API_KEY)
+        check_on_endpoint = []
+        RESCAN_SETTINGS = settingsDict["RUN_PERIODIC_RESCANS"][arr_type]
+        if RESCAN_SETTINGS["MISSING"]:
+            check_on_endpoint.append("missing")
+        if RESCAN_SETTINGS["CUTOFF_UNMET"]:
+            check_on_endpoint.append("cutoff")
+        params = {"sortDirection": "ascending"}
+        if arr_type == "SONARR":
+            params["sortKey"] = "episodes.lastSearchTime"
+            queue_ids = [r["seriesId"] for r in queue if "seriesId" in r]
+            series = await rest_get(f"{BASE_URL}/series", API_KEY)
+            series_dict = {s["id"]: s for s in series}
+        elif arr_type == "RADARR":
+            params["sortKey"] = "movies.lastSearchTime"
+            queue_ids = [r["movieId"] for r in queue if "movieId" in r]
+        for end_point in check_on_endpoint:
+            records = await get_arr_records(
+                BASE_URL, API_KEY, params=params, end_point=f"wanted/{end_point}"
+            )
+            if not records:
+                logger.verbose(
+                    f">>> Rescan: No {end_point} items, thus nothing to rescan."
+                )
+                continue
+            # Filter out items that are already being downloaded (are in queue)
+            records = [r for r in records if r["id"] not in queue_ids]
+            if not records:
+                logger.verbose(
+                    f">>> Rescan: All {end_point} items are already being downloaded, thus nothing to rescan."
+                )
+                continue
+            # Remove records that have recently been searched already
+            for record in reversed(records):
+                if not (
+                    ("lastSearchTime" not in record)
+                    or (
+                        (
+                            dateutil.parser.isoparse(record["lastSearchTime"])
+                            + timedelta(days=RESCAN_SETTINGS["MIN_DAYS_BEFORE_RESCAN"])
+                        )
+                        < datetime.now(timezone.utc)
+                    )
+                ):
+                    records.remove(record)
+            # Select oldest records
+            records = records[: RESCAN_SETTINGS["MAX_CONCURRENT_SCANS"]]
+            if not records:
+                logger.verbose(
+                    f">>> Rescan: All {end_point} items have recently been scanned for, thus nothing to rescan."
+                )
+                continue
+            if arr_type == "SONARR":
+                for record in records:
+                    series_id = record.get("seriesId")
+                    if series_id and series_id in series_dict:
+                        record["series"] = series_dict[series_id]
+                    else:
+                        record["series"] = None  # Or handle missing series info as needed
+                logger.verbose(
+                    f">>> Running a scan for {len(records)} {end_point} items:\n"
+                    + "\n".join(
+                        [
+                            f"{episode['series']['title']} (Season {episode['seasonNumber']} / Episode {episode['episodeNumber']} / Aired: {episode['airDate']}): {episode['title']}"
+                            for episode in records
+                        ]
+                    )
+                )
+                json = {
+                    "name": "EpisodeSearch",
+                    "episodeIds": [r["id"] for r in records],
+                }
+            elif arr_type == "RADARR":
+                logger.verbose(
+                    f">>> Running a scan for {len(records)} {end_point} items:\n"
+                    + "\n".join(
+                        [f"{movie['title']} ({movie['year']})" for movie in records]
+                    )
+                )
+                json = {"name": "MoviesSearch", "movieIds": [r["id"] for r in records]}
+            if not settingsDict["TEST_RUN"]:
+                await rest_post(
+                    url=BASE_URL + "/command",
+                    json=json,
+                    headers={"X-Api-Key": API_KEY},
+                )
    except Exception as error:
        errorDetails(NAME, error)
    return 0
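
The double-negated condition in the removal loop above keeps a record when it has never been searched, or when its lastSearchTime is at least MIN_DAYS_BEFORE_RESCAN days in the past. The same rule as an affirmative standalone predicate (the sample records are made up):

from datetime import datetime, timedelta, timezone
import dateutil.parser

def eligible_for_rescan(record, min_days_before_rescan):
    # Never searched -> always eligible.
    if "lastSearchTime" not in record:
        return True
    # Otherwise eligible only once the last search is old enough.
    last_search = dateutil.parser.isoparse(record["lastSearchTime"])
    return last_search + timedelta(days=min_days_before_rescan) < datetime.now(timezone.utc)

records = [
    {"id": 10},                                            # never searched -> kept
    {"id": 11, "lastSearchTime": "2001-01-01T00:00:00Z"},  # long ago -> kept
    {"id": 12, "lastSearchTime": datetime.now(timezone.utc).isoformat()},  # fresh -> dropped
]
records = [r for r in records if eligible_for_rescan(r, min_days_before_rescan=7)]
print([r["id"] for r in records])  # [10, 11]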

View File

@@ -82,7 +82,10 @@ def showSettings(settingsDict):
     logger.info('%s | Removing slow downloads (%s)', str(settingsDict['REMOVE_SLOW']), 'REMOVE_SLOW')
     logger.info('%s | Removing stalled downloads (%s)', str(settingsDict['REMOVE_STALLED']), 'REMOVE_STALLED')
     logger.info('%s | Removing downloads belonging to unmonitored items (%s)', str(settingsDict['REMOVE_UNMONITORED']), 'REMOVE_UNMONITORED')
+    logger.info('')
+    for arr_type, RESCAN_SETTINGS in settingsDict['RUN_PERIODIC_RESCANS'].items():
+        logger.info('%s/%s (%s) | Search missing/cutoff-unmet items. Max queries/list: %s. Min. days to re-search: %s (%s)', RESCAN_SETTINGS['MISSING'], RESCAN_SETTINGS['CUTOFF_UNMET'], arr_type, RESCAN_SETTINGS['MAX_CONCURRENT_SCANS'], RESCAN_SETTINGS['MIN_DAYS_BEFORE_RESCAN'], 'RUN_PERIODIC_RESCANS')
     logger.info('')
     logger.info('Running every: %s', fmt.format(rd(minutes=settingsDict['REMOVE_TIMER'])))
     if settingsDict['REMOVE_SLOW']:
         logger.info('Minimum speed enforced: %s KB/s', str(settingsDict['MIN_DOWNLOAD_SPEED']))
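
Together with run_periodic_rescans, this log line implies a per-instance settings shape along the following lines. The key names are taken from the code; the values shown here are illustrative, not the project's defaults:

RUN_PERIODIC_RESCANS = {
    "SONARR": {
        "MISSING": True,             # search wanted/missing
        "CUTOFF_UNMET": True,        # search wanted/cutoff
        "MAX_CONCURRENT_SCANS": 3,   # cap per wanted list per cycle
        "MIN_DAYS_BEFORE_RESCAN": 7, # re-search age threshold in days
    },
    "RADARR": {
        "MISSING": True,
        "CUTOFF_UNMET": False,
        "MAX_CONCURRENT_SCANS": 3,
        "MIN_DAYS_BEFORE_RESCAN": 7,
    },
}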

View File

@@ -1,47 +1,51 @@
 # Import Libraries
 import asyncio
 import logging, verboselogs
 logger = verboselogs.VerboseLogger(__name__)
 import json
 # Import Functions
 from config.definitions import settingsDict
 from src.utils.loadScripts import *
 from src.decluttarr import queueCleaner
 from src.utils.rest import rest_get, rest_post
 from src.utils.trackers import Defective_Tracker, Download_Sizes_Tracker
 # Hide SSL Verification Warnings
-if settingsDict['SSL_VERIFICATION']==False:
+if settingsDict["SSL_VERIFICATION"] == False:
     import warnings
     warnings.filterwarnings("ignore", message="Unverified HTTPS request")
 # Set up logging
 setLoggingFormat(settingsDict)
 # Main function
 async def main(settingsDict):
-    # Adds to settings Dict the instances that are actually configured
-    settingsDict['INSTANCES'] = []
-    for arrApplication in settingsDict['SUPPORTED_ARR_APPS']:
-        if settingsDict[arrApplication + '_URL']:
-            settingsDict['INSTANCES'].append(arrApplication)
+    # Adds to settings Dict the instances that are actually configured
+    settingsDict["INSTANCES"] = []
+    for arrApplication in settingsDict["SUPPORTED_ARR_APPS"]:
+        if settingsDict[arrApplication + "_URL"]:
+            settingsDict["INSTANCES"].append(arrApplication)
     # Pre-populates the dictionaries (in classes) that track the items that were already caught as having problems or removed
     defectiveTrackingInstances = {}
-    for instance in settingsDict['INSTANCES']:
+    for instance in settingsDict["INSTANCES"]:
         defectiveTrackingInstances[instance] = {}
     defective_tracker = Defective_Tracker(defectiveTrackingInstances)
     download_sizes_tracker = Download_Sizes_Tracker({})
     # Get name of arr-instances
-    for instance in settingsDict['INSTANCES']:
+    for instance in settingsDict["INSTANCES"]:
         settingsDict = await getArrInstanceName(settingsDict, instance)
     # Check outdated
     upgradeChecks(settingsDict)
     # Welcome Message
     showWelcome()
     # Current Settings
     showSettings(settingsDict)
@@ -57,21 +61,29 @@ async def main(settingsDict):
     # Start Cleaning
     while True:
-        logger.verbose('-' * 50)
-        # Cache protected (via Tag) and private torrents
-        protectedDownloadIDs, privateDowloadIDs = await getProtectedAndPrivateFromQbit(settingsDict)
+        logger.verbose("-" * 50)
+        # Cache protected (via Tag) and private torrents
+        protectedDownloadIDs, privateDowloadIDs = await getProtectedAndPrivateFromQbit(
+            settingsDict
+        )
         # Run script for each instance
-        for instance in settingsDict['INSTANCES']:
-            await queueCleaner(settingsDict, instance, defective_tracker, download_sizes_tracker, protectedDownloadIDs, privateDowloadIDs)
-        logger.verbose('')
-        logger.verbose('Queue clean-up complete!')
+        for instance in settingsDict["INSTANCES"]:
+            await queueCleaner(
+                settingsDict,
+                instance,
+                defective_tracker,
+                download_sizes_tracker,
+                protectedDownloadIDs,
+                privateDowloadIDs,
+            )
+        logger.verbose("")
+        logger.verbose("Queue clean-up complete!")
         # Wait for the next run
-        await asyncio.sleep(settingsDict['REMOVE_TIMER']*60)
+        await asyncio.sleep(settingsDict["REMOVE_TIMER"] * 60)
     return
-if __name__ == '__main__':
+if __name__ == "__main__":
     asyncio.run(main(settingsDict))

View File

@@ -7,21 +7,29 @@ from src.utils.nest_functions import add_keys_nested_dict, nested_get
 import sys, os, traceback
+
+async def get_arr_records(BASE_URL, API_KEY, params={}, end_point=""):
+    # All records from a given endpoint
+    record_count = (await rest_get(f"{BASE_URL}/{end_point}", API_KEY, params))[
+        "totalRecords"
+    ]
+    if record_count == 0:
+        return []
+    records = await rest_get(
+        f"{BASE_URL}/{end_point}",
+        API_KEY,
+        {"page": "1", "pageSize": record_count} | params,
+    )
+    return records["records"]
+
 async def get_queue(BASE_URL, API_KEY, params={}):
-    # Retrieves the current queue
+    # Refreshes and retrieves the current queue
     await rest_post(
         url=BASE_URL + "/command",
         json={"name": "RefreshMonitoredDownloads"},
         headers={"X-Api-Key": API_KEY},
     )
-    totalRecords = (await rest_get(f"{BASE_URL}/queue", API_KEY, params))[
-        "totalRecords"
-    ]
-    if totalRecords == 0:
-        return None
-    queue = await rest_get(
-        f"{BASE_URL}/queue", API_KEY, {"page": "1", "pageSize": totalRecords} | params
-    )
+    queue = await get_arr_records(BASE_URL, API_KEY, params=params, end_point="queue")
     queue = filterOutDelayedQueueItems(queue)
     return queue
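
The new get_arr_records helper generalizes the fetch-everything pattern that get_queue previously inlined: one request to learn totalRecords, then a second request sized to pull every record in a single page. A standalone sketch of the same pattern, assuming an aiohttp client in place of the project's rest_get helper:

import aiohttp

async def fetch_json(url, api_key, params=None):
    # Stand-in for the project's rest_get; one throwaway session per call
    # keeps the sketch short (a real client would reuse the session).
    async with aiohttp.ClientSession() as session:
        async with session.get(
            url, params=params or {}, headers={"X-Api-Key": api_key}
        ) as response:
            return await response.json()

async def get_all_records(base_url, api_key, end_point, params=None):
    params = params or {}
    # 1st call: only the record count is of interest.
    count = (await fetch_json(f"{base_url}/{end_point}", api_key, params))["totalRecords"]
    if count == 0:
        return []
    # 2nd call: request a single page large enough to hold everything.
    payload = await fetch_json(
        f"{base_url}/{end_point}",
        api_key,
        {"page": "1", "pageSize": count} | params,
    )
    return payload["records"]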
@@ -29,29 +37,26 @@ async def get_queue(BASE_URL, API_KEY, params={}):
 def filterOutDelayedQueueItems(queue):
     # Ignores delayed queue items
     if queue is None:
-        return None
+        return queue
     seen_combinations = set()
-    filtered_records = []
-    for record in queue["records"]:
+    filtered_queue = []
+    for queue_item in queue:
         # Use get() method with default value "No indexer" if 'indexer' key does not exist
-        indexer = record.get("indexer", "No indexer")
-        protocol = record.get("protocol", "No protocol")
-        combination = (record["title"], protocol, indexer)
-        if record["status"] == "delay":
+        indexer = queue_item.get("indexer", "No indexer")
+        protocol = queue_item.get("protocol", "No protocol")
+        combination = (queue_item["title"], protocol, indexer)
+        if queue_item["status"] == "delay":
             if combination not in seen_combinations:
                 seen_combinations.add(combination)
                 logger.debug(
                     ">>> Delayed queue item ignored: %s (Protocol: %s, Indexer: %s)",
-                    record["title"],
+                    queue_item["title"],
                     protocol,
                     indexer,
                 )
         else:
-            filtered_records.append(record)
-    if not filtered_records:
-        return None
-    queue["records"] = filtered_records
-    return queue
+            filtered_queue.append(queue_item)
+    return filtered_queue
def privateTrackerCheck(settingsDict, affectedItems, failType, privateDowloadIDs):
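
A worked example of what the rewritten filterOutDelayedQueueItems does with the flat queue list: delayed items are dropped, each unique (title, protocol, indexer) combination is logged only once, and everything else passes through. The queue items below are made up:

queue = [
    {"title": "A", "status": "delay", "protocol": "torrent", "indexer": "X"},
    {"title": "A", "status": "delay", "protocol": "torrent", "indexer": "X"},  # duplicate: logged once
    {"title": "B", "status": "downloading", "protocol": "usenet", "indexer": "Y"},
]

seen_combinations = set()
filtered_queue = []
for queue_item in queue:
    combination = (
        queue_item["title"],
        queue_item.get("protocol", "No protocol"),
        queue_item.get("indexer", "No indexer"),
    )
    if queue_item["status"] == "delay":
        if combination not in seen_combinations:
            seen_combinations.add(combination)
            print(f">>> Delayed queue item ignored: {combination}")
    else:
        filtered_queue.append(queue_item)

assert [item["title"] for item in filtered_queue] == ["B"]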
@@ -326,10 +331,10 @@ def formattedQueueInfo(queue):
     if not queue:
         return "empty"
     formatted_list = []
-    for record in queue["records"]:
-        download_id = record["downloadId"]
-        title = record["title"]
-        item_id = record["id"]
+    for queue_item in queue:
+        download_id = queue_item["downloadId"]
+        title = queue_item["title"]
+        item_id = queue_item["id"]
         # Check if there is an entry with the same download_id and title
         existing_entry = next(
             (item for item in formatted_list if item["downloadId"] == download_id),