refactor: standardize formatting and improve code consistency across components

This commit is contained in:
maxDorninger
2025-05-17 23:43:24 +02:00
parent ef7b020043
commit bae450f7a4
31 changed files with 1375 additions and 991 deletions

View File

@@ -2,10 +2,11 @@ from sqlalchemy import select, delete
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, joinedload
from torrent.schemas import TorrentId
from torrent.models import Torrent
from torrent.schemas import TorrentId, Torrent as TorrentSchema
from tv.models import Season, Show, Episode, SeasonRequest, SeasonFile
from tv.schemas import Season as SeasonSchema, SeasonId, Show as ShowSchema, ShowId, \
SeasonRequest as SeasonRequestSchema, SeasonFile as SeasonFileSchema
SeasonRequest as SeasonRequestSchema, SeasonFile as SeasonFileSchema, SeasonNumber
def get_show(show_id: ShowId, db: Session) -> ShowSchema | None:
@@ -20,7 +21,7 @@ def get_show(show_id: ShowId, db: Session) -> ShowSchema | None:
select(Show)
.where(Show.id == show_id)
.options(
joinedload(Show.seasons).joinedload(Season.episodes) # Load relationships
joinedload(Show.seasons).joinedload(Season.episodes)
)
)
@@ -30,7 +31,6 @@ def get_show(show_id: ShowId, db: Session) -> ShowSchema | None:
return ShowSchema.model_validate(result)
def get_show_by_external_id(external_id: int, db: Session, metadata_provider: str) -> ShowSchema | None:
"""
Retrieve a show by its external ID, including nested seasons and episodes.
@@ -45,7 +45,7 @@ def get_show_by_external_id(external_id: int, db: Session, metadata_provider: st
.where(Show.external_id == external_id)
.where(Show.metadata_provider == metadata_provider)
.options(
joinedload(Show.seasons).joinedload(Season.episodes) # Load relationships
joinedload(Show.seasons).joinedload(Season.episodes)
)
)
@@ -91,7 +91,7 @@ def save_show(show: ShowSchema, db: Session) -> ShowSchema:
seasons=[
Season(
id=season.id,
show_id=show.id, # Correctly linking to the parent show
show_id=show.id,
number=season.number,
external_id=season.external_id,
name=season.name,
@@ -99,13 +99,13 @@ def save_show(show: ShowSchema, db: Session) -> ShowSchema:
episodes=[
Episode(
id=episode.id,
season_id=season.id, # Correctly linking to the parent season
season_id=season.id,
number=episode.number,
external_id=episode.external_id,
title=episode.title
) for episode in season.episodes # Convert episodes properly
) for episode in season.episodes
]
) for season in show.seasons # Convert seasons properly
) for season in show.seasons
]
)
@@ -192,3 +192,56 @@ def remove_season_files_by_torrent_id(db: Session, torrent_id: TorrentId):
where(SeasonFile.torrent_id == torrent_id)
)
db.execute(stmt)
def get_season_files_by_season_id(db: Session, season_id: SeasonId):
stmt = (
select(SeasonFile).
where(SeasonFile.season_id == season_id)
)
result = db.execute(stmt).scalars().all()
return [SeasonFileSchema.model_validate(season_file) for season_file in result]
def get_torrents_by_show_id(db: Session, show_id: ShowId) -> list[TorrentSchema]:
stmt = (
select(Torrent)
.distinct()
.join(SeasonFile, SeasonFile.torrent_id == Torrent.id)
.join(Season, Season.id == SeasonFile.season_id)
.where(Season.show_id == show_id)
)
result = db.execute(stmt).scalars().unique().all()
return [TorrentSchema.model_validate(torrent) for torrent in result]
def get_all_shows_with_torrents(db: Session) -> list[ShowSchema]:
"""
Retrieve all shows that are associated with a torrent alphabetically from the database.
:param db: The database session.
:return: A list of ShowSchema objects.
"""
stmt = (
select(Show)
.distinct()
.join(Season, Show.id == Season.show_id)
.join(SeasonFile, Season.id == SeasonFile.season_id)
.join(Torrent, SeasonFile.torrent_id == Torrent.id)
.options(joinedload(Show.seasons).joinedload(Season.episodes))
.order_by(Show.name)
)
results = db.execute(stmt).scalars().unique().all()
return [ShowSchema.model_validate(show) for show in results]
def get_seasons_by_torrent_id(db: Session, torrent_id: TorrentId) -> list[SeasonNumber]:
stmt = (
select(Season.number)
.distinct()
.join(SeasonFile, SeasonFile.torrent_id == Torrent.id)
.join(Season, Season.id == SeasonFile.season_id)
.where(Torrent.id == torrent_id)
.select_from(Torrent)
)
result = db.execute(stmt).scalars().unique().all()
return [SeasonNumber(x) for x in result]

View File

@@ -1,3 +1,6 @@
import logging
import pprint
from fastapi import APIRouter, Depends, status
from fastapi.responses import JSONResponse
@@ -9,7 +12,7 @@ from indexer.schemas import PublicIndexerQueryResult, IndexerQueryResultId
from metadataProvider.schemas import MetaDataProviderShowSearchResult
from torrent.schemas import Torrent
from tv.exceptions import MediaAlreadyExists
from tv.schemas import Show, SeasonRequest, ShowId
from tv.schemas import Show, SeasonRequest, ShowId, RichShowTorrent, PublicShow
router = APIRouter()
@@ -43,22 +46,22 @@ def delete_a_show(db: DbSessionDependency, show_id: ShowId):
@router.get("/shows", dependencies=[Depends(current_active_user)], response_model=list[Show])
def get_all_shows(db: DbSessionDependency, external_id: int = None, metadata_provider: str = "tmdb"):
""""""
if external_id is not None:
return tv.service.get_show_by_external_id(db=db, external_id=external_id, metadata_provider=metadata_provider)
else:
return tv.service.get_all_shows(db=db)
@router.get("/shows/{show_id}", dependencies=[Depends(current_active_user)], response_model=Show)
@router.get("/shows/torrents", dependencies=[Depends(current_active_user)], response_model=list[RichShowTorrent])
def get_shows_with_torrents(db: DbSessionDependency):
"""
get all shows that are associated with torrents
:return: A list of shows with all their torrents
"""
result = tv.service.get_all_shows_with_torrents(db=db)
return result
@router.get("/shows/{show_id}", dependencies=[Depends(current_active_user)], response_model=PublicShow)
def get_a_show(db: DbSessionDependency, show_id: ShowId):
"""
:param show_id:
:type show_id:
:return:
:rtype:
"""
return tv.service.get_show_by_id(db=db, show_id=show_id)

View File

@@ -5,7 +5,7 @@ from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict
from torrent.models import Quality
from torrent.schemas import TorrentId
from torrent.schemas import TorrentId, TorrentStatus
ShowId = typing.NewType("ShowId", UUID)
SeasonId = typing.NewType("SeasonId", UUID)
@@ -66,5 +66,56 @@ class SeasonFile(BaseModel):
season_id: SeasonId
quality: Quality
torrent_id: TorrentId
torrent_id: TorrentId | None
file_path_suffix: str
class RichSeasonTorrent(BaseModel):
model_config = ConfigDict(from_attributes=True)
torrent_id: TorrentId
torrent_title: str
status: TorrentStatus
quality: Quality
imported: bool
file_path_suffix: str
seasons: list[SeasonNumber]
class RichShowTorrent(BaseModel):
model_config = ConfigDict(from_attributes=True)
show_id: ShowId
name: str
year: int | None
metadata_provider: str
torrents: list[RichSeasonTorrent]
class PublicSeason(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: SeasonId
number: SeasonNumber
downloaded: bool = False
name: str
overview: str
external_id: int
episodes: list[Episode]
class PublicShow(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: ShowId
name: str
overview: str
year: int | None
external_id: int
metadata_provider: str
seasons: list[PublicSeason]

View File

@@ -2,16 +2,19 @@ from sqlalchemy.orm import Session
import indexer.service
import metadataProvider
import torrent.repository
import tv.repository
from indexer import IndexerQueryResult
from indexer.schemas import IndexerQueryResultId
from metadataProvider.schemas import MetaDataProviderShowSearchResult
from torrent.repository import get_seasons_files_of_torrent
from torrent.schemas import Torrent
from torrent.service import TorrentService
from tv import log
from tv.exceptions import MediaAlreadyExists
from tv.repository import add_season_file
from tv.schemas import Show, ShowId, SeasonRequest, SeasonFile, SeasonId, Season
from tv.repository import add_season_file, get_season_files_by_season_id
from tv.schemas import Show, ShowId, SeasonRequest, SeasonFile, SeasonId, Season, RichShowTorrent, RichSeasonTorrent, \
PublicSeason, PublicShow
def add_show(db: Session, external_id: int, metadata_provider: str) -> Show | None:
@@ -82,13 +85,42 @@ def search_for_show(query: str, metadata_provider: str, db: Session) -> list[Met
result.added = True
return results
def get_show_by_id(db: Session, show_id: ShowId) -> Show | None:
return tv.repository.get_show(show_id=show_id, db=db)
def get_show_by_id(db: Session, show_id: ShowId) -> PublicShow:
show = tv.repository.get_show(show_id=show_id, db=db)
seasons = [PublicSeason.model_validate(season) for season in show.seasons]
for season in seasons:
season.downloaded = is_season_downloaded(db=db, season_id=season.id)
public_show = PublicShow.model_validate(show)
public_show.seasons = seasons
return public_show
def is_season_downloaded(db: Session, season_id: SeasonId) -> bool:
season_files = get_season_files_by_season_id(db=db, season_id=season_id)
for season_file in season_files:
if season_file.torrent_id is None:
return True
else:
torrent_file = torrent.repository.get_torrent_by_id(db=db, torrent_id=season_file.torrent_id)
if torrent_file.imported:
return True
return False
def check_if_season_exists_on_file(db: Session, season_id: SeasonId) -> bool:
season = tv.repository.get_season(season_id=season_id, db=db)
if season:
return True
else:
raise ValueError(f"A season with this ID {season_id} does not exist")
def get_show_by_external_id(db: Session, external_id: int, metadata_provider: str) -> Show | None:
return tv.repository.get_show_by_external_id(external_id=external_id, metadata_provider=metadata_provider, db=db)
def get_season(db: Session, season_id: SeasonId) -> Season:
return tv.repository.get_season(season_id=season_id, db=db)
@@ -97,6 +129,27 @@ def get_all_requested_seasons(db: Session) -> list[SeasonRequest]:
return tv.repository.get_season_requests(db=db)
def get_all_shows_with_torrents(db: Session) -> list[RichShowTorrent]:
shows = tv.repository.get_all_shows_with_torrents(db=db)
result = []
for show in shows:
show_torrents = tv.repository.get_torrents_by_show_id(db=db, show_id=show.id)
rich_season_torrents = []
for torrent in show_torrents:
seasons = tv.repository.get_seasons_by_torrent_id(db=db, torrent_id=torrent.id)
season_files = get_seasons_files_of_torrent(db=db, torrent_id=torrent.id)
file_path_suffix = season_files[0].file_path_suffix
season_torrent = RichSeasonTorrent(torrent_id=torrent.id, torrent_title=torrent.title,
status=torrent.status, quality=torrent.quality,
imported=torrent.imported, seasons=seasons,
file_path_suffix=file_path_suffix
)
rich_season_torrents.append(season_torrent)
result.append(RichShowTorrent(show_id=show.id, name=show.name, year=show.year,
metadata_provider=show.metadata_provider, torrents=rich_season_torrents))
return result
def download_torrent(db: Session, public_indexer_result_id: IndexerQueryResultId, show_id: ShowId,
override_show_file_path_suffix: str = "") -> Torrent:
indexer_result = indexer.service.get_indexer_query_result(db=db, result_id=public_indexer_result_id)