Merge pull request #173 from maxdorninger/multithread-indexer-module

Speed up torrent searching and disable useless logs
This commit is contained in:
Maximilian Dorninger
2025-10-30 19:00:35 +01:00
committed by GitHub
9 changed files with 211 additions and 168 deletions

View File

@@ -1,6 +1,7 @@
# Indexers

Indexer settings are configured in the `[indexers]` section of your `config.toml` file. MediaManager supports both
Prowlarr and Jackett as indexer providers.
## Prowlarr (`[indexers.prowlarr]`)
@@ -16,6 +17,12 @@ Base URL of your Prowlarr instance.
API key for Prowlarr. You can find this in Prowlarr's settings under General.
- `reject_torrents_on_url_error`
Set to `true` to reject torrents if there is a URL error when fetching from Prowlarr. Until MediaManager v1.9.0 the
default behavior was `false`, but from v1.9.0 onwards the default is `true`. It's recommended to set this to `true` to
avoid adding possibly invalid torrents.
## Jackett (`[indexers.jackett]`)
- `enabled`
@@ -40,14 +47,14 @@ Here's a complete example of the indexers section in your `config.toml`:
```toml
[indexers]
[indexers.prowlarr]
enabled = true
url = "http://prowlarr:9696"
api_key = "your_prowlarr_api_key"

[indexers.jackett]
enabled = false
url = "http://jackett:9117"
api_key = "your_jackett_api_key"
indexers = ["1337x", "rarbg"]
```

View File

@@ -119,6 +119,7 @@ base_path = "/api"
enabled = false enabled = false
url = "http://localhost:9696" url = "http://localhost:9696"
api_key = "" api_key = ""
reject_torrents_on_url_error = true
# Jackett settings # Jackett settings
[indexers.jackett] [indexers.jackett]

View File

@@ -119,6 +119,7 @@ base_path = "/api"
enabled = false enabled = false
url = "http://localhost:9696" url = "http://localhost:9696"
api_key = "" api_key = ""
reject_torrents_on_url_error = true
# Jackett settings # Jackett settings
[indexers.jackett] [indexers.jackett]

View File

@@ -5,6 +5,7 @@ class ProwlarrConfig(BaseSettings):
enabled: bool = False enabled: bool = False
api_key: str = "" api_key: str = ""
url: str = "http://localhost:9696" url: str = "http://localhost:9696"
reject_torrents_on_url_error: bool = True
class JackettConfig(BaseSettings): class JackettConfig(BaseSettings):

View File

@@ -1,5 +1,7 @@
import concurrent
import logging import logging
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from concurrent.futures.thread import ThreadPoolExecutor
from xml.etree.ElementTree import Element from xml.etree.ElementTree import Element
import requests import requests
@@ -26,67 +28,84 @@ class Jackett(GenericIndexer):
# NOTE: this could be done in parallel, but if there aren't more than a dozen indexers, it shouldn't matter # NOTE: this could be done in parallel, but if there aren't more than a dozen indexers, it shouldn't matter
def search(self, query: str, is_tv: bool) -> list[IndexerQueryResult]: def search(self, query: str, is_tv: bool) -> list[IndexerQueryResult]:
log.debug("Searching for " + query)
futures = []
with ThreadPoolExecutor() as executor, requests.Session() as session:
for indexer in self.indexers:
future = executor.submit(
self.get_torrents_by_indexer, indexer, query, is_tv, session
)
futures.append(future)
responses = []
for future in concurrent.futures.as_completed(futures):
responses.extend(future.result())
return responses
def get_torrents_by_indexer(
self, indexer: str, query: str, is_tv: bool, session: requests.Session
) -> list[IndexerQueryResult]:
download_volume_factor = 1.0 # Default value download_volume_factor = 1.0 # Default value
upload_volume_factor = 1 # Default value upload_volume_factor = 1 # Default value
seeders = 0 # Default value seeders = 0 # Default value
log.debug("Searching for " + query)
responses = [] url = (
for indexer in self.indexers: self.url
log.debug(f"Searching in indexer: {indexer}") + f"/api/v2.0/indexers/{indexer}/results/torznab/api?apikey={self.api_key}&t={'tvsearch' if is_tv else 'movie'}&q={query}"
url = ( )
self.url response = session.get(url)
+ f"/api/v2.0/indexers/{indexer}/results/torznab/api?apikey={self.api_key}&t={'tvsearch' if is_tv else 'movie'}&q={query}"
if response.status_code != 200:
log.error(
f"Jacket error with indexer {indexer}, error: {response.status_code}"
) )
response = requests.get(url) return []
responses.append(response)
result_list: list[IndexerQueryResult] = []
xml_tree = ET.fromstring(response.content)
xmlns = { xmlns = {
"torznab": "http://torznab.com/schemas/2015/feed", "torznab": "http://torznab.com/schemas/2015/feed",
"atom": "http://www.w3.org/2005/Atom", "atom": "http://www.w3.org/2005/Atom",
} }
result_list: list[IndexerQueryResult] = [] for item in xml_tree.findall("channel/item"):
for response in responses: attributes: list[Element] = [x for x in item.findall("torznab:attr", xmlns)]
if response.status_code == 200: for attribute in attributes:
xml_tree = ET.fromstring(response.content) if attribute.attrib["name"] == "seeders":
for item in xml_tree.findall("channel/item"): seeders = int(attribute.attrib["value"])
attributes: list[Element] = [ if attribute.attrib["name"] == "downloadvolumefactor":
x for x in item.findall("torznab:attr", xmlns) download_volume_factor = float(attribute.attrib["value"])
] if attribute.attrib["name"] == "uploadvolumefactor":
for attribute in attributes: upload_volume_factor = int(attribute.attrib["value"])
if attribute.attrib["name"] == "seeders": flags = []
seeders = int(attribute.attrib["value"]) if download_volume_factor == 0:
if attribute.attrib["name"] == "downloadvolumefactor": flags.append("freeleech")
download_volume_factor = float(attribute.attrib["value"]) if download_volume_factor == 0.5:
if attribute.attrib["name"] == "uploadvolumefactor": flags.append("halfleech")
upload_volume_factor = int(attribute.attrib["value"]) if download_volume_factor == 0.75:
flags = [] flags.append("freeleech75")
if download_volume_factor == 0: if download_volume_factor == 0.25:
flags.append("freeleech") flags.append("freeleech25")
if download_volume_factor == 0.5: if upload_volume_factor == 2:
flags.append("halfleech") flags.append("doubleupload")
if download_volume_factor == 0.75:
flags.append("freeleech75")
if download_volume_factor == 0.25:
flags.append("freeleech25")
if upload_volume_factor == 2:
flags.append("doubleupload")
result = IndexerQueryResult( result = IndexerQueryResult(
title=item.find("title").text, title=item.find("title").text,
download_url=str(item.find("enclosure").attrib["url"]), download_url=str(item.find("enclosure").attrib["url"]),
seeders=seeders, seeders=seeders,
flags=flags, flags=flags,
size=int(item.find("size").text), size=int(item.find("size").text),
usenet=False, # always False, because Jackett doesn't support usenet usenet=False, # always False, because Jackett doesn't support usenet
age=0, # always 0 for torrents, as Jackett does not provide age information in a convenient format age=0, # always 0 for torrents, as Jackett does not provide age information in a convenient format
indexer=item.find("jackettindexer").text indexer=item.find("jackettindexer").text
if item.find("jackettindexer") is not None if item.find("jackettindexer") is not None
else None, else None,
) )
result_list.append(result) result_list.append(result)
log.debug(f"Raw result: {result.model_dump()}")
else: log.info(
log.error(f"Jacket Error: {response.status_code}") f"found {len(result_list)} results for query '{query}' from indexer '{indexer}'"
return [] )
return result_list return result_list

View File

@@ -1,6 +1,9 @@
import concurrent
import logging import logging
from concurrent.futures import ThreadPoolExecutor
import requests import requests
from requests.adapters import HTTPAdapter
from media_manager.indexer.indexers.generic import GenericIndexer from media_manager.indexer.indexers.generic import GenericIndexer
from media_manager.config import AllEncompassingConfig from media_manager.config import AllEncompassingConfig
@@ -22,6 +25,7 @@ class Prowlarr(GenericIndexer):
config = AllEncompassingConfig().indexers.prowlarr config = AllEncompassingConfig().indexers.prowlarr
self.api_key = config.api_key self.api_key = config.api_key
self.url = config.url self.url = config.url
self.reject_torrents_on_url_error = config.reject_torrents_on_url_error
log.debug("Registering Prowlarr as Indexer") log.debug("Registering Prowlarr as Indexer")
def search(self, query: str, is_tv: bool) -> list[IndexerQueryResult]: def search(self, query: str, is_tv: bool) -> list[IndexerQueryResult]:
@@ -34,70 +38,89 @@ class Prowlarr(GenericIndexer):
"categories": "5000" if is_tv else "2000", # TV: 5000, Movies: 2000 "categories": "5000" if is_tv else "2000", # TV: 5000, Movies: 2000
"limit": 10000, "limit": 10000,
} }
with requests.Session() as session:
adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100)
session.mount("http://", adapter)
session.mount("https://", adapter)
response = requests.get(url, params=params) response = session.get(url, params=params)
if response.status_code == 200: log.debug(f"Prowlarr response time for query '{query}': {response.elapsed}")
if response.status_code != 200:
log.error(f"Prowlarr Error: {response.status_code}")
return []
futures = []
result_list: list[IndexerQueryResult] = [] result_list: list[IndexerQueryResult] = []
for result in response.json():
if result["protocol"] == "torrent":
initial_url = None
if "downloadUrl" in result:
log.info(f"Using download URL: {result['downloadUrl']}")
initial_url = result["downloadUrl"]
elif "magnetUrl" in result:
log.info(
f"Using magnet URL as fallback for download URL: {result['magnetUrl']}"
)
initial_url = result["magnetUrl"]
elif "guid" in result:
log.warning(
f"Using guid as fallback for download URL: {result['guid']}"
)
initial_url = result["guid"]
else:
log.error(f"No valid download URL found for result: {result}")
continue
if not initial_url.startswith("magnet:"): with ThreadPoolExecutor() as executor:
try: for item in response.json():
final_download_url = follow_redirects_to_final_torrent_url( future = executor.submit(self.process_result, item, session)
initial_url=initial_url futures.append(future)
)
except RuntimeError as e: for future in concurrent.futures.as_completed(futures):
log.error( result = future.result()
f"Failed to follow redirects for {initial_url}, falling back to the initial url as download url, error: {e}" if result is not None:
) result_list.append(result)
final_download_url = initial_url
else:
final_download_url = initial_url
result_list.append(
IndexerQueryResult(
download_url=final_download_url,
title=result["sortTitle"],
seeders=result["seeders"],
flags=result["indexerFlags"],
size=result["size"],
usenet=False,
age=0, # Torrent results do not need age information
indexer=result["indexer"] if "indexer" in result else None,
)
)
else:
result_list.append(
IndexerQueryResult(
download_url=result["downloadUrl"],
title=result["sortTitle"],
seeders=0, # Usenet results do not have seeders
flags=result["indexerFlags"],
size=result["size"],
usenet=True,
age=int(result["ageMinutes"]) * 60,
indexer=result["indexer"] if "indexer" in result else None,
)
)
log.debug("torrent result: " + result.__str__())
return result_list return result_list
def process_result(
self, result, session: requests.Session
) -> IndexerQueryResult | None:
if result["protocol"] == "torrent":
initial_url = None
if "downloadUrl" in result:
log.info(f"Using download URL: {result['downloadUrl']}")
initial_url = result["downloadUrl"]
elif "magnetUrl" in result:
log.info(
f"Using magnet URL as fallback for download URL: {result['magnetUrl']}"
)
initial_url = result["magnetUrl"]
elif "guid" in result:
log.warning(
f"Using guid as fallback for download URL: {result['guid']}"
)
initial_url = result["guid"]
else:
log.error(f"No valid download URL found for result: {result}")
return None
if not initial_url.startswith("magnet:"):
try:
final_download_url = follow_redirects_to_final_torrent_url(
initial_url=initial_url,
session=session,
)
except RuntimeError as e:
log.debug(
f"Failed to follow redirects for {initial_url}, falling back to the initial url as download url, error: {e}"
)
if self.reject_torrents_on_url_error:
return None
else:
final_download_url = initial_url
else:
final_download_url = initial_url
return IndexerQueryResult(
download_url=final_download_url,
title=result["sortTitle"],
seeders=result["seeders"],
flags=result["indexerFlags"],
size=result["size"],
usenet=False,
age=0, # Torrent results do not need age information
indexer=result["indexer"] if "indexer" in result else None,
)
else: else:
log.error(f"Prowlarr Error: {response.status_code}") return IndexerQueryResult(
return [] download_url=result["downloadUrl"],
title=result["sortTitle"],
seeders=0, # Usenet results do not have seeders
flags=result["indexerFlags"],
size=result["size"],
usenet=True,
age=int(result["ageMinutes"]) * 60,
indexer=result["indexer"] if "indexer" in result else None,
)

View File

@@ -1,4 +1,5 @@
import logging import logging
from urllib.parse import urljoin
import requests import requests
@@ -111,57 +112,41 @@ def evaluate_indexer_query_results(
return query_results return query_results
def follow_redirects_to_final_torrent_url(initial_url: str) -> str | None: def follow_redirects_to_final_torrent_url(
initial_url: str, session: requests.Session, timeout: float = 10
) -> str:
""" """
Follows redirects to get the final torrent URL. Follows redirects to get the final torrent URL.
:param initial_url: The initial URL to follow. :param initial_url: The initial URL to follow.
:return: The final torrent URL or None if it fails. :param session: A requests session to use for the requests.
:param timeout: Timeout in seconds for each redirect request.
:return: The final torrent URL.
:raises: RuntimeError if it fails.
""" """
current_url = initial_url current_url = initial_url
final_url = None
try: try:
while True: for _ in range(10): # Limit redirects to prevent infinite loops
response = requests.get(current_url, allow_redirects=False) response = session.get(current_url, allow_redirects=False, timeout=timeout)
if 300 <= response.status_code < 400: if 300 <= response.status_code < 400:
redirect_url = response.headers.get("Location") redirect_url = response.headers.get("Location")
if redirect_url.startswith("http://") or redirect_url.startswith( if not redirect_url:
"https://" raise RuntimeError("Redirect response without Location header")
):
# It's an HTTP/HTTPS redirect, continue following
current_url = redirect_url
log.info(f"Following HTTP/HTTPS redirect to: {current_url}")
elif redirect_url.startswith("magnet:"):
# It's a Magnet URL, this is our final destination
final_url = redirect_url
log.info(f"Reached Magnet URL: {final_url}")
break
else:
log.error(
f"Reached unexpected non-HTTP/HTTPS/magnet URL: {redirect_url}"
)
raise RuntimeError(
f"Reached unexpected non-HTTP/HTTPS/magnet URL: {redirect_url}"
)
else:
# Not a redirect, so the current URL is the final one
final_url = current_url
log.info(f"Reached final (non-redirect) URL: {final_url}")
break
except requests.exceptions.RequestException as e:
log.error(f"An error occurred during the request: {e}")
raise RuntimeError(f"An error occurred during the request: {e}")
if not final_url:
log.error("Final URL could not be determined.")
raise RuntimeError("Final URL could not be determined.")
if final_url.startswith("http://") or final_url.startswith("https://"):
log.info("Final URL protocol: HTTP/HTTPS")
elif final_url.startswith("magnet:"):
log.info("Final URL protocol: Magnet")
else:
log.error(f"Final URL is not a valid HTTP/HTTPS or Magnet URL: {final_url}")
raise RuntimeError(
f"Final URL is not a valid HTTP/HTTPS or Magnet URL: {final_url}"
)
return final_url # Resolve relative redirects against the last URL
current_url = urljoin(current_url, redirect_url)
log.debug(f"Following redirect to: {current_url}")
if current_url.startswith("magnet:"):
return current_url
else:
response.raise_for_status() # Raise an exception for bad status codes
return current_url
else:
raise RuntimeError("Exceeded maximum number of redirects")
except requests.exceptions.RequestException as e:
log.debug(f"An error occurred during the request for {initial_url}: {e}")
raise RuntimeError(f"An error occurred during the request: {e}") from e
return current_url

View File

@@ -44,6 +44,13 @@ logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(name)s - %(funcName)s(): %(message)s", format="%(asctime)s - %(levelname)s - %(name)s - %(funcName)s(): %(message)s",
stream=sys.stdout, stream=sys.stdout,
) )
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("transmission_rpc").setLevel(logging.WARNING)
logging.getLogger("qbittorrentapi").setLevel(logging.WARNING)
logging.getLogger("sabnzbd_api").setLevel(logging.WARNING)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
from psycopg.errors import UniqueViolation # noqa: E402 from psycopg.errors import UniqueViolation # noqa: E402

View File

@@ -36,7 +36,6 @@ class TransmissionDownloadClient(AbstractDownloadClient):
password=self.config.password, password=self.config.password,
protocol="https" if self.config.https_enabled else "http", protocol="https" if self.config.https_enabled else "http",
path=self.config.path, path=self.config.path,
logger=log,
) )
# Test connection # Test connection
self._client.session_stats() self._client.session_stats()