"""FastAPI dependencies: database session, authenticated user, role checks."""

from typing import AsyncGenerator

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.core.database import AsyncSessionLocal
from app.core.security import decode_token

bearer_scheme = HTTPBearer()


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for the duration of one request.

    The session is opened from ``AsyncSessionLocal`` and closed once the
    consumer of the dependency is done with it, whether or not an
    exception was raised in between.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            # NOTE(review): the ``async with`` block presumably closes the
            # session already; the explicit close is kept as a safeguard.
            await session.close()


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
    db: AsyncSession = Depends(get_db),
):
    """Resolve the bearer token of the request to an active ``User``.

    Args:
        credentials: Bearer credentials extracted by ``bearer_scheme``.
        db: Database session used to load the user.

    Returns:
        The ``User`` row whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not of
            type ``"access"``, carries a missing or non-integer ``sub``
            claim, or when the user does not exist or is not active.
    """
    # Imported here rather than at module level, as in the original code
    # (presumably to avoid a circular import -- TODO confirm).
    from app.models.user import User

    payload = decode_token(credentials.credentials)
    if not payload or payload.get("type") != "access":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token non valido o scaduto",
        )

    # A missing or malformed ``sub`` claim means the token is unusable;
    # report it as 401 instead of letting int() fail with a 500.
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token non valido o scaduto",
        ) from None

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Utente non trovato o disabilitato",
        )
    return user


def require_roles(*roles: str):
    """Build a dependency that only admits users with one of ``roles``.

    Args:
        *roles: Role names that are allowed through.

    Returns:
        An async dependency that returns the current user when
        ``current_user.role`` is in ``roles`` and raises a 403
        ``HTTPException`` otherwise.
    """

    async def checker(current_user=Depends(get_current_user)):
        if current_user.role not in roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Permessi insufficienti",
            )
        return current_user

    return checker