make movies be importable

This commit is contained in:
maxid
2025-12-07 19:24:24 +01:00
parent ec93c1abc4
commit d2e8d5eeb7
5 changed files with 106 additions and 34 deletions

View File

@@ -1,7 +1,7 @@
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, Depends, status
from fastapi.responses import JSONResponse
from fastapi import APIRouter, Depends, status, HTTPException
from media_manager.auth.schemas import UserRead
from media_manager.auth.users import current_active_user, current_superuser
@@ -11,6 +11,8 @@ from media_manager.indexer.schemas import (
IndexerQueryResult,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.utils import detect_unknown_media
from media_manager.torrent.schemas import Torrent
from media_manager.movies import log
from media_manager.exceptions import MediaAlreadyExists
@@ -47,8 +49,7 @@ router = APIRouter()
status.HTTP_201_CREATED: {
"model": Movie,
"description": "Successfully created movie",
},
status.HTTP_409_CONFLICT: {"model": str, "description": "Movie already exists"},
}
},
)
def add_a_movie(
@@ -61,9 +62,9 @@ def add_a_movie(
external_id=movie_id,
metadata_provider=metadata_provider,
)
except MediaAlreadyExists as e:
return JSONResponse(
status_code=status.HTTP_409_CONFLICT, content={"message": str(e)}
except ValueError:
movie = movie_service.get_movie_by_external_id(
external_id=movie_id, metadata_provider=metadata_provider.name
)
return movie
@@ -73,6 +74,49 @@ def add_a_movie(
# --------------------------------
@router.get(
"/importable",
status_code=status.HTTP_200_OK,
dependencies=[Depends(current_superuser)],
response_model=list[MediaImportSuggestion],
)
def get_all_importable_movies(
movie_service: movie_service_dep, metadata_provider: metadata_provider_dep
):
"""
get a list of unknown movies that were detected in the movie directory and are importable
"""
directories = detect_unknown_media(AllEncompassingConfig().misc.movie_directory)
movies = []
for directory in directories:
movies.append(
movie_service.get_import_candidates(
movie=directory, metadata_provider=metadata_provider
)
)
return movies
@router.post(
"/importable/{movie_id}",
dependencies=[Depends(current_superuser)],
status_code=status.HTTP_204_NO_CONTENT,
)
def import_detected_movie(
movie_service: movie_service_dep, movie_id: MovieId, directory: str
):
"""
get a list of unknown movies that were detected in the tv directory and are importable
"""
source_directory = Path(directory)
if source_directory not in detect_unknown_media(
AllEncompassingConfig().misc.movie_directory
):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "No such directory")
movie = movie_service.get_movie_by_id(movie_id=movie_id)
movie_service.import_existing_movie(movie=movie, source_directory=source_directory)
@router.get(
"",
dependencies=[Depends(current_active_user)],

View File

@@ -13,6 +13,7 @@ from media_manager.indexer.schemas import IndexerQueryResultId
from media_manager.indexer.utils import evaluate_indexer_query_results
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import Torrent, TorrentStatus
from media_manager.torrent.service import TorrentService
from media_manager.movies import log
@@ -35,6 +36,7 @@ from media_manager.torrent.utils import (
import_file,
get_files_for_import,
remove_special_characters,
strip_trailing_year,
)
from media_manager.indexer.service import IndexerService
from media_manager.metadataProvider.abstractMetaDataProvider import (
@@ -463,7 +465,7 @@ class MovieService:
return True
def get_movie_root_path(self, movie: Movie) -> Path:
misc_config = AllEncompassingConfig().misc_config
misc_config = AllEncompassingConfig().misc
movie_file_path = (
misc_config.movie_directory
/ f"{remove_special_characters(movie.name)} ({movie.year}) [{movie.metadata_provider}id-{movie.external_id}]"
@@ -584,6 +586,34 @@ class MovieService:
log.info(f"Finished importing files for torrent {torrent.title}")
def get_import_candidates(
self, movie: Path, metadata_provider: AbstractMetadataProvider
) -> MediaImportSuggestion:
search_result = self.search_for_movie(
strip_trailing_year(movie.name), metadata_provider
)
import_candidates = MediaImportSuggestion(
directory=movie, candidates=search_result
)
log.debug(
f"Found {len(import_candidates.candidates)} candidates for {import_candidates.directory}"
)
return import_candidates
def import_existing_movie(self, movie: Movie, source_directory: Path) -> None:
video_files, subtitle_files, all_files = get_files_for_import(
directory=source_directory
)
self.import_movie(
movie=movie,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix="IMPORTED",
)
new_source_path = source_directory.parent / ("." + source_directory.name)
source_directory.rename(new_source_path)
def update_movie_metadata(
self, db_movie: Movie, metadata_provider: AbstractMetadataProvider
) -> Movie | None:

View File

@@ -1,9 +1,7 @@
import logging
import re
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from media_manager.config import AllEncompassingConfig
@@ -25,26 +23,3 @@ def send_email(subject: str, html: str, addressee: str) -> None:
server.sendmail(email_conf.from_email, addressee, message.as_string())
log.info(f"Successfully sent email to {addressee} with subject: {subject}")
def detect_unknown_media(path: Path) -> list[Path]:
libraries = []
libraries.extend(AllEncompassingConfig().misc.movie_libraries)
libraries.extend(AllEncompassingConfig().misc.tv_libraries)
show_dirs = path.glob("*")
log.debug(f"Using Directory {path}")
unknown_tv_shows = []
for media_dir in show_dirs:
# check if directory is one created by MediaManager (contins [tmdbd/tvdbid-0000) or if it is a library
if (
re.search(r"\[(?:tmdbid|tvdbid)-\d+]", media_dir.name, re.IGNORECASE)
or media_dir.absolute()
in [Path(library.path).absolute() for library in libraries]
or media_dir.name.startswith(".")
):
log.debug(f"MediaManager directory detected: {media_dir.name}")
else:
log.info(f"Detected unknown media directory: {media_dir.name}")
unknown_tv_shows.append(media_dir)
return unknown_tv_shows

View File

@@ -183,3 +183,26 @@ def strip_trailing_year(title: str) -> str:
Removes a trailing space + (4-digit year) at end of string
"""
return re.sub(r"\s*\(\d{4}\)\s*$", "", title).strip()
def detect_unknown_media(path: Path) -> list[Path]:
libraries = []
libraries.extend(AllEncompassingConfig().misc.movie_libraries)
libraries.extend(AllEncompassingConfig().misc.tv_libraries)
show_dirs = path.glob("*")
log.debug(f"Using Directory {path}")
unknown_tv_shows = []
for media_dir in show_dirs:
# check if directory is one created by MediaManager (contins [tmdbd/tvdbid-0000) or if it is a library
if (
re.search(r"\[(?:tmdbid|tvdbid)-\d+]", media_dir.name, re.IGNORECASE)
or media_dir.absolute()
in [Path(library.path).absolute() for library in libraries]
or media_dir.name.startswith(".")
):
log.debug(f"MediaManager directory detected: {media_dir.name}")
else:
log.info(f"Detected unknown media directory: {media_dir.name}")
unknown_tv_shows.append(media_dir)
return unknown_tv_shows

View File

@@ -12,7 +12,7 @@ from media_manager.indexer.schemas import (
IndexerQueryResult,
)
from media_manager.metadataProvider.schemas import MetaDataProviderSearchResult
from media_manager.notification.utils import detect_unknown_media
from media_manager.torrent.utils import detect_unknown_media
from media_manager.torrent.schemas import Torrent
from media_manager.tv import log
from media_manager.exceptions import MediaAlreadyExists