add notifications for movies and tv module

This commit is contained in:
maxDorninger
2025-07-01 16:01:57 +02:00
parent c5da032506
commit 92e00f118c
4 changed files with 137 additions and 13 deletions

View File

@@ -10,6 +10,7 @@ from media_manager.metadataProvider.abstractMetaDataProvider import (
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.tv.schemas import Episode, Season, Show, SeasonNumber, EpisodeNumber
from media_manager.movies.schemas import Movie
from media_manager.notification.manager import notification_manager
class TmdbConfig(BaseSettings):
@@ -29,29 +30,110 @@ class TmdbMetadataProvider(AbstractMetadataProvider):
self.url = config.TMDB_RELAY_URL
def __get_show_metadata(self, id: int) -> dict:
return requests.get(url=f"{self.url}/tv/shows/{id}").json()
try:
response = requests.get(url=f"{self.url}/tv/shows/{id}")
response.raise_for_status()
return response.json()
except requests.RequestException as e:
log.error(f"TMDB API error getting show metadata for ID {id}: {e}")
if notification_manager.is_configured():
notification_manager.send_notification(
title="TMDB API Error",
message=f"Failed to fetch show metadata for ID {id} from TMDB. Error: {str(e)}",
)
raise
def __get_season_metadata(self, show_id: int, season_number: int) -> dict:
return requests.get(url=f"{self.url}/tv/shows/{show_id}/{season_number}").json()
try:
response = requests.get(
url=f"{self.url}/tv/shows/{show_id}/{season_number}"
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
log.error(
f"TMDB API error getting season {season_number} metadata for show ID {show_id}: {e}"
)
if notification_manager.is_configured():
notification_manager.send_notification(
title="TMDB API Error",
message=f"Failed to fetch season {season_number} metadata for show ID {show_id} from TMDB. Error: {str(e)}",
)
raise
def __search_tv(self, query: str, page: int) -> dict:
return requests.get(
url=f"{self.url}/tv/search", params={"query": query, "page": page}
).json()
try:
response = requests.get(
url=f"{self.url}/tv/search", params={"query": query, "page": page}
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
log.error(f"TMDB API error searching TV shows with query '{query}': {e}")
if notification_manager.is_configured():
notification_manager.send_notification(
title="TMDB API Error",
message=f"Failed to search TV shows with query '{query}' on TMDB. Error: {str(e)}",
)
raise
def __get_trending_tv(self) -> dict:
return requests.get(url=f"{self.url}/tv/trending").json()
try:
response = requests.get(url=f"{self.url}/tv/trending")
response.raise_for_status()
return response.json()
except requests.RequestException as e:
log.error(f"TMDB API error getting trending TV: {e}")
if notification_manager.is_configured():
notification_manager.send_notification(
title="TMDB API Error",
message=f"Failed to fetch trending TV shows from TMDB. Error: {str(e)}",
)
raise
def __get_movie_metadata(self, id: int) -> dict:
return requests.get(url=f"{self.url}/movies/{id}").json()
try:
response = requests.get(url=f"{self.url}/movies/{id}")
response.raise_for_status()
return response.json()
except requests.RequestException as e:
log.error(f"TMDB API error getting movie metadata for ID {id}: {e}")
if notification_manager.is_configured():
notification_manager.send_notification(
title="TMDB API Error",
message=f"Failed to fetch movie metadata for ID {id} from TMDB. Error: {str(e)}",
)
raise
def __search_movie(self, query: str, page: int) -> dict:
return requests.get(
url=f"{self.url}/movies/search", params={"query": query, "page": page}
).json()
try:
response = requests.get(
url=f"{self.url}/movies/search", params={"query": query, "page": page}
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
log.error(f"TMDB API error searching movies with query '{query}': {e}")
if notification_manager.is_configured():
notification_manager.send_notification(
title="TMDB API Error",
message=f"Failed to search movies with query '{query}' on TMDB. Error: {str(e)}",
)
raise
def __get_trending_movies(self) -> dict:
return requests.get(url=f"{self.url}/movies/trending").json()
try:
response = requests.get(url=f"{self.url}/movies/trending")
response.raise_for_status()
return response.json()
except requests.RequestException as e:
log.error(f"TMDB API error getting trending movies: {e}")
if notification_manager.is_configured():
notification_manager.send_notification(
title="TMDB API Error",
message=f"Failed to fetch trending movies from TMDB. Error: {str(e)}",
)
raise
def download_show_poster_image(self, show: Show) -> bool:
show_metadata = self.__get_show_metadata(show.external_id)