diff --git a/media_manager/auth/users.py b/media_manager/auth/users.py index e9a023e..5bd64ab 100644 --- a/media_manager/auth/users.py +++ b/media_manager/auth/users.py @@ -66,7 +66,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]): async def on_after_forgot_password( self, user: User, token: str, request: Optional[Request] = None ): - link = f"{AllEncompassingConfig().misc.frontend_url}/web/login/reset-password?token={token}" + link = f"{AllEncompassingConfig().misc.frontend_url}web/login/reset-password?token={token}" log.info(f"User {user.id} has forgot their password. Reset Link: {link}") if not config.email_password_resets: @@ -184,7 +184,7 @@ def get_jwt_strategy() -> JWTStrategy[models.UP, models.ID]: class RedirectingCookieTransport(CookieTransport): async def get_login_response(self, token: str) -> Response: response = RedirectResponse( - str(AllEncompassingConfig().misc.frontend_url) + "/web/dashboard", + str(AllEncompassingConfig().misc.frontend_url) + "web/dashboard", status_code=status.HTTP_302_FOUND, ) return self._set_login_cookie(response, token) diff --git a/media_manager/config.py b/media_manager/config.py index c0b9024..2bbb799 100644 --- a/media_manager/config.py +++ b/media_manager/config.py @@ -41,7 +41,7 @@ class BasicConfig(BaseSettings): movie_directory: Path = Path(__file__).parent.parent / "data" / "movies" torrent_directory: Path = Path(__file__).parent.parent / "data" / "torrents" - frontend_url: AnyHttpUrl = "http://localhost:3000" + frontend_url: AnyHttpUrl = "http://localhost:8000" cors_urls: list[str] = [] development: bool = False diff --git a/media_manager/movies/service.py b/media_manager/movies/service.py index 828a4c0..885851d 100644 --- a/media_manager/movies/service.py +++ b/media_manager/movies/service.py @@ -34,7 +34,7 @@ import pprint from media_manager.torrent.repository import TorrentRepository from media_manager.torrent.utils import ( import_file, - import_torrent, + get_files_for_import, remove_special_characters, ) from media_manager.indexer.service import IndexerService @@ -470,7 +470,7 @@ class MovieService: :param movie: The Movie object """ - video_files, subtitle_files, all_files = import_torrent(torrent=torrent) + video_files, subtitle_files, all_files = get_files_for_import(torrent=torrent) success: bool = False # determines if the import was successful, if true, the Imported flag will be set to True after the import if len(video_files) != 0: diff --git a/media_manager/torrent/utils.py b/media_manager/torrent/utils.py index 91c30a5..96a7650 100644 --- a/media_manager/torrent/utils.py +++ b/media_manager/torrent/utils.py @@ -77,18 +77,23 @@ def import_file(target_file: Path, source_file: Path): shutil.copy(src=source_file, dst=target_file) -def import_torrent(torrent: Torrent) -> tuple[list[Path], list[Path], list[Path]]: +def get_files_for_import( + torrent: Torrent | None = None, directory: Path | None = None +) -> tuple[list[Path], list[Path], list[Path]]: """ Extracts all files from the torrent download directory, including extracting archives. Returns a tuple containing: seperated video files, subtitle files, and all files found in the torrent directory. """ - log.info(f"Importing torrent {torrent}") - all_files: list[Path] = list_files_recursively( - path=get_torrent_filepath(torrent=torrent) - ) + search_directory = directory if directory else get_torrent_filepath(torrent=torrent) + if torrent: + log.info(f"Importing torrent {torrent}") + else: + log.info(f"Importing files from directory {directory}") + + all_files: list[Path] = list_files_recursively(path=search_directory) log.debug(f"Found {len(all_files)} files downloaded by the torrent") extract_archives(all_files) - all_files = list_files_recursively(path=get_torrent_filepath(torrent=torrent)) + all_files = list_files_recursively(path=search_directory) video_files: list[Path] = [] subtitle_files: list[Path] = [] diff --git a/media_manager/tv/router.py b/media_manager/tv/router.py index eb9b496..f2d6262 100644 --- a/media_manager/tv/router.py +++ b/media_manager/tv/router.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import Annotated from fastapi import APIRouter, Depends, status, HTTPException @@ -27,6 +28,7 @@ from media_manager.tv.schemas import ( UpdateSeasonRequest, RichSeasonRequest, Season, + TvShowImportSuggestion, ) from media_manager.tv.dependencies import ( season_dep, @@ -103,6 +105,46 @@ def get_all_shows(tv_service: tv_service_dep): return tv_service.get_all_shows() +@router.get( + "/importable", + dependencies=[Depends(current_superuser)], + response_model=list[TvShowImportSuggestion], +) +def get_all_importable_shows( + tv_service: tv_service_dep, metadata_provider: metadata_provider_dep +): + """ + get a list of unknown shows that were detected in the tv directory and are importable + """ + directories = tv_service.detect_unknown_tv_shows() + shows = [] + for directory in directories: + shows.append( + tv_service.get_import_candidates( + tv_show=directory, metadata_provider=metadata_provider + ) + ) + return shows + + +@router.post( + "/importable", + dependencies=[Depends(current_superuser)], + response_model=list[TvShowImportSuggestion], + status_code=status.HTTP_204_NO_CONTENT, +) +def import_detected_show(tv_service: tv_service_dep, tv_show: show_dep, directory: str): + """ + get a list of unknown shows that were detected in the tv directory and are importable + """ + source_directory = Path(directory) + if source_directory not in tv_service.detect_unknown_tv_shows(): + raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory") + tv_service.import_existing_tv_show( + tv_show=tv_show, source_directory=source_directory + ) + + @router.get( "/shows/torrents", dependencies=[Depends(current_active_user)], diff --git a/media_manager/tv/schemas.py b/media_manager/tv/schemas.py index cacb963..83fafc8 100644 --- a/media_manager/tv/schemas.py +++ b/media_manager/tv/schemas.py @@ -1,10 +1,12 @@ import typing import uuid +from pathlib import Path from uuid import UUID from pydantic import BaseModel, Field, ConfigDict, model_validator from media_manager.auth.schemas import UserRead +from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult from media_manager.torrent.models import Quality from media_manager.torrent.schemas import TorrentId, TorrentStatus @@ -166,3 +168,8 @@ class PublicShow(BaseModel): library: str seasons: list[PublicSeason] + + +class TvShowImportSuggestion(BaseModel): + directory: Path + candidates: list[MetaDataProviderSearchResult] diff --git a/media_manager/tv/service.py b/media_manager/tv/service.py index fc43cb8..3e6e3e8 100644 --- a/media_manager/tv/service.py +++ b/media_manager/tv/service.py @@ -30,6 +30,7 @@ from media_manager.tv.schemas import ( RichSeasonRequest, EpisodeId, Episode as EpisodeSchema, + TvShowImportSuggestion, ) from media_manager.torrent.schemas import QualityStrings from media_manager.tv.repository import TvRepository @@ -39,7 +40,7 @@ from pathlib import Path from media_manager.torrent.repository import TorrentRepository from media_manager.torrent.utils import ( import_file, - import_torrent, + get_files_for_import, remove_special_characters, ) from media_manager.indexer.service import IndexerService @@ -522,23 +523,9 @@ class TvService: self.delete_season_request(season_request.id) return True - def import_torrent_files(self, torrent: Torrent, show: Show) -> None: - """ - Organizes files from a torrent into the TV directory structure, mapping them to seasons and episodes. - :param torrent: The Torrent object - :param show: The Show object - """ - - video_files, subtitle_files, all_files = import_torrent(torrent=torrent) - - success: bool = True # determines if the import was successful, if true, the Imported flag will be set to True after the import - - log.info( - f"Importing these {len(video_files)} files:\n" + pprint.pformat(video_files) - ) + def get_root_show_directory(self, show: Show): misc_config = AllEncompassingConfig().misc show_directory_name = f"{remove_special_characters(show.name)} ({show.year}) [{show.metadata_provider}id-{show.external_id}]" - show_file_path = None log.debug( f"Show {show.name} without special characters: {remove_special_characters(show.name)}" ) @@ -546,18 +533,131 @@ class TvService: if show.library != "Default": for library in misc_config.tv_libraries: if library.name == show.library: - log.info( + log.debug( f"Using library {library.name} for show {show.name} ({show.year})" ) - show_file_path = Path(library.path) / show_directory_name - break + return Path(library.path) / show_directory_name else: log.warning( f"Library {show.library} not defined in config, using default TV directory." ) - show_file_path = misc_config.tv_directory / show_directory_name + return misc_config.tv_directory / show_directory_name + + def get_root_season_directory(self, show: Show, season_number: int) -> Path: + return self.get_root_show_directory(show) / Path(f"Season {season_number}") + + def import_episode( + self, + show: Show, + season: Season, + episode_number: int, + video_files: list[Path], + subtitle_files: list[Path], + file_path_suffix: str = "", + ) -> None: + episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode_number:02d}" + if file_path_suffix != "": + episode_file_name += f" - {file_path_suffix}" + pattern = ( + r".*[. ]S0?" + str(season.number) + r"E0?" + str(episode_number) + r"[. ].*" + ) + subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt" + target_file_name = ( + self.get_root_season_directory(show=show, season_number=season.number) + / episode_file_name + ) + + # import subtitles + for subtitle_file in subtitle_files: + log.debug( + f"Searching for pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}" + ) + regex_result = re.search( + subtitle_pattern, subtitle_file.name, re.IGNORECASE + ) + if regex_result: + language_code = regex_result.group(1) + log.debug( + f"Found matching pattern: {subtitle_pattern} in subtitle file: {subtitle_file.name}," + + f" extracted language code: {language_code}" + ) + target_subtitle_file = target_file_name.with_suffix( + f".{language_code}.srt" + ) + import_file(target_file=target_subtitle_file, source_file=subtitle_file) + else: + log.debug( + f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}" + ) + + # import episode videos + for file in video_files: + log.debug(f"Searching for pattern {pattern} in video file: {file.name}") + if re.search(pattern, file.name, re.IGNORECASE): + log.debug(f"Found matching pattern: {pattern} in file {file.name}") + target_video_file = target_file_name.with_suffix(file.suffix) + import_file(target_file=target_video_file, source_file=file) + break else: - show_file_path = misc_config.tv_directory / show_directory_name + raise Exception( + f"Could not find any video file for episode {episode_number} of show {show.name} S{season.number}" + ) + + def import_season( + self, + show: Show, + season: Season, + video_files: list[Path], + subtitle_files: list[Path], + file_path_suffix: str = "", + ) -> bool: + season_path = self.get_root_season_directory( + show=show, season_number=season.number + ) + success = True + try: + season_path.mkdir(parents=True, exist_ok=True) + except Exception as e: + log.warning(f"Could not create path {season_path}: {e}") + raise Exception(f"Could not create path {season_path}") from e + + for episode in season.episodes: + try: + self.import_episode( + show=show, + subtitle_files=subtitle_files, + video_files=video_files, + season=season, + episode_number=episode.number, + file_path_suffix=file_path_suffix, + ) + except Exception: + # Send notification about missing episode file + if self.notification_service: + self.notification_service.send_notification_to_all_providers( + title="Missing Episode File", + message=f"No video file found for S{season.number:02d}E{episode.number:02d} for show {show.name}. Manual intervention may be required.", + ) + success = False + log.warning( + f"S{season.number}E{episode.number} not found when trying to import episode for show {show.name}." + ) + return success + + def import_torrent_files(self, torrent: Torrent, show: Show) -> None: + """ + Organizes files from a torrent into the TV directory structure, mapping them to seasons and episodes. + :param torrent: The Torrent object + :param show: The Show object + """ + + video_files, subtitle_files, all_files = get_files_for_import(torrent=torrent) + + success: bool = True # determines if the import was successful, if true, the Imported flag will be set to True after the import + + log.debug( + f"Importing these {len(video_files)} files:\n" + pprint.pformat(video_files) + ) season_files = self.torrent_service.get_season_files_of_torrent(torrent=torrent) log.info( @@ -566,89 +666,37 @@ class TvService: for season_file in season_files: season = self.get_season(season_id=season_file.season_id) - season_path = show_file_path / Path(f"Season {season.number}") - try: - season_path.mkdir(parents=True, exist_ok=True) - except Exception as e: - log.warning(f"Could not create path {season_path}: {e}") - for episode in season.episodes: - episode_file_name = f"{remove_special_characters(show.name)} S{season.number:02d}E{episode.number:02d}" - if season_file.file_path_suffix != "": - episode_file_name += f" - {season_file.file_path_suffix}" - pattern = ( - r".*[. ]S0?" - + str(season.number) - + r"E0?" - + str(episode.number) - + r"[. ].*" + if self.import_season( + show=show, + season=season, + video_files=video_files, + subtitle_files=subtitle_files, + file_path_suffix=season_file.file_path, + ): + log.info( + f"Season {season.number} successfully imported from torrent {torrent.title}" ) - subtitle_pattern = pattern + r"[. ]([A-Za-z]{2})[. ]srt" - target_file_name = season_path / episode_file_name + else: + log.warning( + f"Season {season.number} failed to import from torrent {torrent.title}" + ) + success = False - # import subtitles - for subtitle_file in subtitle_files: - log.debug( - f"Searching for pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}" - ) - regex_result = re.search( - subtitle_pattern, subtitle_file.name, re.IGNORECASE - ) - if regex_result: - language_code = regex_result.group(1) - log.debug( - f"Found matching pattern: {subtitle_pattern} in subtitle file: {subtitle_file.name}," - + f" extracted language code: {language_code}" - ) - target_subtitle_file = target_file_name.with_suffix( - f".{language_code}.srt" - ) - import_file( - target_file=target_subtitle_file, source_file=subtitle_file - ) - else: - log.debug( - f"Didn't find any pattern {subtitle_pattern} in subtitle file: {subtitle_file.name}" - ) + log.info( + f"Finished importing files for torrent {torrent.title} {'without' if success else 'with'} errors" + ) - # import episode videos - for file in video_files: - log.debug( - f"Searching for pattern {pattern} in video file: {file.name}" - ) - if re.search(pattern, file.name, re.IGNORECASE): - log.debug( - f"Found matching pattern: {pattern} in file {file.name}" - ) - target_video_file = target_file_name.with_suffix(file.suffix) - import_file(target_file=target_video_file, source_file=file) - break - else: - # Send notification about missing episode file - if self.notification_service: - self.notification_service.send_notification_to_all_providers( - title="Missing Episode File", - message=f"No video file found for S{season.number:02d}E{episode.number:02d} in torrent '{torrent.title}' for show {show.name}. Manual intervention may be required.", - ) - success = False - log.warning( - f"S{season.number}E{episode.number} in Torrent {torrent.title}'s files not found." - ) if success: torrent.imported = True self.torrent_service.torrent_repository.save_torrent(torrent=torrent) # Send successful season download notification if self.notification_service: - season_info = ", ".join( - [f"Season {season_file.season_id}" for season_file in season_files] - ) self.notification_service.send_notification_to_all_providers( title="TV Season Downloaded", - message=f"Successfully downloaded {show.name} ({show.year}) - {season_info}", + message=f"Successfully imported {show.name} ({show.year}) from torrent {torrent.title}.", ) - log.info(f"Finished organizing files for torrent {torrent.title}") - def update_show_metadata( self, db_show: Show, metadata_provider: AbstractMetadataProvider ) -> Show | None: @@ -782,6 +830,50 @@ class TvService: show_id=show_id, continuous_download=continuous_download ) + def detect_unknown_tv_shows(self) -> list[Path]: + tv_libraries = AllEncompassingConfig().misc.tv_libraries + tv_directory = AllEncompassingConfig().misc.tv_directory + show_dirs = tv_directory.glob("*") + log.debug(f"Using Directory {tv_directory}") + unknown_tv_shows = [] + for show_dir in show_dirs: + # check if directory is one created by MediaManager (contins [tmdbd/tvdbid-0000) or if it is a library + if re.search( + r"\[(?:tmdbid|tvdbid)-\d+]", show_dir.name, re.IGNORECASE + ) or show_dir.absolute() in [ + Path(library.path).absolute() for library in tv_libraries + ]: + log.debug(f"MediaManager directory detected: {show_dir.name}") + else: + log.info(f"Detected unknown tv show directory: {show_dir.name}") + unknown_tv_shows.append(show_dir) + return unknown_tv_shows + + def get_import_candidates( + self, tv_show: Path, metadata_provider: AbstractMetadataProvider + ) -> TvShowImportSuggestion: + search_result = self.search_for_show(tv_show.name, metadata_provider) + import_candidates = TvShowImportSuggestion( + directory=tv_show, candidates=search_result + ) + log.debug( + f"Found {len(import_candidates.candidates)} candidates for {import_candidates.directory}" + ) + return import_candidates + + def import_existing_tv_show(self, tv_show: Show, source_directory: Path) -> None: + video_files, subtitle_files, all_files = get_files_for_import( + directory=source_directory + ) + for season in tv_show.seasons: + self.import_season( + show=tv_show, + season=season, + video_files=video_files, + subtitle_files=subtitle_files, + file_path_suffix="IMPORTED", + ) + def auto_download_all_approved_season_requests() -> None: """