run linter and formatter

This commit is contained in:
maxDorninger
2025-06-07 13:29:48 +02:00
parent 1fab5d8056
commit 231c36efe0
16 changed files with 116 additions and 116 deletions

View File

@@ -65,17 +65,17 @@ class DbConfig(BaseSettings):
db_config = DbConfig()
db_url = (
"postgresql+psycopg"
+ "://"
+ db_config.USER
+ ":"
+ db_config.PASSWORD
+ "@"
+ db_config.HOST
+ ":"
+ str(db_config.PORT)
+ "/"
+ db_config.DBNAME
"postgresql+psycopg"
+ "://"
+ db_config.USER
+ ":"
+ db_config.PASSWORD
+ "@"
+ db_config.HOST
+ ":"
+ str(db_config.PORT)
+ "/"
+ db_config.DBNAME
)
config.set_main_option("sqlalchemy.url", db_url)

View File

@@ -21,20 +21,20 @@ class OAuth2AuthorizeResponse(BaseModel):
def generate_state_token(
data: dict[str, str], secret: SecretType, lifetime_seconds: int = 3600
data: dict[str, str], secret: SecretType, lifetime_seconds: int = 3600
) -> str:
data["aud"] = STATE_TOKEN_AUDIENCE
return generate_jwt(data, secret, lifetime_seconds)
def get_oauth_router(
oauth_client: BaseOAuth2,
backend: AuthenticationBackend[models.UP, models.ID],
get_user_manager: UserManagerDependency[models.UP, models.ID],
state_secret: SecretType,
redirect_url: Optional[str] = None,
associate_by_email: bool = False,
is_verified_by_default: bool = False,
oauth_client: BaseOAuth2,
backend: AuthenticationBackend[models.UP, models.ID],
get_user_manager: UserManagerDependency[models.UP, models.ID],
state_secret: SecretType,
redirect_url: Optional[str] = None,
associate_by_email: bool = False,
is_verified_by_default: bool = False,
) -> APIRouter:
"""Generate a router with the OAuth routes."""
router = APIRouter()
@@ -57,7 +57,7 @@ def get_oauth_router(
response_model=OAuth2AuthorizeResponse,
)
async def authorize(
request: Request, scopes: list[str] = ["openid", "profile", "email"]
request: Request, scopes: list[str] = ["openid", "profile", "email"]
) -> OAuth2AuthorizeResponse:
if redirect_url is not None:
authorize_redirect_url = redirect_url
@@ -99,12 +99,12 @@ def get_oauth_router(
},
)
async def callback(
request: Request,
access_token_state: tuple[OAuth2Token, str] = Depends(
oauth2_authorize_callback
),
user_manager: BaseUserManager[models.UP, models.ID] = Depends(get_user_manager),
strategy: Strategy[models.UP, models.ID] = Depends(backend.get_strategy),
request: Request,
access_token_state: tuple[OAuth2Token, str] = Depends(
oauth2_authorize_callback
),
user_manager: BaseUserManager[models.UP, models.ID] = Depends(get_user_manager),
strategy: Strategy[models.UP, models.ID] = Depends(backend.get_strategy),
):
token, state = access_token_state
account_id, account_email = await oauth_client.get_id_email(

View File

@@ -25,8 +25,8 @@ SECRET = config.token_secret
LIFETIME = config.session_lifetime
if (
os.getenv("OPENID_ENABLED") is not None
and os.getenv("OPENID_ENABLED").upper() == "TRUE"
os.getenv("OPENID_ENABLED") is not None
and os.getenv("OPENID_ENABLED").upper() == "TRUE"
):
openid_config = OpenIdConfig()
openid_client = OpenID(
@@ -53,17 +53,17 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
await self.update(user=user, user_update=updated_user)
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Optional[Request] = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_reset_password(
self, user: User, request: Optional[Request] = None
self, user: User, request: Optional[Request] = None
):
print(f"User {user.id} has reset their password.")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Optional[Request] = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")

View File

@@ -14,17 +14,17 @@ log = logging.getLogger(__name__)
config = DbConfig()
db_url = (
"postgresql+psycopg"
+ "://"
+ config.USER
+ ":"
+ config.PASSWORD
+ "@"
+ config.HOST
+ ":"
+ str(config.PORT)
+ "/"
+ config.DBNAME
"postgresql+psycopg"
+ "://"
+ config.USER
+ ":"
+ config.PASSWORD
+ "@"
+ config.HOST
+ ":"
+ str(config.PORT)
+ "/"
+ config.DBNAME
)
engine = create_engine(db_url, echo=False)

View File

@@ -33,8 +33,8 @@ class Jackett(GenericIndexer):
for indexer in self.indexers:
log.debug(f"Searching in indexer: {indexer}")
url = (
self.url
+ f"/api/v2.0/indexers/{indexer}/results/torznab/api?apikey={self.api_key}&t=tvsearch&q={query}"
self.url
+ f"/api/v2.0/indexers/{indexer}/results/torznab/api?apikey={self.api_key}&t=tvsearch&q={query}"
)
response = requests.get(url)
responses.append(response)

View File

@@ -8,7 +8,7 @@ from media_manager.indexer.schemas import (
def get_result(
result_id: IndexerQueryResultId, db: Session
result_id: IndexerQueryResultId, db: Session
) -> IndexerQueryResultSchema:
return IndexerQueryResultSchema.model_validate(
db.get(IndexerQueryResult, result_id)
@@ -16,7 +16,7 @@ def get_result(
def save_result(
result: IndexerQueryResultSchema, db: Session
result: IndexerQueryResultSchema, db: Session
) -> IndexerQueryResultSchema:
db.add(IndexerQueryResult(**result.model_dump()))
return result

View File

@@ -20,6 +20,6 @@ def search(query: str, db: Session) -> list[IndexerQueryResult]:
def get_indexer_query_result(
result_id: IndexerQueryResultId, db: Session
result_id: IndexerQueryResultId, db: Session
) -> IndexerQueryResult:
return media_manager.indexer.repository.get_result(result_id=result_id, db=db)

View File

@@ -19,7 +19,7 @@ def get_show_metadata(id: int = None, provider: str = "tmdb") -> Show:
@cached(search_show_cache)
def search_show(
query: str | None = None, provider: str = "tmdb"
query: str | None = None, provider: str = "tmdb"
) -> list[MetaDataProviderShowSearchResult]:
"""
If no query is provided, it will return the most popular shows.

View File

@@ -77,10 +77,10 @@ class TmdbMetadataProvider(AbstractMetadataProvider):
# all pictures from TMDB should already be jpeg, so no need to convert
if show_metadata["poster_path"] is not None:
poster_url = (
"https://image.tmdb.org/t/p/original" + show_metadata["poster_path"]
"https://image.tmdb.org/t/p/original" + show_metadata["poster_path"]
)
if media_manager.metadataProvider.utils.download_poster_image(
storage_path=self.storage_path, poster_url=poster_url, show=show
storage_path=self.storage_path, poster_url=poster_url, show=show
):
log.info("Successfully downloaded poster image for show " + show.name)
else:
@@ -91,7 +91,7 @@ class TmdbMetadataProvider(AbstractMetadataProvider):
return show
def search_show(
self, query: str | None = None, max_pages: int = 5
self, query: str | None = None, max_pages: int = 5
) -> list[MetaDataProviderShowSearchResult]:
"""
Search for shows using TMDB API.
@@ -118,7 +118,7 @@ class TmdbMetadataProvider(AbstractMetadataProvider):
try:
if result["poster_path"] is not None:
poster_url = (
"https://image.tmdb.org/t/p/original" + result["poster_path"]
"https://image.tmdb.org/t/p/original" + result["poster_path"]
)
else:
poster_url = None

View File

@@ -81,7 +81,7 @@ class TvdbMetadataProvider(AbstractMetadataProvider):
return show
def search_show(
self, query: str | None = None
self, query: str | None = None
) -> list[MetaDataProviderShowSearchResult]:
if query is None:
results = self.tvdb_client.get_all_series()

View File

@@ -8,7 +8,7 @@ from media_manager.tv.schemas import SeasonFile as SeasonFileSchema, Show as Sho
def get_seasons_files_of_torrent(
db: Session, torrent_id: TorrentId
db: Session, torrent_id: TorrentId
) -> list[SeasonFileSchema]:
stmt = select(SeasonFile).where(SeasonFile.torrent_id == torrent_id)
result = db.execute(stmt).scalars().all()

View File

@@ -202,8 +202,8 @@ class TorrentService:
# Fetch show and season information
show: Show = get_show_of_torrent(db=self.db, torrent_id=torrent.id)
show_file_path = (
BasicConfig().tv_directory
/ f"{show.name} ({show.year}) [{show.metadata_provider}id-{show.external_id}]"
BasicConfig().tv_directory
/ f"{show.name} ({show.year}) [{show.metadata_provider}id-{show.external_id}]"
)
season_files: list[SeasonFile] = get_seasons_files_of_torrent(
db=self.db, torrent_id=torrent.id
@@ -232,11 +232,11 @@ class TorrentService:
episode_file_name += f" - {season_file.file_path_suffix}"
pattern = (
r".*[.]S0?"
+ str(season.number)
+ r"E0?"
+ str(episode.number)
+ r"[.].*"
r".*[.]S0?"
+ str(season.number)
+ r"E0?"
+ str(episode.number)
+ r"[.].*"
)
subtitle_pattern = pattern + r"[.]([A-Za-z]{2})[.]srt"
target_file_name = season_path / episode_file_name

View File

@@ -16,7 +16,7 @@ tv_repository_dep = Annotated[TvRepository, Depends(get_tv_repository)]
def get_tv_service(
tv_repository: tv_repository_dep,
tv_repository: tv_repository_dep,
) -> TvService:
return TvService(tv_repository)
@@ -25,8 +25,8 @@ tv_service_dep = Annotated[TvService, Depends(get_tv_service)]
def get_show_by_id(
tv_service: tv_service_dep,
show_id: ShowId = Path(..., description="The ID of the show"),
tv_service: tv_service_dep,
show_id: ShowId = Path(..., description="The ID of the show"),
) -> Show:
show = tv_service.get_show_by_id(show_id)
return show
@@ -36,8 +36,8 @@ show_dep = Annotated[Show, Depends(get_show_by_id)]
def get_season_by_id(
tv_service: tv_service_dep,
season_id: SeasonId = Path(..., description="The ID of the season"),
tv_service: tv_service_dep,
season_id: SeasonId = Path(..., description="The ID of the season"),
) -> Season:
return tv_service.get_season(season_id=season_id)

View File

@@ -59,7 +59,7 @@ class TvRepository:
raise
def get_show_by_external_id(
self, external_id: int, metadata_provider: str
self, external_id: int, metadata_provider: str
) -> ShowSchema:
"""
Retrieve a show by its external ID, including nested seasons and episodes.
@@ -229,7 +229,7 @@ class TvRepository:
raise
def add_season_request(
self, season_request: SeasonRequestSchema
self, season_request: SeasonRequestSchema
) -> SeasonRequestSchema:
"""
Adds a Season to the SeasonRequest table, which marks it as requested.
@@ -419,7 +419,7 @@ class TvRepository:
raise
def get_season_files_by_season_id(
self, season_id: SeasonId
self, season_id: SeasonId
) -> list[SeasonFileSchema]:
"""
Retrieve all season files for a given season ID.
@@ -521,7 +521,7 @@ class TvRepository:
raise
def get_season_request(
self, season_request_id: SeasonRequestId
self, season_request_id: SeasonRequestId
) -> SeasonRequestSchema:
"""
Retrieve a season request by its ID.

View File

@@ -51,7 +51,7 @@ router = APIRouter()
},
)
def add_a_show(
tv_service: tv_service_dep, show_id: int, metadata_provider: str = "tmdb"
tv_service: tv_service_dep, show_id: int, metadata_provider: str = "tmdb"
):
try:
show = tv_service.add_show(
@@ -83,7 +83,7 @@ def delete_a_show(tv_repository: tv_repository_dep, show_id: ShowId):
"/shows", dependencies=[Depends(current_active_user)], response_model=list[Show]
)
def get_all_shows(
tv_service: tv_service_dep, external_id: int = None, metadata_provider: str = "tmdb"
tv_service: tv_service_dep, external_id: int = None, metadata_provider: str = "tmdb"
):
if external_id is not None:
return tv_service.get_show_by_external_id(
@@ -132,9 +132,9 @@ def get_a_shows_torrents(show: show_dep, tv_service: tv_service_dep):
@router.post("/seasons/requests", status_code=status.HTTP_204_NO_CONTENT)
def request_a_season(
user: Annotated[User, Depends(current_active_user)],
season_request: CreateSeasonRequest,
tv_service: tv_service_dep,
user: Annotated[User, Depends(current_active_user)],
season_request: CreateSeasonRequest,
tv_service: tv_service_dep,
):
"""
adds request flag to a season
@@ -163,9 +163,9 @@ def get_season_requests(tv_service: tv_service_dep) -> list[RichSeasonRequest]:
status_code=status.HTTP_204_NO_CONTENT,
)
def delete_season_request(
tv_service: tv_service_dep,
user: Annotated[User, Depends(current_active_user)],
request_id: SeasonRequestId,
tv_service: tv_service_dep,
user: Annotated[User, Depends(current_active_user)],
request_id: SeasonRequestId,
):
request = tv_service.get_season_request_by_id(season_request_id=request_id)
if user.is_superuser or request.requested_by.id == user.id:
@@ -185,10 +185,10 @@ def delete_season_request(
"/seasons/requests/{season_request_id}", status_code=status.HTTP_204_NO_CONTENT
)
def authorize_request(
tv_service: tv_service_dep,
user: Annotated[User, Depends(current_superuser)],
season_request_id: SeasonRequestId,
authorized_status: bool = False,
tv_service: tv_service_dep,
user: Annotated[User, Depends(current_superuser)],
season_request_id: SeasonRequestId,
authorized_status: bool = False,
):
"""
updates the request flag to true
@@ -206,9 +206,9 @@ def authorize_request(
@router.put("/seasons/requests", status_code=status.HTTP_204_NO_CONTENT)
def update_request(
tv_service: tv_service_dep,
user: Annotated[User, Depends(current_active_user)],
season_request: UpdateSeasonRequest,
tv_service: tv_service_dep,
user: Annotated[User, Depends(current_active_user)],
season_request: UpdateSeasonRequest,
):
updated_season_request: SeasonRequest = SeasonRequest.model_validate(season_request)
request = tv_service.get_season_request_by_id(
@@ -226,7 +226,7 @@ def update_request(
response_model=list[PublicSeasonFile],
)
def get_season_files(
season: season_dep, tv_service: tv_service_dep
season: season_dep, tv_service: tv_service_dep
) -> list[PublicSeasonFile]:
return tv_service.get_public_season_files_by_season_id(season_id=season.id)
@@ -244,10 +244,10 @@ def get_season_files(
response_model=list[PublicIndexerQueryResult],
)
def get_torrents_for_a_season(
tv_service: tv_service_dep,
show_id: ShowId,
season_number: int = 1,
search_query_override: str = None,
tv_service: tv_service_dep,
show_id: ShowId,
season_number: int = 1,
search_query_override: str = None,
):
return tv_service.get_all_available_torrents_for_a_season(
season_number=season_number,
@@ -264,10 +264,10 @@ def get_torrents_for_a_season(
dependencies=[Depends(current_superuser)],
)
def download_a_torrent(
tv_service: tv_service_dep,
public_indexer_result_id: IndexerQueryResultId,
show_id: ShowId,
override_file_path_suffix: str = "",
tv_service: tv_service_dep,
public_indexer_result_id: IndexerQueryResultId,
show_id: ShowId,
override_file_path_suffix: str = "",
):
return tv_service.download_torrent(
public_indexer_result_id=public_indexer_result_id,
@@ -287,7 +287,7 @@ def download_a_torrent(
response_model=list[MetaDataProviderShowSearchResult],
)
def search_metadata_providers_for_a_show(
tv_service: tv_service_dep, query: str, metadata_provider: str = "tmdb"
tv_service: tv_service_dep, query: str, metadata_provider: str = "tmdb"
):
return tv_service.search_for_show(query=query, metadata_provider=metadata_provider)

View File

@@ -59,7 +59,7 @@ class TvService:
return self.tv_repository.add_season_request(season_request=season_request)
def get_season_request_by_id(
self, season_request_id: SeasonRequestId
self, season_request_id: SeasonRequestId
) -> SeasonRequest | None:
"""
Get a season request by its ID.
@@ -90,7 +90,7 @@ class TvService:
self.tv_repository.delete_season_request(season_request_id=season_request_id)
def get_public_season_files_by_season_id(
self, season_id: SeasonId
self, season_id: SeasonId
) -> list[PublicSeasonFile]:
"""
Get all public season files for a given season ID.
@@ -110,10 +110,10 @@ class TvService:
return result
def check_if_show_exists(
self,
external_id: int = None,
metadata_provider: str = None,
show_id: ShowId = None,
self,
external_id: int = None,
metadata_provider: str = None,
show_id: ShowId = None,
) -> bool:
"""
Check if a show exists in the database.
@@ -144,7 +144,7 @@ class TvService:
)
def get_all_available_torrents_for_a_season(
self, season_number: int, show_id: ShowId, search_query_override: str = None
self, season_number: int, show_id: ShowId, search_query_override: str = None
) -> list[IndexerQueryResult]:
"""
Get all available torrents for a given season.
@@ -190,7 +190,7 @@ class TvService:
return self.tv_repository.get_shows()
def search_for_show(
self, query: str, metadata_provider: str
self, query: str, metadata_provider: str
) -> list[MetaDataProviderShowSearchResult]:
"""
Search for shows using a given query.
@@ -202,13 +202,13 @@ class TvService:
results = media_manager.metadataProvider.search_show(query, metadata_provider)
for result in results:
if self.check_if_show_exists(
external_id=result.external_id, metadata_provider=metadata_provider
external_id=result.external_id, metadata_provider=metadata_provider
):
result.added = True
return results
def get_popular_shows(
self, metadata_provider: str
self, metadata_provider: str
) -> list[MetaDataProviderShowSearchResult]:
"""
Get popular shows from a given metadata provider.
@@ -223,7 +223,7 @@ class TvService:
filtered_results = []
for result in results:
if not self.check_if_show_exists(
external_id=result.external_id, metadata_provider=metadata_provider
external_id=result.external_id, metadata_provider=metadata_provider
):
filtered_results.append(result)
@@ -286,7 +286,7 @@ class TvService:
return False
def get_show_by_external_id(
self, external_id: int, metadata_provider: str
self, external_id: int, metadata_provider: str
) -> Show | None:
"""
Get a show by its external ID and metadata provider.
@@ -361,10 +361,10 @@ class TvService:
return [self.get_torrents_for_show(show=show) for show in shows]
def download_torrent(
self,
public_indexer_result_id: IndexerQueryResultId,
show_id: ShowId,
override_show_file_path_suffix: str = "",
self,
public_indexer_result_id: IndexerQueryResultId,
show_id: ShowId,
override_show_file_path_suffix: str = "",
) -> Torrent:
"""
Download a torrent for a given indexer result and show.
@@ -395,7 +395,7 @@ class TvService:
return show_torrent
def download_approved_season_request(
self, season_request: SeasonRequest, show: Show
self, season_request: SeasonRequest, show: Show
) -> bool:
"""
Download an approved season request.
@@ -423,9 +423,9 @@ class TvService:
for torrent in torrents:
if (
torrent.quality > season_request.wanted_quality
or torrent.quality < season_request.min_quality
or torrent.seeders < 3
torrent.quality > season_request.wanted_quality
or torrent.quality < season_request.min_quality
or torrent.seeders < 3
):
log.info(
f"Skipping torrent {torrent.title} with quality {torrent.quality} for season {season.id}, because it does not match the requested quality {season_request.wanted_quality}"
@@ -482,7 +482,7 @@ def auto_download_all_approved_season_requests() -> None:
season_id=season_request.season_id
)
if tv_service.download_approved_season_request(
season_request=season_request, show_id=show.id
season_request=season_request, show_id=show.id
):
count += 1
else: