mirror of
https://github.com/maxdorninger/MediaManager.git
synced 2026-04-17 19:53:55 +02:00
This PR replaces the APScheduler lib with the Taskiq task queuing lib. # why APScheduler doesn't support FastAPI's DI in tasks, this makes them quite cumbersome to read and write since DB, Repositories and Services all need to be instanciated manually. Moreover, Taskiq makes it easier to start background tasks from FastAPI requests. This enables MM to move to a more event-based architecture. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * App now uses an orchestrated async startup/shutdown and runs background scheduling via a database-backed task queue; startup enqueues pre-load/import/update tasks. * **Bug Fixes** * Improved torrent client handling with clearer conflict messages and guidance for manual resolution. * Enhanced logging around season, episode and metadata update operations; minor regex/behaviour formatting preserved. * **Chores** * Updated dependencies to support the new task queue and connection pooling. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
92 lines
3.0 KiB
Python
92 lines
3.0 KiB
Python
import asyncio
|
|
import logging
|
|
from urllib.parse import quote
|
|
|
|
import taskiq_fastapi
|
|
from taskiq import TaskiqDepends, TaskiqScheduler
|
|
from taskiq.cli.scheduler.run import SchedulerLoop
|
|
from taskiq_postgresql import PostgresqlBroker
|
|
from taskiq_postgresql.scheduler_source import PostgresqlSchedulerSource
|
|
|
|
from media_manager.movies.dependencies import get_movie_service
|
|
from media_manager.movies.service import MovieService
|
|
from media_manager.tv.dependencies import get_tv_service
|
|
from media_manager.tv.service import TvService
|
|
|
|
|
|
def _build_db_connection_string_for_taskiq() -> str:
|
|
from media_manager.config import MediaManagerConfig
|
|
|
|
db_config = MediaManagerConfig().database
|
|
user = quote(db_config.user, safe="")
|
|
password = quote(db_config.password, safe="")
|
|
dbname = quote(db_config.dbname, safe="")
|
|
host = quote(str(db_config.host), safe="")
|
|
port = quote(str(db_config.port), safe="")
|
|
return f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
|
|
|
|
|
|
broker = PostgresqlBroker(
|
|
dsn=_build_db_connection_string_for_taskiq,
|
|
driver="psycopg",
|
|
run_migrations=True,
|
|
)
|
|
|
|
# Register FastAPI app with the broker so worker processes can resolve FastAPI
|
|
# dependencies. Using a string reference avoids circular imports.
|
|
taskiq_fastapi.init(broker, "media_manager.main:app")
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@broker.task
|
|
async def import_all_movie_torrents_task(
|
|
movie_service: MovieService = TaskiqDepends(get_movie_service),
|
|
) -> None:
|
|
log.info("Importing all Movie torrents")
|
|
await asyncio.to_thread(movie_service.import_all_torrents)
|
|
|
|
|
|
@broker.task
|
|
async def import_all_show_torrents_task(
|
|
tv_service: TvService = TaskiqDepends(get_tv_service),
|
|
) -> None:
|
|
log.info("Importing all Show torrents")
|
|
await asyncio.to_thread(tv_service.import_all_torrents)
|
|
|
|
|
|
@broker.task
|
|
async def update_all_movies_metadata_task(
|
|
movie_service: MovieService = TaskiqDepends(get_movie_service),
|
|
) -> None:
|
|
await asyncio.to_thread(movie_service.update_all_metadata)
|
|
|
|
|
|
@broker.task
|
|
async def update_all_non_ended_shows_metadata_task(
|
|
tv_service: TvService = TaskiqDepends(get_tv_service),
|
|
) -> None:
|
|
await asyncio.to_thread(tv_service.update_all_non_ended_shows_metadata)
|
|
|
|
|
|
# Maps each task to its cron schedule so PostgresqlSchedulerSource can seed
|
|
# the taskiq_schedulers table on first startup.
|
|
_STARTUP_SCHEDULES: dict[str, list[dict[str, str]]] = {
|
|
import_all_movie_torrents_task.task_name: [{"cron": "*/2 * * * *"}],
|
|
import_all_show_torrents_task.task_name: [{"cron": "*/2 * * * *"}],
|
|
update_all_movies_metadata_task.task_name: [{"cron": "0 0 * * 1"}],
|
|
update_all_non_ended_shows_metadata_task.task_name: [{"cron": "0 0 * * 1"}],
|
|
}
|
|
|
|
|
|
def build_scheduler_loop() -> SchedulerLoop:
|
|
source = PostgresqlSchedulerSource(
|
|
dsn=_build_db_connection_string_for_taskiq,
|
|
driver="psycopg",
|
|
broker=broker,
|
|
run_migrations=True,
|
|
startup_schedule=_STARTUP_SCHEDULES,
|
|
)
|
|
scheduler = TaskiqScheduler(broker=broker, sources=[source])
|
|
return SchedulerLoop(scheduler)
|