import pprint import re import shutil from pathlib import Path from typing import overload from sqlalchemy.exc import IntegrityError from media_manager.config import MediaManagerConfig from media_manager.exceptions import InvalidConfigError, NotFoundError, RenameError from media_manager.indexer.schemas import IndexerQueryResult, IndexerQueryResultId from media_manager.indexer.service import IndexerService from media_manager.indexer.utils import evaluate_indexer_query_results from media_manager.metadataProvider.abstract_metadata_provider import ( AbstractMetadataProvider, ) from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult from media_manager.metadataProvider.tmdb import TmdbMetadataProvider from media_manager.metadataProvider.tvdb import TvdbMetadataProvider from media_manager.notification.service import NotificationService from media_manager.schemas import MediaImportSuggestion from media_manager.torrent.schemas import ( Quality, Torrent, TorrentStatus, ) from media_manager.torrent.service import TorrentService from media_manager.torrent.utils import ( extract_external_id_from_string, get_files_for_import, get_importable_media_directories, import_file, remove_special_characters, remove_special_chars_and_parentheses, ) from media_manager.tv import log from media_manager.tv.repository import TvRepository from media_manager.tv.schemas import ( Episode, EpisodeFile, EpisodeId, EpisodeNumber, PublicEpisodeFile, PublicSeason, PublicShow, RichSeasonTorrent, RichShowTorrent, Season, SeasonId, Show, ShowId, ) class TvService: def __init__( self, tv_repository: TvRepository, torrent_service: TorrentService, indexer_service: IndexerService, notification_service: NotificationService, ) -> None: self.tv_repository = tv_repository self.torrent_service = torrent_service self.indexer_service = indexer_service self.notification_service = notification_service def add_show( self, external_id: int, metadata_provider: AbstractMetadataProvider, language: str | None = None, ) -> Show: """ Add a new show to the database. :param external_id: The ID of the show in the metadata provider\'s system. :param metadata_provider: The name of the metadata provider. :param language: Optional language code (ISO 639-1) to fetch metadata in. """ show_with_metadata = metadata_provider.get_show_metadata( show_id=external_id, language=language ) saved_show = self.tv_repository.save_show(show=show_with_metadata) metadata_provider.download_show_poster_image(show=saved_show) return saved_show def get_total_downloaded_episoded_count(self) -> int: """ Get total number of downloaded episodes. """ return self.tv_repository.get_total_downloaded_episodes_count() def set_show_library(self, show: Show, library: str) -> None: self.tv_repository.set_show_library(show_id=show.id, library=library) def delete_show( self, show: Show, delete_files_on_disk: bool = False, delete_torrents: bool = False, ) -> None: """ Delete a show from the database, optionally deleting files and torrents. :param show: The show to delete. :param delete_files_on_disk: Whether to delete the show's files from disk. :param delete_torrents: Whether to delete associated torrents from the torrent client. """ if delete_files_on_disk or delete_torrents: log.debug(f"Deleting ID: {show.id} - Name: {show.name}") if delete_files_on_disk: show_dir = self.get_root_show_directory(show=show) log.debug(f"Attempt to delete show directory: {show_dir}") if show_dir.exists() and show_dir.is_dir(): shutil.rmtree(show_dir) log.info(f"Deleted show directory: {show_dir}") if delete_torrents: torrents = self.tv_repository.get_torrents_by_show_id(show_id=show.id) for torrent in torrents: try: self.torrent_service.cancel_download(torrent, delete_files=True) self.torrent_service.delete_torrent(torrent_id=torrent.id) log.info(f"Deleted torrent: {torrent.hash}") except Exception: log.warning( f"Failed to delete torrent {torrent.hash}", exc_info=True ) self.tv_repository.delete_show(show_id=show.id) def get_public_episode_files_by_season_id( self, season: Season ) -> list[PublicEpisodeFile]: """ Get all public episode files for a given season. :param season: The season object. :return: A list of public episode files. """ episode_files = self.tv_repository.get_episode_files_by_season_id( season_id=season.id ) public_episode_files = [ PublicEpisodeFile.model_validate(x) for x in episode_files ] result = [] for episode_file in public_episode_files: if self.episode_file_exists_on_file(episode_file=episode_file): episode_file.downloaded = True result.append(episode_file) return result @overload def check_if_show_exists(self, *, external_id: int, metadata_provider: str) -> bool: """ Check if a show exists in the database. :param external_id: The external ID of the show. :param metadata_provider: The metadata provider. :return: True if the show exists, False otherwise. """ @overload def check_if_show_exists(self, *, show_id: ShowId) -> bool: """ Check if a show exists in the database. :param show_id: The ID of the show. :return: True if the show exists, False otherwise. """ def check_if_show_exists( self, *, external_id=None, metadata_provider=None, show_id=None ) -> bool: if not (external_id is None or metadata_provider is None): try: self.tv_repository.get_show_by_external_id( external_id=external_id, metadata_provider=metadata_provider ) except NotFoundError: return False elif show_id is not None: try: self.tv_repository.get_show_by_id(show_id=show_id) except NotFoundError: return False else: msg = "Use one of the provided overloads for this function!" raise ValueError(msg) return True def get_all_available_torrents_for_a_season( self, season_number: int, show_id: ShowId, search_query_override: str | None = None, ) -> list[IndexerQueryResult]: """ Get all available torrents for a given season. :param season_number: The number of the season. :param show_id: The ID of the show. :param search_query_override: Optional override for the search query. :return: A list of indexer query results. """ if search_query_override: return self.indexer_service.search(query=search_query_override, is_tv=True) show = self.tv_repository.get_show_by_id(show_id=show_id) torrents = self.indexer_service.search_season( show=show, season_number=season_number ) results = [torrent for torrent in torrents if season_number in torrent.season] return evaluate_indexer_query_results( is_tv=True, query_results=results, media=show ) def get_all_shows(self) -> list[Show]: """ Get all shows. :return: A list of all shows. """ return self.tv_repository.get_shows() def search_for_show( self, query: str, metadata_provider: AbstractMetadataProvider ) -> list[MetaDataProviderSearchResult]: """ Search for shows using a given query. :param query: The search query. :param metadata_provider: The metadata provider to search. :return: A list of metadata provider show search results. """ results = metadata_provider.search_show(query) for result in results: if self.check_if_show_exists( external_id=result.external_id, metadata_provider=metadata_provider.name ): result.added = True try: show = self.tv_repository.get_show_by_external_id( external_id=result.external_id, metadata_provider=metadata_provider.name, ) result.id = show.id except Exception: log.error( f"Unable to find internal show ID for {result.external_id} on {metadata_provider.name}" ) return results def get_popular_shows( self, metadata_provider: AbstractMetadataProvider ) -> list[MetaDataProviderSearchResult]: """ Get popular shows from a given metadata provider. :param metadata_provider: The metadata provider to use. :return: A list of metadata provider show search results. """ results = metadata_provider.search_show() return [ result for result in results if not self.check_if_show_exists( external_id=result.external_id, metadata_provider=metadata_provider.name ) ] def get_public_show_by_id(self, show: Show) -> PublicShow: """ Get a public show from a Show object. :param show: The show object. :return: A public show. """ public_show = PublicShow.model_validate(show) public_seasons: list[PublicSeason] = [] for season in show.seasons: public_season = PublicSeason.model_validate(season) for episode in public_season.episodes: episode.downloaded = self.is_episode_downloaded( episode=episode, season=season, show=show, ) # A season is considered downloaded if it has episodes and all of them are downloaded, # matching the behavior of is_season_downloaded. public_season.downloaded = bool(public_season.episodes) and all( episode.downloaded for episode in public_season.episodes ) public_seasons.append(public_season) public_show.seasons = public_seasons return public_show def get_show_by_id(self, show_id: ShowId) -> Show: """ Get a show by its ID. :param show_id: The ID of the show. :return: The show. """ return self.tv_repository.get_show_by_id(show_id=show_id) def is_season_downloaded(self, season: Season, show: Show) -> bool: """ Check if a season is downloaded. :param season: The season object. :param show: The show object. :return: True if the season is downloaded, False otherwise. """ episodes = season.episodes if not episodes: return False for episode in episodes: if not self.is_episode_downloaded( episode=episode, season=season, show=show ): return False return True def is_episode_downloaded( self, episode: Episode, season: Season, show: Show ) -> bool: """ Check if an episode is downloaded and imported (file exists on disk). An episode is considered downloaded if: - There is at least one EpisodeFile in the database AND - A matching episode file exists in the season directory on disk. :param episode: The episode object. :param season: The season object. :param show: The show object. :return: True if the episode is downloaded and imported, False otherwise. """ episode_files = self.tv_repository.get_episode_files_by_episode_id( episode_id=episode.id ) if not episode_files: return False season_dir = self.get_root_season_directory(show, season.number) if not season_dir.exists(): return False episode_token = f"S{season.number:02d}E{episode.number:02d}" video_extensions = {".mkv", ".mp4", ".avi", ".mov"} try: for file in season_dir.iterdir(): if ( file.is_file() and episode_token.lower() in file.name.lower() and file.suffix.lower() in video_extensions ): return True except OSError as e: log.error( f"Disk check failed for episode {episode.id} in {season_dir}: {e}" ) return False def episode_file_exists_on_file(self, episode_file: EpisodeFile) -> bool: """ Check if an episode file exists on the filesystem. :param episode_file: The episode file to check. :return: True if the file exists, False otherwise. """ if episode_file.torrent_id is None: return True try: torrent_file = self.torrent_service.get_torrent_by_id( torrent_id=episode_file.torrent_id ) if torrent_file.imported: return True except RuntimeError: log.exception("Error retrieving torrent") return False def get_show_by_external_id( self, external_id: int, metadata_provider: str ) -> Show | None: """ Get a show by its external ID and metadata provider. :param external_id: The external ID of the show. :param metadata_provider: The metadata provider. :return: The show or None if not found. """ return self.tv_repository.get_show_by_external_id( external_id=external_id, metadata_provider=metadata_provider ) def get_season(self, season_id: SeasonId) -> Season: """ Get a season by its ID. :param season_id: The ID of the season. :return: The season. """ return self.tv_repository.get_season(season_id=season_id) def get_episode(self, episode_id: EpisodeId) -> Episode: """ Get an episode by its ID. :param episode_id: The ID of the episode. :return: The episode. """ return self.tv_repository.get_episode(episode_id=episode_id) def get_season_by_episode(self, episode_id: EpisodeId) -> Season: """ Get a season by the episode ID. :param episode_id: The ID of the episode. :return: The season. """ return self.tv_repository.get_season_by_episode(episode_id=episode_id) def get_torrents_for_show(self, show: Show) -> RichShowTorrent: """ Get torrents for a given show. :param show: The show. :return: A rich show torrent. """ show_torrents = self.tv_repository.get_torrents_by_show_id(show_id=show.id) rich_season_torrents = [] for show_torrent in show_torrents: seasons = self.tv_repository.get_seasons_by_torrent_id( torrent_id=show_torrent.id ) episodes = self.tv_repository.get_episodes_by_torrent_id( torrent_id=show_torrent.id ) episode_files = self.torrent_service.get_episode_files_of_torrent( torrent=show_torrent ) file_path_suffix = ( episode_files[0].file_path_suffix if episode_files else "" ) season_torrent = RichSeasonTorrent( torrent_id=show_torrent.id, torrent_title=show_torrent.title, status=show_torrent.status, quality=show_torrent.quality, imported=show_torrent.imported, seasons=seasons, episodes=episodes if len(seasons) == 1 else [], file_path_suffix=file_path_suffix, usenet=show_torrent.usenet, ) rich_season_torrents.append(season_torrent) return RichShowTorrent( show_id=show.id, name=show.name, year=show.year, metadata_provider=show.metadata_provider, torrents=rich_season_torrents, ) def get_all_shows_with_torrents(self) -> list[RichShowTorrent]: """ Get all shows with torrents. :return: A list of rich show torrents. """ shows = self.tv_repository.get_all_shows_with_torrents() return [self.get_torrents_for_show(show=show) for show in shows] def download_torrent( self, public_indexer_result_id: IndexerQueryResultId, show_id: ShowId, override_show_file_path_suffix: str = "", ) -> Torrent: """ Download a torrent for a given indexer result and show. :param public_indexer_result_id: The ID of the indexer result. :param show_id: The ID of the show. :param override_show_file_path_suffix: Optional override for the file path suffix. :return: The downloaded torrent. """ indexer_result = self.indexer_service.get_result( result_id=public_indexer_result_id ) show_torrent = self.torrent_service.download(indexer_result=indexer_result) self.torrent_service.pause_download(torrent=show_torrent) try: for season_number in indexer_result.season: season = self.tv_repository.get_season_by_number( season_number=season_number, show_id=show_id ) episodes = {episode.number: episode.id for episode in season.episodes} if indexer_result.episode: episode_ids = [] missing_episodes = [] for ep_number in indexer_result.episode: ep_id = episodes.get(EpisodeNumber(ep_number)) if ep_id is None: missing_episodes.append(ep_number) continue episode_ids.append(ep_id) if missing_episodes: log.warning( "Some episodes from indexer result were not found in season %s " "for show %s and will be skipped: %s", season.id, show_id, ", ".join(str(ep) for ep in missing_episodes), ) else: episode_ids = [episode.id for episode in season.episodes] for episode_id in episode_ids: episode_file = EpisodeFile( episode_id=episode_id, quality=indexer_result.quality, torrent_id=show_torrent.id, file_path_suffix=override_show_file_path_suffix, ) self.tv_repository.add_episode_file(episode_file=episode_file) except IntegrityError: log.error( f"Episode file for episode {episode_id} of season {season.id} and quality {indexer_result.quality} already exists, skipping." ) self.tv_repository.remove_episode_files_by_torrent_id(show_torrent.id) self.torrent_service.cancel_download( torrent=show_torrent, delete_files=True ) raise else: log.info( f"Successfully added episode files for torrent {show_torrent.title} and show ID {show_id}" ) self.torrent_service.resume_download(torrent=show_torrent) return show_torrent def get_root_show_directory(self, show: Show) -> Path: misc_config = MediaManagerConfig().misc show_directory_name = f"{remove_special_characters(show.name)} ({show.year}) [{show.metadata_provider}id-{show.external_id}]" log.debug( f"Show {show.name} without special characters: {remove_special_characters(show.name)}" ) if show.library != "Default": for library in misc_config.tv_libraries: if library.name == show.library: log.debug( f"Using library {library.name} for show {show.name} ({show.year})" ) return Path(library.path) / show_directory_name else: log.warning( f"Library {show.library} not defined in config, using default TV directory." ) return misc_config.tv_directory / show_directory_name def get_root_season_directory(self, show: Show, season_number: int) -> Path: return self.get_root_show_directory(show) / Path(f"Season {season_number}") def import_episode( self, show: Show, season: Season, episode_number: int, video_files: list[Path], subtitle_files: list[Path], file_path_suffix: str = "", ) -> bool: episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode_number:02d}" if file_path_suffix != "": episode_file_name += f" - {file_path_suffix}" pattern = ( r".*[. ]S0?" + str(season.number) + r"E0?" + str(episode_number) + r"[. ].*" ) subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt" target_file_name = ( self.get_root_season_directory(show=show, season_number=season.number) / episode_file_name ) # import subtitle for subtitle_file in subtitle_files: regex_result = re.search( subtitle_pattern, subtitle_file.name, re.IGNORECASE ) if regex_result: language_code = regex_result.group(1) target_subtitle_file = target_file_name.with_suffix( f".{language_code}.srt" ) import_file(target_file=target_subtitle_file, source_file=subtitle_file) else: log.debug( f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}" ) # import episode videos for file in video_files: if re.search(pattern, file.name, re.IGNORECASE): target_video_file = target_file_name.with_suffix(file.suffix) import_file(target_file=target_video_file, source_file=file) return True else: msg = f"Could not find any video file for episode {episode_number} of show {show.name} S{season.number}" raise Exception(msg) # noqa: TRY002 # TODO: resolve this def import_season( self, show: Show, season: Season, video_files: list[Path], subtitle_files: list[Path], file_path_suffix: str = "", ) -> tuple[bool, list[Episode]]: season_path = self.get_root_season_directory( show=show, season_number=season.number ) success = True imported_episodes = [] try: season_path.mkdir(parents=True, exist_ok=True) except Exception as e: log.exception(f"Could not create path {season_path}") msg = f"Could not create path {season_path}" raise Exception(msg) from e # noqa: TRY002 # TODO: resolve this for episode in season.episodes: try: imported = self.import_episode( show=show, subtitle_files=subtitle_files, video_files=video_files, season=season, episode_number=episode.number, file_path_suffix=file_path_suffix, ) if imported: imported_episodes.append(episode) except Exception: # Send notification about missing episode file if self.notification_service: self.notification_service.send_notification_to_all_providers( title="Missing Episode File", message=f"No video file found for S{season.number:02d}E{episode.number:02d} for show {show.name}. Manual intervention may be required.", ) success = False log.warning( f"S{season.number}E{episode.number} not found when trying to import episode for show {show.name}." ) return success, imported_episodes def import_episode_files( self, show: Show, season: Season, episode: Episode, video_files: list[Path], subtitle_files: list[Path], file_path_suffix: str = "", ) -> bool: episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode.number:02d}" if file_path_suffix != "": episode_file_name += f" - {file_path_suffix}" pattern = ( r".*[. ]S0?" + str(season.number) + r"E0?" + str(episode.number) + r"[. ].*" ) subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt" target_file_name = ( self.get_root_season_directory(show=show, season_number=season.number) / episode_file_name ) # import subtitle for subtitle_file in subtitle_files: regex_result = re.search( subtitle_pattern, subtitle_file.name, re.IGNORECASE ) if regex_result: language_code = regex_result.group(1) target_subtitle_file = target_file_name.with_suffix( f".{language_code}.srt" ) import_file(target_file=target_subtitle_file, source_file=subtitle_file) else: log.debug( f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}" ) found_video = False # import episode videos for file in video_files: if re.search(pattern, file.name, re.IGNORECASE): target_video_file = target_file_name.with_suffix(file.suffix) import_file(target_file=target_video_file, source_file=file) found_video = True break if not found_video: # Send notification about missing episode file if self.notification_service: self.notification_service.send_notification_to_all_providers( title="Missing Episode File", message=f"No video file found for S{season.number:02d}E{episode.number:02d} for show {show.name}. Manual intervention may be required.", ) log.warning( f"File for S{season.number}E{episode.number} not found when trying to import episode for show {show.name}." ) return False return True def import_episode_files_from_torrent(self, torrent: Torrent, show: Show) -> None: """ Organizes episodes files from a torrent into the TV directory structure, mapping them to seasons and episodes. :param torrent: The Torrent object :param show: The Show object """ video_files, subtitle_files, _all_files = get_files_for_import(torrent=torrent) success: list[bool] = [] log.debug( f"Importing these {len(video_files)} files:\n" + pprint.pformat(video_files) ) episode_files = self.torrent_service.get_episode_files_of_torrent( torrent=torrent ) if not episode_files: log.warning( f"No episode files associated with torrent {torrent.title}, skipping import." ) return log.info( f"Found {len(episode_files)} episode files associated with torrent {torrent.title}" ) imported_episodes_by_season: dict[int, list[int]] = {} for episode_file in episode_files: season = self.get_season_by_episode(episode_id=episode_file.episode_id) episode = self.get_episode(episode_file.episode_id) season_path = self.get_root_season_directory( show=show, season_number=season.number ) if not season_path.exists(): try: season_path.mkdir(parents=True) except Exception as e: log.warning(f"Could not create path {season_path}: {e}") msg = f"Could not create path {season_path}" raise Exception(msg) from e # noqa: TRY002 episoded_import_success = self.import_episode_files( show=show, season=season, episode=episode, video_files=video_files, subtitle_files=subtitle_files, file_path_suffix=episode_file.file_path_suffix, ) success.append(episoded_import_success) if episoded_import_success: imported_episodes_by_season.setdefault(season.number, []).append( episode.number ) log.info( f"Episode {episode.number} from Season {season.number} successfully imported from torrent {torrent.title}" ) else: log.warning( f"Episode {episode.number} from Season {season.number} failed to import from torrent {torrent.title}" ) success_messages: list[str] = [] for season_number, episodes in imported_episodes_by_season.items(): episode_list = ",".join(str(e) for e in sorted(episodes)) success_messages.append( f"Episode(s): {episode_list} from Season {season_number}" ) episodes_summary = "; ".join(success_messages) if all(success): torrent.imported = True self.torrent_service.torrent_repository.save_torrent(torrent=torrent) # Send successful season download notification if self.notification_service: self.notification_service.send_notification_to_all_providers( title="TV Show imported successfully", message=( f"Successfully imported {episodes_summary} " f"of {show.name} ({show.year}) " f"from torrent {torrent.title}." ), ) else: if self.notification_service: self.notification_service.send_notification_to_all_providers( title="Failed to import TV Show", message=f"Importing {show.name} ({show.year}) from torrent {torrent.title} completed with errors. Please check the logs for details.", ) log.info( f"Finished importing files for torrent {torrent.title} {'without' if all(success) else 'with'} errors" ) def update_show_metadata( self, db_show: Show, metadata_provider: AbstractMetadataProvider ) -> Show | None: """ Updates the metadata of a show. This includes adding new seasons and episodes if available from the metadata provider. It also updates existing show, season, and episode attributes if they have changed. :param metadata_provider: The metadata provider object to fetch fresh data from. :param db_show: The Show to update :return: The updated Show object, or None if the show is not found or an error occurs. """ log.debug(f"Found show: {db_show.name} for metadata update.") # Use stored original_language preference for metadata fetching fresh_show_data = metadata_provider.get_show_metadata( show_id=db_show.external_id, language=db_show.original_language ) if not fresh_show_data: log.warning( f"Could not fetch fresh metadata for show {db_show.name} (External ID: {db_show.external_id}) from {db_show.metadata_provider}." ) return db_show log.debug(f"Fetched fresh metadata for show: {fresh_show_data.name}") self.tv_repository.update_show_attributes( show_id=db_show.id, name=fresh_show_data.name, overview=fresh_show_data.overview, year=fresh_show_data.year, ended=fresh_show_data.ended, imdb_id=fresh_show_data.imdb_id, continuous_download=db_show.continuous_download if fresh_show_data.ended is False else False, ) # Process seasons and episodes existing_season_external_ids = {s.external_id: s for s in db_show.seasons} for fresh_season_data in fresh_show_data.seasons: if fresh_season_data.external_id in existing_season_external_ids: # Update existing season existing_season = existing_season_external_ids[ fresh_season_data.external_id ] self.tv_repository.update_season_attributes( season_id=existing_season.id, name=fresh_season_data.name, overview=fresh_season_data.overview, ) # Process episodes for this season existing_episode_external_ids = { ep.external_id: ep for ep in existing_season.episodes } for fresh_episode_data in fresh_season_data.episodes: if fresh_episode_data.external_id in existing_episode_external_ids: # Update existing episode existing_episode = existing_episode_external_ids[ fresh_episode_data.external_id ] self.tv_repository.update_episode_attributes( episode_id=existing_episode.id, title=fresh_episode_data.title, overview=fresh_episode_data.overview, ) else: # Add new episode log.debug( f"Adding new episode {fresh_episode_data.number} to season {existing_season.number}" ) episode_schema = Episode( id=EpisodeId(fresh_episode_data.id), number=fresh_episode_data.number, external_id=fresh_episode_data.external_id, title=fresh_episode_data.title, overview=fresh_episode_data.overview, ) self.tv_repository.add_episode_to_season( season_id=existing_season.id, episode_data=episode_schema ) else: # Add new season (and its episodes) log.debug( f"Adding new season {fresh_season_data.number} to show {db_show.name}" ) episodes_for_schema = [ Episode( id=EpisodeId(ep_data.id), number=ep_data.number, external_id=ep_data.external_id, title=ep_data.title, overview=ep_data.overview, ) for ep_data in fresh_season_data.episodes ] season_schema = Season( id=SeasonId(fresh_season_data.id), number=fresh_season_data.number, name=fresh_season_data.name, overview=fresh_season_data.overview, external_id=fresh_season_data.external_id, episodes=episodes_for_schema, ) self.tv_repository.add_season_to_show( show_id=db_show.id, season_data=season_schema ) updated_show = self.tv_repository.get_show_by_id(show_id=db_show.id) log.info(f"Successfully updated metadata for show: {updated_show.name}") metadata_provider.download_show_poster_image(show=updated_show) return updated_show def set_show_continuous_download( self, show: Show, continuous_download: bool ) -> Show: """ Set the continuous download flag for a show. :param show: The show object. :param continuous_download: True to enable continuous download, False to disable. :return: The updated Show object. """ return self.tv_repository.update_show_attributes( show_id=show.id, continuous_download=continuous_download ) def get_import_candidates( self, tv_show: Path, metadata_provider: AbstractMetadataProvider ) -> MediaImportSuggestion: search_result = self.search_for_show( remove_special_chars_and_parentheses(tv_show.name), metadata_provider ) import_candidates = MediaImportSuggestion( directory=tv_show, candidates=search_result ) log.debug( f"Found {len(import_candidates.candidates)} candidates for {import_candidates.directory}" ) return import_candidates def import_existing_tv_show(self, tv_show: Show, source_directory: Path) -> None: new_source_path = source_directory.parent / ("." + source_directory.name) try: source_directory.rename(new_source_path) except Exception as e: log.exception(f"Failed to rename {source_directory} to {new_source_path}") raise RenameError from e video_files, subtitle_files, _all_files = get_files_for_import( directory=new_source_path ) for season in tv_show.seasons: _success, imported_episodes = self.import_season( show=tv_show, season=season, video_files=video_files, subtitle_files=subtitle_files, file_path_suffix="IMPORTED", ) for episode in imported_episodes: episode_file = EpisodeFile( episode_id=episode.id, quality=Quality.unknown, file_path_suffix="IMPORTED", torrent_id=None, ) self.tv_repository.add_episode_file(episode_file=episode_file) def get_importable_tv_shows( self, metadata_provider: AbstractMetadataProvider ) -> list[MediaImportSuggestion]: tv_directory = MediaManagerConfig().misc.tv_directory import_suggestions: list[MediaImportSuggestion] = [] candidate_dirs = get_importable_media_directories(tv_directory) for item in candidate_dirs: metadata, external_id = extract_external_id_from_string(item.name) if metadata is not None and external_id is not None: try: self.tv_repository.get_show_by_external_id( external_id=external_id, metadata_provider=metadata, ) log.debug( f"Show {item.name} already exists in the database, skipping import suggestion." ) continue except NotFoundError: log.debug( f"Show {item.name} not found in database, checking for import candidates." ) import_suggestion = self.get_import_candidates( tv_show=item, metadata_provider=metadata_provider ) import_suggestions.append(import_suggestion) log.debug(f"Detected {len(import_suggestions)} importable TV shows.") return import_suggestions def import_all_torrents(self) -> None: log.info("Importing all torrents") torrents = self.torrent_service.get_all_torrents() log.info("Found %d torrents to import", len(torrents)) for t in torrents: show = None try: if not t.imported and t.status == TorrentStatus.finished: show = self.torrent_service.get_show_of_torrent(torrent=t) if show is None: log.warning( f"torrent {t.title} is not a tv torrent, skipping import." ) continue self.import_episode_files_from_torrent(torrent=t, show=show) except RuntimeError as e: show_name = show.name if show is not None else "" log.error( f"Error importing torrent {t.title} for show {show_name}: {e}", exc_info=True, ) log.info("Finished importing all torrents") def update_all_non_ended_shows_metadata(self) -> None: """Updates the metadata of all non-ended shows.""" log.info("Updating metadata for all non-ended shows") shows = [show for show in self.tv_repository.get_shows() if not show.ended] log.info(f"Found {len(shows)} non-ended shows to update") for show in shows: try: if show.metadata_provider == "tmdb": metadata_provider = TmdbMetadataProvider() elif show.metadata_provider == "tvdb": metadata_provider = TvdbMetadataProvider() else: log.error( f"Unsupported metadata provider {show.metadata_provider} for show {show.name}, skipping update." ) continue except InvalidConfigError: log.exception( f"Error initializing metadata provider {show.metadata_provider} for show {show.name}" ) continue updated_show = self.update_show_metadata( db_show=show, metadata_provider=metadata_provider ) if updated_show: log.debug("Updated show metadata", extra={"show": updated_show.name}) else: log.warning(f"Failed to update metadata for show: {show.name}")