mirror of
https://github.com/maxdorninger/MediaManager.git
synced 2026-04-17 15:13:24 +02:00
This PR removes the requests feature. The functionality will be replaced either by Seerr or by reimplementing it in a better way.
1156 lines
44 KiB
Python
1156 lines
44 KiB
Python
import pprint
|
|
import re
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import overload
|
|
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
from media_manager.config import MediaManagerConfig
|
|
from media_manager.database import get_session
|
|
from media_manager.exceptions import InvalidConfigError, NotFoundError, RenameError
|
|
from media_manager.indexer.repository import IndexerRepository
|
|
from media_manager.indexer.schemas import IndexerQueryResult, IndexerQueryResultId
|
|
from media_manager.indexer.service import IndexerService
|
|
from media_manager.indexer.utils import evaluate_indexer_query_results
|
|
from media_manager.metadataProvider.abstract_metadata_provider import (
|
|
AbstractMetadataProvider,
|
|
)
|
|
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
|
|
from media_manager.metadataProvider.tmdb import TmdbMetadataProvider
|
|
from media_manager.metadataProvider.tvdb import TvdbMetadataProvider
|
|
from media_manager.notification.repository import NotificationRepository
|
|
from media_manager.notification.service import NotificationService
|
|
from media_manager.schemas import MediaImportSuggestion
|
|
from media_manager.torrent.repository import TorrentRepository
|
|
from media_manager.torrent.schemas import (
|
|
Quality,
|
|
Torrent,
|
|
TorrentStatus,
|
|
)
|
|
from media_manager.torrent.service import TorrentService
|
|
from media_manager.torrent.utils import (
|
|
extract_external_id_from_string,
|
|
get_files_for_import,
|
|
get_importable_media_directories,
|
|
import_file,
|
|
remove_special_characters,
|
|
remove_special_chars_and_parentheses,
|
|
)
|
|
from media_manager.tv import log
|
|
from media_manager.tv.repository import TvRepository
|
|
from media_manager.tv.schemas import (
|
|
Episode,
|
|
EpisodeFile,
|
|
EpisodeId,
|
|
EpisodeNumber,
|
|
PublicEpisodeFile,
|
|
PublicSeason,
|
|
PublicShow,
|
|
RichSeasonTorrent,
|
|
RichShowTorrent,
|
|
Season,
|
|
SeasonId,
|
|
Show,
|
|
ShowId,
|
|
)
|
|
|
|
|
|
class TvService:
|
|
def __init__(
|
|
self,
|
|
tv_repository: TvRepository,
|
|
torrent_service: TorrentService,
|
|
indexer_service: IndexerService,
|
|
notification_service: NotificationService,
|
|
) -> None:
|
|
self.tv_repository = tv_repository
|
|
self.torrent_service = torrent_service
|
|
self.indexer_service = indexer_service
|
|
self.notification_service = notification_service
|
|
|
|
def add_show(
|
|
self,
|
|
external_id: int,
|
|
metadata_provider: AbstractMetadataProvider,
|
|
language: str | None = None,
|
|
) -> Show:
|
|
"""
|
|
Add a new show to the database.
|
|
|
|
:param external_id: The ID of the show in the metadata provider\'s system.
|
|
:param metadata_provider: The name of the metadata provider.
|
|
:param language: Optional language code (ISO 639-1) to fetch metadata in.
|
|
"""
|
|
show_with_metadata = metadata_provider.get_show_metadata(
|
|
show_id=external_id, language=language
|
|
)
|
|
saved_show = self.tv_repository.save_show(show=show_with_metadata)
|
|
metadata_provider.download_show_poster_image(show=saved_show)
|
|
return saved_show
|
|
|
|
def get_total_downloaded_episoded_count(self) -> int:
|
|
"""
|
|
Get total number of downloaded episodes.
|
|
"""
|
|
|
|
return self.tv_repository.get_total_downloaded_episodes_count()
|
|
|
|
def set_show_library(self, show: Show, library: str) -> None:
|
|
self.tv_repository.set_show_library(show_id=show.id, library=library)
|
|
|
|
def delete_show(
|
|
self,
|
|
show: Show,
|
|
delete_files_on_disk: bool = False,
|
|
delete_torrents: bool = False,
|
|
) -> None:
|
|
"""
|
|
Delete a show from the database, optionally deleting files and torrents.
|
|
|
|
:param show: The show to delete.
|
|
:param delete_files_on_disk: Whether to delete the show's files from disk.
|
|
:param delete_torrents: Whether to delete associated torrents from the torrent client.
|
|
"""
|
|
if delete_files_on_disk or delete_torrents:
|
|
log.debug(f"Deleting ID: {show.id} - Name: {show.name}")
|
|
|
|
if delete_files_on_disk:
|
|
show_dir = self.get_root_show_directory(show=show)
|
|
|
|
log.debug(f"Attempt to delete show directory: {show_dir}")
|
|
if show_dir.exists() and show_dir.is_dir():
|
|
shutil.rmtree(show_dir)
|
|
log.info(f"Deleted show directory: {show_dir}")
|
|
|
|
if delete_torrents:
|
|
torrents = self.tv_repository.get_torrents_by_show_id(show_id=show.id)
|
|
for torrent in torrents:
|
|
try:
|
|
self.torrent_service.cancel_download(torrent, delete_files=True)
|
|
self.torrent_service.delete_torrent(torrent_id=torrent.id)
|
|
log.info(f"Deleted torrent: {torrent.hash}")
|
|
except Exception:
|
|
log.warning(
|
|
f"Failed to delete torrent {torrent.hash}", exc_info=True
|
|
)
|
|
|
|
self.tv_repository.delete_show(show_id=show.id)
|
|
|
|
def get_public_episode_files_by_season_id(
|
|
self, season: Season
|
|
) -> list[PublicEpisodeFile]:
|
|
"""
|
|
Get all public episode files for a given season.
|
|
|
|
:param season: The season object.
|
|
:return: A list of public episode files.
|
|
"""
|
|
episode_files = self.tv_repository.get_episode_files_by_season_id(
|
|
season_id=season.id
|
|
)
|
|
public_episode_files = [
|
|
PublicEpisodeFile.model_validate(x) for x in episode_files
|
|
]
|
|
result = []
|
|
for episode_file in public_episode_files:
|
|
if self.episode_file_exists_on_file(episode_file=episode_file):
|
|
episode_file.downloaded = True
|
|
result.append(episode_file)
|
|
return result
|
|
|
|
@overload
|
|
def check_if_show_exists(self, *, external_id: int, metadata_provider: str) -> bool:
|
|
"""
|
|
Check if a show exists in the database.
|
|
|
|
:param external_id: The external ID of the show.
|
|
:param metadata_provider: The metadata provider.
|
|
:return: True if the show exists, False otherwise.
|
|
"""
|
|
|
|
@overload
|
|
def check_if_show_exists(self, *, show_id: ShowId) -> bool:
|
|
"""
|
|
Check if a show exists in the database.
|
|
|
|
:param show_id: The ID of the show.
|
|
:return: True if the show exists, False otherwise.
|
|
"""
|
|
|
|
def check_if_show_exists(
|
|
self, *, external_id=None, metadata_provider=None, show_id=None
|
|
) -> bool:
|
|
if not (external_id is None or metadata_provider is None):
|
|
try:
|
|
self.tv_repository.get_show_by_external_id(
|
|
external_id=external_id, metadata_provider=metadata_provider
|
|
)
|
|
except NotFoundError:
|
|
return False
|
|
elif show_id is not None:
|
|
try:
|
|
self.tv_repository.get_show_by_id(show_id=show_id)
|
|
except NotFoundError:
|
|
return False
|
|
else:
|
|
msg = "Use one of the provided overloads for this function!"
|
|
raise ValueError(msg)
|
|
|
|
return True
|
|
|
|
def get_all_available_torrents_for_a_season(
|
|
self,
|
|
season_number: int,
|
|
show_id: ShowId,
|
|
search_query_override: str | None = None,
|
|
) -> list[IndexerQueryResult]:
|
|
"""
|
|
Get all available torrents for a given season.
|
|
|
|
:param season_number: The number of the season.
|
|
:param show_id: The ID of the show.
|
|
:param search_query_override: Optional override for the search query.
|
|
:return: A list of indexer query results.
|
|
"""
|
|
|
|
if search_query_override:
|
|
return self.indexer_service.search(query=search_query_override, is_tv=True)
|
|
|
|
show = self.tv_repository.get_show_by_id(show_id=show_id)
|
|
|
|
torrents = self.indexer_service.search_season(
|
|
show=show, season_number=season_number
|
|
)
|
|
|
|
results = [torrent for torrent in torrents if season_number in torrent.season]
|
|
|
|
return evaluate_indexer_query_results(
|
|
is_tv=True, query_results=results, media=show
|
|
)
|
|
|
|
def get_all_shows(self) -> list[Show]:
|
|
"""
|
|
Get all shows.
|
|
|
|
:return: A list of all shows.
|
|
"""
|
|
return self.tv_repository.get_shows()
|
|
|
|
def search_for_show(
|
|
self, query: str, metadata_provider: AbstractMetadataProvider
|
|
) -> list[MetaDataProviderSearchResult]:
|
|
"""
|
|
Search for shows using a given query.
|
|
|
|
:param query: The search query.
|
|
:param metadata_provider: The metadata provider to search.
|
|
:return: A list of metadata provider show search results.
|
|
"""
|
|
results = metadata_provider.search_show(query)
|
|
for result in results:
|
|
if self.check_if_show_exists(
|
|
external_id=result.external_id, metadata_provider=metadata_provider.name
|
|
):
|
|
result.added = True
|
|
|
|
try:
|
|
show = self.tv_repository.get_show_by_external_id(
|
|
external_id=result.external_id,
|
|
metadata_provider=metadata_provider.name,
|
|
)
|
|
result.id = show.id
|
|
except Exception:
|
|
log.error(
|
|
f"Unable to find internal show ID for {result.external_id} on {metadata_provider.name}"
|
|
)
|
|
return results
|
|
|
|
def get_popular_shows(
|
|
self, metadata_provider: AbstractMetadataProvider
|
|
) -> list[MetaDataProviderSearchResult]:
|
|
"""
|
|
Get popular shows from a given metadata provider.
|
|
|
|
:param metadata_provider: The metadata provider to use.
|
|
:return: A list of metadata provider show search results.
|
|
"""
|
|
results = metadata_provider.search_show()
|
|
|
|
return [
|
|
result
|
|
for result in results
|
|
if not self.check_if_show_exists(
|
|
external_id=result.external_id, metadata_provider=metadata_provider.name
|
|
)
|
|
]
|
|
|
|
def get_public_show_by_id(self, show: Show) -> PublicShow:
|
|
"""
|
|
Get a public show from a Show object.
|
|
|
|
:param show: The show object.
|
|
:return: A public show.
|
|
"""
|
|
public_show = PublicShow.model_validate(show)
|
|
public_seasons: list[PublicSeason] = []
|
|
|
|
for season in show.seasons:
|
|
public_season = PublicSeason.model_validate(season)
|
|
|
|
for episode in public_season.episodes:
|
|
episode.downloaded = self.is_episode_downloaded(
|
|
episode=episode,
|
|
season=season,
|
|
show=show,
|
|
)
|
|
|
|
# A season is considered downloaded if it has episodes and all of them are downloaded,
|
|
# matching the behavior of is_season_downloaded.
|
|
public_season.downloaded = bool(public_season.episodes) and all(
|
|
episode.downloaded for episode in public_season.episodes
|
|
)
|
|
public_seasons.append(public_season)
|
|
|
|
public_show.seasons = public_seasons
|
|
return public_show
|
|
|
|
def get_show_by_id(self, show_id: ShowId) -> Show:
|
|
"""
|
|
Get a show by its ID.
|
|
|
|
:param show_id: The ID of the show.
|
|
:return: The show.
|
|
"""
|
|
return self.tv_repository.get_show_by_id(show_id=show_id)
|
|
|
|
def is_season_downloaded(self, season: Season, show: Show) -> bool:
|
|
"""
|
|
Check if a season is downloaded.
|
|
|
|
:param season: The season object.
|
|
:param show: The show object.
|
|
:return: True if the season is downloaded, False otherwise.
|
|
"""
|
|
episodes = season.episodes
|
|
|
|
if not episodes:
|
|
return False
|
|
|
|
for episode in episodes:
|
|
if not self.is_episode_downloaded(
|
|
episode=episode, season=season, show=show
|
|
):
|
|
return False
|
|
return True
|
|
|
|
def is_episode_downloaded(
|
|
self, episode: Episode, season: Season, show: Show
|
|
) -> bool:
|
|
"""
|
|
Check if an episode is downloaded and imported (file exists on disk).
|
|
|
|
An episode is considered downloaded if:
|
|
- There is at least one EpisodeFile in the database AND
|
|
- A matching episode file exists in the season directory on disk.
|
|
|
|
:param episode: The episode object.
|
|
:param season: The season object.
|
|
:param show: The show object.
|
|
:return: True if the episode is downloaded and imported, False otherwise.
|
|
"""
|
|
episode_files = self.tv_repository.get_episode_files_by_episode_id(
|
|
episode_id=episode.id
|
|
)
|
|
|
|
if not episode_files:
|
|
return False
|
|
|
|
season_dir = self.get_root_season_directory(show, season.number)
|
|
|
|
if not season_dir.exists():
|
|
return False
|
|
|
|
episode_token = f"S{season.number:02d}E{episode.number:02d}"
|
|
|
|
video_extensions = {".mkv", ".mp4", ".avi", ".mov"}
|
|
|
|
try:
|
|
for file in season_dir.iterdir():
|
|
if (
|
|
file.is_file()
|
|
and episode_token.lower() in file.name.lower()
|
|
and file.suffix.lower() in video_extensions
|
|
):
|
|
return True
|
|
|
|
except OSError as e:
|
|
log.error(
|
|
f"Disk check failed for episode {episode.id} in {season_dir}: {e}"
|
|
)
|
|
|
|
return False
|
|
|
|
def episode_file_exists_on_file(self, episode_file: EpisodeFile) -> bool:
|
|
"""
|
|
Check if an episode file exists on the filesystem.
|
|
|
|
:param episode_file: The episode file to check.
|
|
:return: True if the file exists, False otherwise.
|
|
"""
|
|
if episode_file.torrent_id is None:
|
|
return True
|
|
try:
|
|
torrent_file = self.torrent_service.get_torrent_by_id(
|
|
torrent_id=episode_file.torrent_id
|
|
)
|
|
|
|
if torrent_file.imported:
|
|
return True
|
|
except RuntimeError:
|
|
log.exception("Error retrieving torrent")
|
|
|
|
return False
|
|
|
|
def get_show_by_external_id(
|
|
self, external_id: int, metadata_provider: str
|
|
) -> Show | None:
|
|
"""
|
|
Get a show by its external ID and metadata provider.
|
|
|
|
:param external_id: The external ID of the show.
|
|
:param metadata_provider: The metadata provider.
|
|
:return: The show or None if not found.
|
|
"""
|
|
return self.tv_repository.get_show_by_external_id(
|
|
external_id=external_id, metadata_provider=metadata_provider
|
|
)
|
|
|
|
def get_season(self, season_id: SeasonId) -> Season:
|
|
"""
|
|
Get a season by its ID.
|
|
|
|
:param season_id: The ID of the season.
|
|
:return: The season.
|
|
"""
|
|
return self.tv_repository.get_season(season_id=season_id)
|
|
|
|
def get_episode(self, episode_id: EpisodeId) -> Episode:
|
|
"""
|
|
Get an episode by its ID.
|
|
|
|
:param episode_id: The ID of the episode.
|
|
:return: The episode.
|
|
"""
|
|
return self.tv_repository.get_episode(episode_id=episode_id)
|
|
|
|
def get_season_by_episode(self, episode_id: EpisodeId) -> Season:
|
|
"""
|
|
Get a season by the episode ID.
|
|
|
|
:param episode_id: The ID of the episode.
|
|
:return: The season.
|
|
"""
|
|
return self.tv_repository.get_season_by_episode(episode_id=episode_id)
|
|
|
|
def get_torrents_for_show(self, show: Show) -> RichShowTorrent:
|
|
"""
|
|
Get torrents for a given show.
|
|
|
|
:param show: The show.
|
|
:return: A rich show torrent.
|
|
"""
|
|
show_torrents = self.tv_repository.get_torrents_by_show_id(show_id=show.id)
|
|
rich_season_torrents = []
|
|
for show_torrent in show_torrents:
|
|
seasons = self.tv_repository.get_seasons_by_torrent_id(
|
|
torrent_id=show_torrent.id
|
|
)
|
|
episodes = self.tv_repository.get_episodes_by_torrent_id(
|
|
torrent_id=show_torrent.id
|
|
)
|
|
episode_files = self.torrent_service.get_episode_files_of_torrent(
|
|
torrent=show_torrent
|
|
)
|
|
|
|
file_path_suffix = (
|
|
episode_files[0].file_path_suffix if episode_files else ""
|
|
)
|
|
season_torrent = RichSeasonTorrent(
|
|
torrent_id=show_torrent.id,
|
|
torrent_title=show_torrent.title,
|
|
status=show_torrent.status,
|
|
quality=show_torrent.quality,
|
|
imported=show_torrent.imported,
|
|
seasons=seasons,
|
|
episodes=episodes if len(seasons) == 1 else [],
|
|
file_path_suffix=file_path_suffix,
|
|
usenet=show_torrent.usenet,
|
|
)
|
|
rich_season_torrents.append(season_torrent)
|
|
|
|
return RichShowTorrent(
|
|
show_id=show.id,
|
|
name=show.name,
|
|
year=show.year,
|
|
metadata_provider=show.metadata_provider,
|
|
torrents=rich_season_torrents,
|
|
)
|
|
|
|
def get_all_shows_with_torrents(self) -> list[RichShowTorrent]:
|
|
"""
|
|
Get all shows with torrents.
|
|
|
|
:return: A list of rich show torrents.
|
|
"""
|
|
shows = self.tv_repository.get_all_shows_with_torrents()
|
|
return [self.get_torrents_for_show(show=show) for show in shows]
|
|
|
|
def download_torrent(
|
|
self,
|
|
public_indexer_result_id: IndexerQueryResultId,
|
|
show_id: ShowId,
|
|
override_show_file_path_suffix: str = "",
|
|
) -> Torrent:
|
|
"""
|
|
Download a torrent for a given indexer result and show.
|
|
|
|
:param public_indexer_result_id: The ID of the indexer result.
|
|
:param show_id: The ID of the show.
|
|
:param override_show_file_path_suffix: Optional override for the file path suffix.
|
|
:return: The downloaded torrent.
|
|
"""
|
|
indexer_result = self.indexer_service.get_result(
|
|
result_id=public_indexer_result_id
|
|
)
|
|
show_torrent = self.torrent_service.download(indexer_result=indexer_result)
|
|
self.torrent_service.pause_download(torrent=show_torrent)
|
|
|
|
try:
|
|
for season_number in indexer_result.season:
|
|
season = self.tv_repository.get_season_by_number(
|
|
season_number=season_number, show_id=show_id
|
|
)
|
|
episodes = {episode.number: episode.id for episode in season.episodes}
|
|
|
|
if indexer_result.episode:
|
|
episode_ids = []
|
|
missing_episodes = []
|
|
for ep_number in indexer_result.episode:
|
|
ep_id = episodes.get(EpisodeNumber(ep_number))
|
|
if ep_id is None:
|
|
missing_episodes.append(ep_number)
|
|
continue
|
|
episode_ids.append(ep_id)
|
|
if missing_episodes:
|
|
log.warning(
|
|
"Some episodes from indexer result were not found in season %s "
|
|
"for show %s and will be skipped: %s",
|
|
season.id,
|
|
show_id,
|
|
", ".join(str(ep) for ep in missing_episodes),
|
|
)
|
|
else:
|
|
episode_ids = [episode.id for episode in season.episodes]
|
|
|
|
for episode_id in episode_ids:
|
|
episode_file = EpisodeFile(
|
|
episode_id=episode_id,
|
|
quality=indexer_result.quality,
|
|
torrent_id=show_torrent.id,
|
|
file_path_suffix=override_show_file_path_suffix,
|
|
)
|
|
self.tv_repository.add_episode_file(episode_file=episode_file)
|
|
|
|
except IntegrityError:
|
|
log.error(
|
|
f"Episode file for episode {episode_id} of season {season.id} and quality {indexer_result.quality} already exists, skipping."
|
|
)
|
|
self.tv_repository.remove_episode_files_by_torrent_id(show_torrent.id)
|
|
self.torrent_service.cancel_download(
|
|
torrent=show_torrent, delete_files=True
|
|
)
|
|
raise
|
|
else:
|
|
log.info(
|
|
f"Successfully added episode files for torrent {show_torrent.title} and show ID {show_id}"
|
|
)
|
|
self.torrent_service.resume_download(torrent=show_torrent)
|
|
|
|
return show_torrent
|
|
|
|
def get_root_show_directory(self, show: Show) -> Path:
|
|
misc_config = MediaManagerConfig().misc
|
|
show_directory_name = f"{remove_special_characters(show.name)} ({show.year}) [{show.metadata_provider}id-{show.external_id}]"
|
|
log.debug(
|
|
f"Show {show.name} without special characters: {remove_special_characters(show.name)}"
|
|
)
|
|
|
|
if show.library != "Default":
|
|
for library in misc_config.tv_libraries:
|
|
if library.name == show.library:
|
|
log.debug(
|
|
f"Using library {library.name} for show {show.name} ({show.year})"
|
|
)
|
|
return Path(library.path) / show_directory_name
|
|
else:
|
|
log.warning(
|
|
f"Library {show.library} not defined in config, using default TV directory."
|
|
)
|
|
return misc_config.tv_directory / show_directory_name
|
|
|
|
def get_root_season_directory(self, show: Show, season_number: int) -> Path:
|
|
return self.get_root_show_directory(show) / Path(f"Season {season_number}")
|
|
|
|
def import_episode(
|
|
self,
|
|
show: Show,
|
|
season: Season,
|
|
episode_number: int,
|
|
video_files: list[Path],
|
|
subtitle_files: list[Path],
|
|
file_path_suffix: str = "",
|
|
) -> bool:
|
|
episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode_number:02d}"
|
|
if file_path_suffix != "":
|
|
episode_file_name += f" - {file_path_suffix}"
|
|
pattern = (
|
|
r".*[. ]S0?" + str(season.number) + r"E0?" + str(episode_number) + r"[. ].*"
|
|
)
|
|
subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt"
|
|
target_file_name = (
|
|
self.get_root_season_directory(show=show, season_number=season.number)
|
|
/ episode_file_name
|
|
)
|
|
|
|
# import subtitle
|
|
for subtitle_file in subtitle_files:
|
|
regex_result = re.search(
|
|
subtitle_pattern, subtitle_file.name, re.IGNORECASE
|
|
)
|
|
if regex_result:
|
|
language_code = regex_result.group(1)
|
|
target_subtitle_file = target_file_name.with_suffix(
|
|
f".{language_code}.srt"
|
|
)
|
|
import_file(target_file=target_subtitle_file, source_file=subtitle_file)
|
|
else:
|
|
log.debug(
|
|
f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
|
|
)
|
|
|
|
# import episode videos
|
|
for file in video_files:
|
|
if re.search(pattern, file.name, re.IGNORECASE):
|
|
target_video_file = target_file_name.with_suffix(file.suffix)
|
|
import_file(target_file=target_video_file, source_file=file)
|
|
return True
|
|
else:
|
|
msg = f"Could not find any video file for episode {episode_number} of show {show.name} S{season.number}"
|
|
raise Exception(msg) # noqa: TRY002 # TODO: resolve this
|
|
|
|
def import_season(
|
|
self,
|
|
show: Show,
|
|
season: Season,
|
|
video_files: list[Path],
|
|
subtitle_files: list[Path],
|
|
file_path_suffix: str = "",
|
|
) -> tuple[bool, list[Episode]]:
|
|
season_path = self.get_root_season_directory(
|
|
show=show, season_number=season.number
|
|
)
|
|
success = True
|
|
imported_episodes = []
|
|
try:
|
|
season_path.mkdir(parents=True, exist_ok=True)
|
|
except Exception as e:
|
|
log.exception(f"Could not create path {season_path}")
|
|
msg = f"Could not create path {season_path}"
|
|
raise Exception(msg) from e # noqa: TRY002 # TODO: resolve this
|
|
|
|
for episode in season.episodes:
|
|
try:
|
|
imported = self.import_episode(
|
|
show=show,
|
|
subtitle_files=subtitle_files,
|
|
video_files=video_files,
|
|
season=season,
|
|
episode_number=episode.number,
|
|
file_path_suffix=file_path_suffix,
|
|
)
|
|
if imported:
|
|
imported_episodes.append(episode)
|
|
|
|
except Exception:
|
|
# Send notification about missing episode file
|
|
if self.notification_service:
|
|
self.notification_service.send_notification_to_all_providers(
|
|
title="Missing Episode File",
|
|
message=f"No video file found for S{season.number:02d}E{episode.number:02d} for show {show.name}. Manual intervention may be required.",
|
|
)
|
|
success = False
|
|
log.warning(
|
|
f"S{season.number}E{episode.number} not found when trying to import episode for show {show.name}."
|
|
)
|
|
return success, imported_episodes
|
|
|
|
def import_episode_files(
|
|
self,
|
|
show: Show,
|
|
season: Season,
|
|
episode: Episode,
|
|
video_files: list[Path],
|
|
subtitle_files: list[Path],
|
|
file_path_suffix: str = "",
|
|
) -> bool:
|
|
episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode.number:02d}"
|
|
if file_path_suffix != "":
|
|
episode_file_name += f" - {file_path_suffix}"
|
|
pattern = (
|
|
r".*[. ]S0?" + str(season.number) + r"E0?" + str(episode.number) + r"[. ].*"
|
|
)
|
|
subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt"
|
|
target_file_name = (
|
|
self.get_root_season_directory(show=show, season_number=season.number)
|
|
/ episode_file_name
|
|
)
|
|
|
|
# import subtitle
|
|
for subtitle_file in subtitle_files:
|
|
regex_result = re.search(
|
|
subtitle_pattern, subtitle_file.name, re.IGNORECASE
|
|
)
|
|
if regex_result:
|
|
language_code = regex_result.group(1)
|
|
target_subtitle_file = target_file_name.with_suffix(
|
|
f".{language_code}.srt"
|
|
)
|
|
import_file(target_file=target_subtitle_file, source_file=subtitle_file)
|
|
else:
|
|
log.debug(
|
|
f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}"
|
|
)
|
|
|
|
found_video = False
|
|
|
|
# import episode videos
|
|
for file in video_files:
|
|
if re.search(pattern, file.name, re.IGNORECASE):
|
|
target_video_file = target_file_name.with_suffix(file.suffix)
|
|
import_file(target_file=target_video_file, source_file=file)
|
|
found_video = True
|
|
break
|
|
|
|
if not found_video:
|
|
# Send notification about missing episode file
|
|
if self.notification_service:
|
|
self.notification_service.send_notification_to_all_providers(
|
|
title="Missing Episode File",
|
|
message=f"No video file found for S{season.number:02d}E{episode.number:02d} for show {show.name}. Manual intervention may be required.",
|
|
)
|
|
log.warning(
|
|
f"File for S{season.number}E{episode.number} not found when trying to import episode for show {show.name}."
|
|
)
|
|
return False
|
|
|
|
return True
|
|
|
|
def import_episode_files_from_torrent(self, torrent: Torrent, show: Show) -> None:
|
|
"""
|
|
Organizes episodes files from a torrent into the TV directory structure, mapping them to seasons and episodes.
|
|
:param torrent: The Torrent object
|
|
:param show: The Show object
|
|
"""
|
|
|
|
video_files, subtitle_files, _all_files = get_files_for_import(torrent=torrent)
|
|
|
|
success: list[bool] = []
|
|
|
|
log.debug(
|
|
f"Importing these {len(video_files)} files:\n" + pprint.pformat(video_files)
|
|
)
|
|
|
|
episode_files = self.torrent_service.get_episode_files_of_torrent(
|
|
torrent=torrent
|
|
)
|
|
if not episode_files:
|
|
log.warning(
|
|
f"No episode files associated with torrent {torrent.title}, skipping import."
|
|
)
|
|
return
|
|
|
|
log.info(
|
|
f"Found {len(episode_files)} episode files associated with torrent {torrent.title}"
|
|
)
|
|
|
|
imported_episodes_by_season: dict[int, list[int]] = {}
|
|
|
|
for episode_file in episode_files:
|
|
season = self.get_season_by_episode(episode_id=episode_file.episode_id)
|
|
episode = self.get_episode(episode_file.episode_id)
|
|
|
|
season_path = self.get_root_season_directory(
|
|
show=show, season_number=season.number
|
|
)
|
|
if not season_path.exists():
|
|
try:
|
|
season_path.mkdir(parents=True)
|
|
except Exception as e:
|
|
log.warning(f"Could not create path {season_path}: {e}")
|
|
msg = f"Could not create path {season_path}"
|
|
raise Exception(msg) from e # noqa: TRY002
|
|
|
|
episoded_import_success = self.import_episode_files(
|
|
show=show,
|
|
season=season,
|
|
episode=episode,
|
|
video_files=video_files,
|
|
subtitle_files=subtitle_files,
|
|
file_path_suffix=episode_file.file_path_suffix,
|
|
)
|
|
success.append(episoded_import_success)
|
|
|
|
if episoded_import_success:
|
|
imported_episodes_by_season.setdefault(season.number, []).append(
|
|
episode.number
|
|
)
|
|
|
|
log.info(
|
|
f"Episode {episode.number} from Season {season.number} successfully imported from torrent {torrent.title}"
|
|
)
|
|
else:
|
|
log.warning(
|
|
f"Episode {episode.number} from Season {season.number} failed to import from torrent {torrent.title}"
|
|
)
|
|
|
|
success_messages: list[str] = []
|
|
|
|
for season_number, episodes in imported_episodes_by_season.items():
|
|
episode_list = ",".join(str(e) for e in sorted(episodes))
|
|
success_messages.append(
|
|
f"Episode(s): {episode_list} from Season {season_number}"
|
|
)
|
|
|
|
episodes_summary = "; ".join(success_messages)
|
|
|
|
if all(success):
|
|
torrent.imported = True
|
|
self.torrent_service.torrent_repository.save_torrent(torrent=torrent)
|
|
|
|
# Send successful season download notification
|
|
if self.notification_service:
|
|
self.notification_service.send_notification_to_all_providers(
|
|
title="TV Show imported successfully",
|
|
message=(
|
|
f"Successfully imported {episodes_summary} "
|
|
f"of {show.name} ({show.year}) "
|
|
f"from torrent {torrent.title}."
|
|
),
|
|
)
|
|
else:
|
|
if self.notification_service:
|
|
self.notification_service.send_notification_to_all_providers(
|
|
title="Failed to import TV Show",
|
|
message=f"Importing {show.name} ({show.year}) from torrent {torrent.title} completed with errors. Please check the logs for details.",
|
|
)
|
|
|
|
log.info(
|
|
f"Finished importing files for torrent {torrent.title} {'without' if all(success) else 'with'} errors"
|
|
)
|
|
|
|
def update_show_metadata(
|
|
self, db_show: Show, metadata_provider: AbstractMetadataProvider
|
|
) -> Show | None:
|
|
"""
|
|
Updates the metadata of a show.
|
|
This includes adding new seasons and episodes if available from the metadata provider.
|
|
It also updates existing show, season, and episode attributes if they have changed.
|
|
|
|
:param metadata_provider: The metadata provider object to fetch fresh data from.
|
|
:param db_show: The Show to update
|
|
:return: The updated Show object, or None if the show is not found or an error occurs.
|
|
"""
|
|
log.debug(f"Found show: {db_show.name} for metadata update.")
|
|
|
|
# Use stored original_language preference for metadata fetching
|
|
fresh_show_data = metadata_provider.get_show_metadata(
|
|
show_id=db_show.external_id, language=db_show.original_language
|
|
)
|
|
if not fresh_show_data:
|
|
log.warning(
|
|
f"Could not fetch fresh metadata for show {db_show.name} (External ID: {db_show.external_id}) from {db_show.metadata_provider}."
|
|
)
|
|
return db_show
|
|
log.debug(f"Fetched fresh metadata for show: {fresh_show_data.name}")
|
|
|
|
self.tv_repository.update_show_attributes(
|
|
show_id=db_show.id,
|
|
name=fresh_show_data.name,
|
|
overview=fresh_show_data.overview,
|
|
year=fresh_show_data.year,
|
|
ended=fresh_show_data.ended,
|
|
imdb_id=fresh_show_data.imdb_id,
|
|
continuous_download=db_show.continuous_download
|
|
if fresh_show_data.ended is False
|
|
else False,
|
|
)
|
|
|
|
# Process seasons and episodes
|
|
existing_season_external_ids = {s.external_id: s for s in db_show.seasons}
|
|
|
|
for fresh_season_data in fresh_show_data.seasons:
|
|
if fresh_season_data.external_id in existing_season_external_ids:
|
|
# Update existing season
|
|
existing_season = existing_season_external_ids[
|
|
fresh_season_data.external_id
|
|
]
|
|
log.debug(
|
|
f"Updating existing season {existing_season.number} for show {db_show.name}"
|
|
)
|
|
self.tv_repository.update_season_attributes(
|
|
season_id=existing_season.id,
|
|
name=fresh_season_data.name,
|
|
overview=fresh_season_data.overview,
|
|
)
|
|
|
|
# Process episodes for this season
|
|
existing_episode_external_ids = {
|
|
ep.external_id: ep for ep in existing_season.episodes
|
|
}
|
|
for fresh_episode_data in fresh_season_data.episodes:
|
|
if fresh_episode_data.number in existing_episode_external_ids:
|
|
# Update existing episode
|
|
existing_episode = existing_episode_external_ids[
|
|
fresh_episode_data.external_id
|
|
]
|
|
log.debug(
|
|
f"Updating existing episode {existing_episode.number} for season {existing_season.number}"
|
|
)
|
|
self.tv_repository.update_episode_attributes(
|
|
episode_id=existing_episode.id,
|
|
title=fresh_episode_data.title,
|
|
overview=fresh_episode_data.overview,
|
|
)
|
|
else:
|
|
# Add new episode
|
|
log.debug(
|
|
f"Adding new episode {fresh_episode_data.number} to season {existing_season.number}"
|
|
)
|
|
episode_schema = Episode(
|
|
id=EpisodeId(fresh_episode_data.id),
|
|
number=fresh_episode_data.number,
|
|
external_id=fresh_episode_data.external_id,
|
|
title=fresh_episode_data.title,
|
|
overview=fresh_episode_data.overview,
|
|
)
|
|
self.tv_repository.add_episode_to_season(
|
|
season_id=existing_season.id, episode_data=episode_schema
|
|
)
|
|
else:
|
|
# Add new season (and its episodes)
|
|
log.debug(
|
|
f"Adding new season {fresh_season_data.number} to show {db_show.name}"
|
|
)
|
|
episodes_for_schema = [
|
|
Episode(
|
|
id=EpisodeId(ep_data.id),
|
|
number=ep_data.number,
|
|
external_id=ep_data.external_id,
|
|
title=ep_data.title,
|
|
overview=ep_data.overview,
|
|
)
|
|
for ep_data in fresh_season_data.episodes
|
|
]
|
|
|
|
season_schema = Season(
|
|
id=SeasonId(fresh_season_data.id),
|
|
number=fresh_season_data.number,
|
|
name=fresh_season_data.name,
|
|
overview=fresh_season_data.overview,
|
|
external_id=fresh_season_data.external_id,
|
|
episodes=episodes_for_schema,
|
|
)
|
|
self.tv_repository.add_season_to_show(
|
|
show_id=db_show.id, season_data=season_schema
|
|
)
|
|
|
|
updated_show = self.tv_repository.get_show_by_id(show_id=db_show.id)
|
|
|
|
log.info(f"Successfully updated metadata for show ID: {db_show.id}")
|
|
metadata_provider.download_show_poster_image(show=updated_show)
|
|
return updated_show
|
|
|
|
def set_show_continuous_download(
|
|
self, show: Show, continuous_download: bool
|
|
) -> Show:
|
|
"""
|
|
Set the continuous download flag for a show.
|
|
|
|
:param show: The show object.
|
|
:param continuous_download: True to enable continuous download, False to disable.
|
|
:return: The updated Show object.
|
|
"""
|
|
return self.tv_repository.update_show_attributes(
|
|
show_id=show.id, continuous_download=continuous_download
|
|
)
|
|
|
|
def get_import_candidates(
|
|
self, tv_show: Path, metadata_provider: AbstractMetadataProvider
|
|
) -> MediaImportSuggestion:
|
|
search_result = self.search_for_show(
|
|
remove_special_chars_and_parentheses(tv_show.name), metadata_provider
|
|
)
|
|
import_candidates = MediaImportSuggestion(
|
|
directory=tv_show, candidates=search_result
|
|
)
|
|
log.debug(
|
|
f"Found {len(import_candidates.candidates)} candidates for {import_candidates.directory}"
|
|
)
|
|
return import_candidates
|
|
|
|
def import_existing_tv_show(self, tv_show: Show, source_directory: Path) -> None:
|
|
new_source_path = source_directory.parent / ("." + source_directory.name)
|
|
try:
|
|
source_directory.rename(new_source_path)
|
|
except Exception as e:
|
|
log.exception(f"Failed to rename {source_directory} to {new_source_path}")
|
|
raise RenameError from e
|
|
|
|
video_files, subtitle_files, _all_files = get_files_for_import(
|
|
directory=new_source_path
|
|
)
|
|
for season in tv_show.seasons:
|
|
_success, imported_episodes = self.import_season(
|
|
show=tv_show,
|
|
season=season,
|
|
video_files=video_files,
|
|
subtitle_files=subtitle_files,
|
|
file_path_suffix="IMPORTED",
|
|
)
|
|
for episode in imported_episodes:
|
|
episode_file = EpisodeFile(
|
|
episode_id=episode.id,
|
|
quality=Quality.unknown,
|
|
file_path_suffix="IMPORTED",
|
|
torrent_id=None,
|
|
)
|
|
|
|
self.tv_repository.add_episode_file(episode_file=episode_file)
|
|
|
|
def get_importable_tv_shows(
|
|
self, metadata_provider: AbstractMetadataProvider
|
|
) -> list[MediaImportSuggestion]:
|
|
tv_directory = MediaManagerConfig().misc.tv_directory
|
|
import_suggestions: list[MediaImportSuggestion] = []
|
|
candidate_dirs = get_importable_media_directories(tv_directory)
|
|
|
|
for item in candidate_dirs:
|
|
metadata, external_id = extract_external_id_from_string(item.name)
|
|
if metadata is not None and external_id is not None:
|
|
try:
|
|
self.tv_repository.get_show_by_external_id(
|
|
external_id=external_id,
|
|
metadata_provider=metadata,
|
|
)
|
|
log.debug(
|
|
f"Show {item.name} already exists in the database, skipping import suggestion."
|
|
)
|
|
continue
|
|
except NotFoundError:
|
|
log.debug(
|
|
f"Show {item.name} not found in database, checking for import candidates."
|
|
)
|
|
|
|
import_suggestion = self.get_import_candidates(
|
|
tv_show=item, metadata_provider=metadata_provider
|
|
)
|
|
import_suggestions.append(import_suggestion)
|
|
|
|
log.debug(f"Detected {len(import_suggestions)} importable TV shows.")
|
|
return import_suggestions
|
|
|
|
|
|
def import_all_show_torrents() -> None:
|
|
with next(get_session()) as db:
|
|
tv_repository = TvRepository(db=db)
|
|
torrent_service = TorrentService(torrent_repository=TorrentRepository(db=db))
|
|
indexer_service = IndexerService(indexer_repository=IndexerRepository(db=db))
|
|
notification_service = NotificationService(
|
|
notification_repository=NotificationRepository(db=db)
|
|
)
|
|
tv_service = TvService(
|
|
tv_repository=tv_repository,
|
|
torrent_service=torrent_service,
|
|
indexer_service=indexer_service,
|
|
notification_service=notification_service,
|
|
)
|
|
log.info("Importing all torrents")
|
|
torrents = torrent_service.get_all_torrents()
|
|
log.info("Found %d torrents to import", len(torrents))
|
|
for t in torrents:
|
|
try:
|
|
if not t.imported and t.status == TorrentStatus.finished:
|
|
show = torrent_service.get_show_of_torrent(torrent=t)
|
|
if show is None:
|
|
log.warning(
|
|
f"torrent {t.title} is not a tv torrent, skipping import."
|
|
)
|
|
continue
|
|
tv_service.import_episode_files_from_torrent(torrent=t, show=show)
|
|
except RuntimeError as e:
|
|
log.error(
|
|
f"Error importing torrent {t.title} for show {show.name}: {e}",
|
|
exc_info=True,
|
|
)
|
|
log.info("Finished importing all torrents")
|
|
db.commit()
|
|
|
|
|
|
def update_all_non_ended_shows_metadata() -> None:
|
|
"""
|
|
Updates the metadata of all non-ended shows.
|
|
"""
|
|
with next(get_session()) as db:
|
|
tv_repository = TvRepository(db=db)
|
|
tv_service = TvService(
|
|
tv_repository=tv_repository,
|
|
torrent_service=TorrentService(torrent_repository=TorrentRepository(db=db)),
|
|
indexer_service=IndexerService(indexer_repository=IndexerRepository(db=db)),
|
|
notification_service=NotificationService(
|
|
notification_repository=NotificationRepository(db=db)
|
|
),
|
|
)
|
|
|
|
log.info("Updating metadata for all non-ended shows")
|
|
|
|
shows = [show for show in tv_repository.get_shows() if not show.ended]
|
|
|
|
log.info(f"Found {len(shows)} non-ended shows to update")
|
|
|
|
for show in shows:
|
|
try:
|
|
if show.metadata_provider == "tmdb":
|
|
metadata_provider = TmdbMetadataProvider()
|
|
elif show.metadata_provider == "tvdb":
|
|
metadata_provider = TvdbMetadataProvider()
|
|
else:
|
|
log.error(
|
|
f"Unsupported metadata provider {show.metadata_provider} for show {show.name}, skipping update."
|
|
)
|
|
continue
|
|
except InvalidConfigError:
|
|
log.exception(
|
|
f"Error initializing metadata provider {show.metadata_provider} for show {show.name}"
|
|
)
|
|
continue
|
|
updated_show = tv_service.update_show_metadata(
|
|
db_show=show, metadata_provider=metadata_provider
|
|
)
|
|
|
|
if updated_show:
|
|
log.debug("Updated show metadata", extra={"show": updated_show.name})
|
|
else:
|
|
log.warning(f"Failed to update metadata for show: {show.name}")
|
|
db.commit()
|