refactor indexers: add TorznabMixin for improved search result processing and use the Torznab Prowlarr endpoint

This commit is contained in:
maxid
2025-12-28 23:24:53 +01:00
parent 0da3e53bcb
commit 733c7f78de
3 changed files with 190 additions and 236 deletions

View File

@@ -1,12 +1,11 @@
import concurrent
import logging
import xml.etree.ElementTree as ET
from concurrent.futures.thread import ThreadPoolExecutor
from xml.etree.ElementTree import Element
import requests
from media_manager.indexer.indexers.generic import GenericIndexer
from media_manager.indexer.indexers.torznab_mixin import TorznabMixin
from media_manager.indexer.schemas import IndexerQueryResult
from media_manager.config import AllEncompassingConfig
from media_manager.movies.schemas import Movie
@@ -15,8 +14,8 @@ from media_manager.tv.schemas import Show
log = logging.getLogger(__name__)
class Jackett(GenericIndexer):
def __init__(self, **kwargs):
class Jackett(GenericIndexer, TorznabMixin):
def __init__(self):
"""
A subclass of GenericIndexer for interacting with the Jackett API.
@@ -54,10 +53,6 @@ class Jackett(GenericIndexer):
def get_torrents_by_indexer(
self, indexer: str, query: str, is_tv: bool, session: requests.Session
) -> list[IndexerQueryResult]:
download_volume_factor = 1.0 # Default value
upload_volume_factor = 1 # Default value
seeders = 0 # Default value
url = (
self.url
+ f"/api/v2.0/indexers/{indexer}/results/torznab/api?apikey={self.api_key}&t={'tvsearch' if is_tv else 'movie'}&q={query}"
@@ -70,61 +65,15 @@ class Jackett(GenericIndexer):
)
return []
result_list: list[IndexerQueryResult] = []
xml_tree = ET.fromstring(response.content)
xmlns = {
"torznab": "http://torznab.com/schemas/2015/feed",
"atom": "http://www.w3.org/2005/Atom",
}
for item in xml_tree.findall("channel/item"):
try:
attributes: list[Element] = [
x for x in item.findall("torznab:attr", xmlns)
]
for attribute in attributes:
if attribute.attrib["name"] == "seeders":
seeders = int(attribute.attrib["value"])
if attribute.attrib["name"] == "downloadvolumefactor":
download_volume_factor = float(attribute.attrib["value"])
if attribute.attrib["name"] == "uploadvolumefactor":
upload_volume_factor = int(attribute.attrib["value"])
flags = []
if download_volume_factor == 0:
flags.append("freeleech")
if download_volume_factor == 0.5:
flags.append("halfleech")
if download_volume_factor == 0.75:
flags.append("freeleech75")
if download_volume_factor == 0.25:
flags.append("freeleech25")
if upload_volume_factor == 2:
flags.append("doubleupload")
results = self.process_search_result(response.content)
result = IndexerQueryResult(
title=item.find("title").text,
download_url=str(item.find("enclosure").attrib["url"]),
seeders=seeders,
flags=flags,
size=int(item.find("size").text),
usenet=False, # always False, because Jackett doesn't support usenet
age=0, # always 0 for torrents, as Jackett does not provide age information in a convenient format
indexer=item.find("jackettindexer").text
if item.find("jackettindexer") is not None
else None,
)
result_list.append(result)
except Exception as e:
log.error(
f"1 Jackett search result errored with indexer {indexer}, error: {e}"
)
log.info(f"Indexer {indexer.name} returned {len(results)} results")
return results
log.info(
f"found {len(result_list)} results for query '{query}' from indexer '{indexer}'"
)
return result_list
def search_season(self, query: str, show: Show, season_number: int) -> list[IndexerQueryResult]:
def search_season(
self, query: str, show: Show, season_number: int
) -> list[IndexerQueryResult]:
pass
def search_movie(self, query: str, movie: Movie) -> list[IndexerQueryResult]:
pass
pass

View File

@@ -1,16 +1,12 @@
import concurrent
import logging
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from dataclasses import dataclass
from requests import Session
import prowlarr
from media_manager.indexer.indexers.generic import GenericIndexer
from media_manager.config import AllEncompassingConfig
from media_manager.indexer.indexers.torznab_mixin import TorznabMixin
from media_manager.indexer.schemas import IndexerQueryResult
from media_manager.indexer.utils import follow_redirects_to_final_torrent_url
from media_manager.movies.schemas import Movie
from media_manager.tv.schemas import Show
@@ -22,190 +18,122 @@ class IndexerInfo:
id: int
name: str
supports_tv_search: bool
supports_tv_search_tmdb: bool
supports_tv_search_imdb: bool
supports_tv_search_tvdb: bool
supports_tv_search_season: bool
supports_movie_search: bool
supports_movie_search_tmdb: bool
supports_movie_search_imdb: bool
supports_movie_search_tvdb: bool
class Prowlarr(GenericIndexer):
def __init__(self, **kwargs):
class Prowlarr(GenericIndexer, TorznabMixin):
def __init__(self):
"""
A subclass of GenericIndexer for interacting with the Prowlarr API.
:param api_key: The API key for authenticating requests to Prowlarr.
:param kwargs: Additional keyword arguments to pass to the superclass constructor.
"""
super().__init__(name="prowlarr")
config = AllEncompassingConfig().indexers.prowlarr
configuration = prowlarr.Configuration(host=config.url, retries=3)
configuration.api_key["X-Api-Key"] = config.api_key
self.config = configuration
self.reject_torrents_on_url_error = config.reject_torrents_on_url_error
self.timeout_seconds = config.timeout_seconds
self.follow_redirects = config.follow_redirects
self.config = AllEncompassingConfig().indexers.prowlarr
@contextmanager
def __get_api(self):
with prowlarr.ApiClient(self.config) as api_instance:
yield api_instance
def _call_prowlarr_api(self, path: str, parameters: dict = None):
url = f"{self.config.url}/api/v1{path}"
headers = {"X-Api-Key": self.config.api_key}
with Session() as session:
return session.get(
url=url,
params=parameters,
timeout=self.config.timeout_seconds,
headers=headers,
)
def __get_indexers(self) -> list[IndexerInfo]:
with self.__get_api() as client:
api = prowlarr.IndexerApi(client)
indexers = api.list_indexer()
indexer_info_list: list[IndexerInfo] = []
for indexer in indexers:
tv_search_params = (
indexer.capabilities.tv_search_params
if indexer.capabilities.tv_search_params
else []
)
movie_search_params = (
indexer.capabilities.movie_search_params
if indexer.capabilities.movie_search_params
else []
)
def _newznab_search(
self, indexer: IndexerInfo, parameters: dict = None
) -> list[IndexerQueryResult]:
if parameters is None:
parameters = {}
indexer_info = IndexerInfo(
id=indexer.id,
name=indexer.name,
supports_tv_search_tmdb="tmdbId" in tv_search_params,
supports_tv_search_imdb="imdbId" in tv_search_params,
supports_tv_search_tvdb="tvdbId" in tv_search_params,
supports_tv_search_season="season" in tv_search_params,
supports_movie_search_tmdb="tmdbId" in movie_search_params,
supports_movie_search_imdb="imdbId" in movie_search_params,
supports_movie_search_tvdb="tvdbId" in movie_search_params,
)
indexer_info_list.append(indexer_info)
return indexer_info_list
parameters["limit"] = 10000
results = self._call_prowlarr_api(
path=f"/indexer/{indexer.id}/newznab", parameters=parameters
)
results = self.process_search_result(xml=results.content)
log.info(
f"Indexer {indexer.name} returned {len(results)} results for search: {parameters}"
)
return results
def _get_indexers(self) -> list[IndexerInfo]:
indexers = self._call_prowlarr_api(path="/indexer")
indexers = indexers.json()
indexer_info_list: list[IndexerInfo] = []
for indexer in indexers:
supports_tv_search = False
supports_movie_search = False
tv_search_params = []
movie_search_params = []
if not indexer["capabilities"].get("tvSearchParams"):
supports_tv_search = False
else:
supports_tv_search = True
tv_search_params = indexer["capabilities"]["tvSearchParams"]
if not indexer["capabilities"].get("movieSearchParams"):
supports_movie_search = False
else:
supports_movie_search = True
movie_search_params = indexer["capabilities"]["movieSearchParams"]
indexer_info = IndexerInfo(
id=indexer["id"],
name=indexer.get("name", "unknown"),
supports_tv_search=supports_tv_search,
supports_tv_search_tmdb="tmdbId" in tv_search_params,
supports_tv_search_imdb="imdbId" in tv_search_params,
supports_tv_search_tvdb="tvdbId" in tv_search_params,
supports_tv_search_season="season" in tv_search_params,
supports_movie_search=supports_movie_search,
supports_movie_search_tmdb="tmdbId" in movie_search_params,
supports_movie_search_imdb="imdbId" in movie_search_params,
supports_movie_search_tvdb="tvdbId" in movie_search_params,
)
indexer_info_list.append(indexer_info)
return indexer_info_list
def _get_tv_indexers(self) -> list[IndexerInfo]:
return [x for x in self._get_indexers() if x.supports_tv_search]
def _get_movie_indexers(self) -> list[IndexerInfo]:
return [x for x in self._get_indexers() if x.supports_movie_search]
def search(self, query: str, is_tv: bool) -> list[IndexerQueryResult]:
log.info(f"Searching for: {query}")
processed_results: list[IndexerQueryResult] = []
raw_results = None
with self.__get_api() as api:
search_api = prowlarr.SearchApi(api.api_client)
params = {
"q": query,
"t": "tvsearch" if is_tv else "movie",
}
raw_results = []
indexers = self._get_tv_indexers() if is_tv else self._get_movie_indexers()
try:
raw_results = search_api.list_search(
query=query, categories=[5000] if is_tv else [2000], limit=10000
)
except Exception as e:
log.error(f"Prowlarr search error: {e}")
raise RuntimeError(f"Prowlarr search error: {e}") from e
for indexer in indexers:
raw_results.extend(self._newznab_search(parameters=params, indexer=indexer))
for result in raw_results:
try:
processed_result = self.__process_result(result=result)
if processed_result:
processed_results.append(processed_result)
except Exception as e:
log.error(f"Failed to process result {result}: {e}")
return processed_results
def __process_result(self, result) -> IndexerQueryResult | None:
# process usenet search result
if result["protocol"] != "torrent":
return IndexerQueryResult(
download_url=result["downloadUrl"],
title=result["sortTitle"],
seeders=0, # Usenet results do not have seeders
flags=result["indexerFlags"] if "indexerFlags" in result else [],
size=result["size"],
usenet=True,
age=int(result["ageMinutes"]) * 60,
indexer=result["indexer"] if "indexer" in result else None,
)
# process torrent search result
initial_url = None
if "downloadUrl" in result:
initial_url = result["downloadUrl"]
elif "magnetUrl" in result:
initial_url = result["magnetUrl"]
elif "guid" in result:
initial_url = result["guid"]
else:
log.debug(f"No valid download URL found for result: {result}")
raise RuntimeError("No valid download URL found in torrent search result")
if not initial_url.startswith("magnet:") and self.follow_redirects:
try:
final_download_url = follow_redirects_to_final_torrent_url(
initial_url=initial_url,
session=Session(),
timeout=self.timeout_seconds,
)
except RuntimeError as e:
log.warning(
f"Failed to follow redirects for {initial_url}, falling back to the initial url as download url, error: {e}"
)
if self.reject_torrents_on_url_error:
return None
else:
final_download_url = initial_url
else:
final_download_url = initial_url
return IndexerQueryResult(
download_url=final_download_url,
title=result["sortTitle"],
seeders=result["seeders"] if "seeders" in result else 0,
flags=result["indexerFlags"] if "indexerFlags" in result else [],
size=result["size"],
usenet=False,
age=0, # Torrent results do not need age information
indexer=result["indexer"] if "indexer" in result else None,
)
def __process_results(self, results: list) -> list[IndexerQueryResult]:
processed_results: list[IndexerQueryResult] = []
for result in results:
try:
processed_result = self.__process_result(result=result)
if processed_result:
processed_results.append(processed_result)
except Exception as e:
log.error(f"Failed to process result {result}: {e}")
return processed_results
def __get_newznab_api(self, searches: list) -> list:
results = []
with self.__get_api() as api_client:
api = prowlarr.NewznabApi(api_client)
futures = []
with ThreadPoolExecutor() as executor:
for search in searches:
future = executor.submit(api.get_indexer_newznab, **search)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
if result is not None:
results.extend(result)
except Exception as e:
log.error(f"Querying one indexer failed because: {e}")
return results
return raw_results
def search_season(
self, query: str, show: Show, season_number: int
) -> list[IndexerQueryResult]:
indexers = self.__get_indexers()
indexers = self._get_tv_indexers()
raw_results = []
searches = []
for indexer in indexers:
log.debug("Preparing search for indexer: " + indexer.name)
search_params = {
"id": indexer.id,
"cat": "5000",
"limit": 10000,
"q": query,
"t": "tvsearch",
}
@@ -219,23 +147,22 @@ class Prowlarr(GenericIndexer):
if indexer.supports_tv_search_season:
search_params["season"] = season_number
searches.append(search_params)
raw_results.extend(
self._newznab_search(parameters=search_params, indexer=indexer)
)
raw_results = self.__get_newznab_api(searches=searches)
search_results = self.__process_results(results=raw_results)
return search_results
return raw_results
def search_movie(self, query: str, movie: Movie) -> list[IndexerQueryResult]:
indexers = self.__get_indexers()
indexers = self._get_movie_indexers()
raw_results = []
searches = []
for indexer in indexers:
log.debug("Preparing search for indexer: " + indexer.name)
search_params = {
"id": indexer.id,
"cat": "2000",
"limit": 10000,
"q": query,
"t": "movie",
}
@@ -247,10 +174,8 @@ class Prowlarr(GenericIndexer):
if indexer.supports_movie_search_imdb:
search_params["imdbid"] = movie.imdb_id
searches.append(search_params)
raw_results.extend(
self._newznab_search(parameters=search_params, indexer=indexer)
)
raw_results = self.__get_newznab_api(searches=searches)
search_results = self.__process_results(results=raw_results)
return search_results
return raw_results

View File

@@ -0,0 +1,80 @@
import logging
from media_manager.indexer.schemas import IndexerQueryResult
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import Element
from email.utils import parsedate_to_datetime
from datetime import datetime, timezone
log = logging.getLogger(__name__)
class TorznabMixin:
    """Mixin providing shared parsing of Torznab XML search feeds.

    Both Jackett and Prowlarr expose search results in the Torznab RSS
    dialect; this mixin converts such a feed into IndexerQueryResult objects
    so the indexer subclasses don't each carry their own XML-parsing code.
    """

    def process_search_result(self, xml: str | bytes) -> list[IndexerQueryResult]:
        """Parse a Torznab XML response into a list of IndexerQueryResult.

        :param xml: Raw XML document (str or bytes — callers pass
            ``response.content``) returned by a Torznab-compatible endpoint.
        :return: Successfully parsed results; items that fail to parse are
            logged and skipped rather than aborting the whole feed.
        """
        result_list: list[IndexerQueryResult] = []
        xml_tree = ET.fromstring(xml)
        xmlns = {
            "torznab": "http://torznab.com/schemas/2015/feed",
            "atom": "http://www.w3.org/2005/Atom",
        }
        for item in xml_tree.findall("channel/item"):
            try:
                result_list.append(self._parse_item(item, xmlns))
            except Exception as e:
                # One malformed <item> must not discard the rest of the feed.
                log.error(f"One Torznab search result errored with error: {e}")
        return result_list

    def _parse_item(self, item: Element, xmlns: dict[str, str]) -> IndexerQueryResult:
        """Build a single IndexerQueryResult from one Torznab ``<item>``.

        Raises on malformed items (missing enclosure/title/size); the caller
        catches and logs per item.
        """
        # Jackett and Prowlarr each tag items with their own indexer element.
        indexer_name = "unknown"
        jackett_el = item.find("jackettindexer")
        if jackett_el is not None:
            indexer_name = jackett_el.text
        prowlarr_el = item.find("prowlarrindexer")
        if prowlarr_el is not None:
            indexer_name = prowlarr_el.text

        enclosure = item.find("enclosure")
        # Any enclosure that is not a bittorrent payload is treated as usenet.
        is_usenet = enclosure.attrib["type"] != "application/x-bittorrent"

        seeders = 0
        age = 0
        flags: list[str] = []
        for attribute in item.findall("torznab:attr", xmlns):
            attr_name = attribute.attrib["name"]
            attr_value = attribute.attrib["value"]
            if is_usenet:
                if attr_name == "usenetdate":
                    # Usenet age in seconds, derived from the RFC 2822 post date.
                    posted_date = parsedate_to_datetime(attr_value)
                    now = datetime.now(timezone.utc)
                    age = int((now - posted_date).total_seconds())
            else:
                if attr_name == "seeders":
                    seeders = int(attr_value)
                elif attr_name == "downloadvolumefactor":
                    download_volume_factor = float(attr_value)
                    if download_volume_factor == 0:
                        flags.append("freeleech")
                    if download_volume_factor == 0.5:
                        flags.append("halfleech")
                    if download_volume_factor == 0.75:
                        flags.append("freeleech75")
                    if download_volume_factor == 0.25:
                        flags.append("freeleech25")
                elif attr_name == "uploadvolumefactor":
                    if int(attr_value) == 2:
                        flags.append("doubleupload")

        return IndexerQueryResult(
            title=item.find("title").text,
            download_url=str(enclosure.attrib["url"]),
            seeders=seeders,
            flags=flags,
            size=int(item.find("size").text),
            usenet=is_usenet,
            # Torrents get age 0; only usenet items carry a usable post date.
            age=age,
            indexer=indexer_name,
        )