mirror of
https://github.com/maxdorninger/MediaManager.git
synced 2026-04-20 15:55:42 +02:00
fix typo
This commit is contained in:
@@ -1,73 +0,0 @@
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import jwt
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from jwt.exceptions import InvalidTokenError
|
||||
from pydantic import BaseModel
|
||||
|
||||
from auth.config import AuthConfig
|
||||
from database import DbSessionDependency
|
||||
from database.users import User
|
||||
|
||||
|
||||
# TODO: evaluate FASTAPI-Users package
|
||||
|
||||
class Token(BaseModel):
|
||||
access_token: str
|
||||
token_type: str
|
||||
|
||||
|
||||
class TokenData(BaseModel):
|
||||
uid: str | None = None
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/token")
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
async def get_current_user(db: DbSessionDependency, token: str = Depends(oauth2_scheme)) -> User:
|
||||
credentials_exception = HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Could not validate credentials",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
auth_config = AuthConfig
|
||||
log.debug("token: " + token)
|
||||
|
||||
try:
|
||||
payload = jwt.decode(token, auth_config.jwt_signing_key, algorithms=[auth_config.jwt_signing_algorithm])
|
||||
log.debug("jwt payload: " + payload.__str__())
|
||||
user_uid: str = payload.get("sub")
|
||||
log.debug("jwt payload sub (USER uid): " + user_uid)
|
||||
if user_uid is None:
|
||||
raise credentials_exception
|
||||
token_data = TokenData(uid=user_uid)
|
||||
except InvalidTokenError:
|
||||
log.warning("received invalid token: " + token)
|
||||
raise credentials_exception
|
||||
|
||||
user: User | None = db.get(User, token_data.uid)
|
||||
|
||||
if user is None:
|
||||
log.debug("USER not found")
|
||||
raise credentials_exception
|
||||
|
||||
log.debug("received USER: " + user.__str__())
|
||||
return user
|
||||
|
||||
|
||||
def create_access_token(data: dict, expires_delta: timedelta | None = None):
|
||||
to_encode = data.copy()
|
||||
auth_config = AuthConfig
|
||||
if expires_delta:
|
||||
expire = datetime.now(timezone.utc) + expires_delta
|
||||
else:
|
||||
expire = datetime.now(timezone.utc) + timedelta(minutes=auth_config.jwt_access_token_lifetime)
|
||||
to_encode.update({"exp": expire})
|
||||
encoded_jwt = jwt.encode(to_encode, auth_config.jwt_signing_key, algorithm=auth_config.jwt_signing_algorithm)
|
||||
return encoded_jwt
|
||||
@@ -1,13 +1,12 @@
|
||||
from pydantic_settings import BaseSettings
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class AuthConfig(BaseSettings):
|
||||
# to get a signing key run:
|
||||
# openssl rand -hex 32
|
||||
jwt_signing_key: str
|
||||
jwt_signing_algorithm: str = "HS256"
|
||||
jwt_access_token_lifetime: int = 60 * 24 * 30
|
||||
|
||||
model_config = SettingsConfigDict(env_prefix='AUTH_')
|
||||
token_secret: str
|
||||
session_lifetime: int = 60 * 60 * 24
|
||||
@property
|
||||
def jwt_signing_key(self):
|
||||
return self._jwt_signing_key
|
||||
|
||||
34
backend/src/auth/db.py
Normal file
34
backend/src/auth/db.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
from fastapi import Depends
|
||||
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyUserDatabase
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
|
||||
import database
|
||||
|
||||
|
||||
class Base(DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
class User(SQLAlchemyBaseUserTableUUID, Base):
|
||||
pass
|
||||
|
||||
|
||||
engine = create_async_engine(database.db_url, echo=False)
|
||||
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
|
||||
|
||||
|
||||
async def create_db_and_tables():
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
|
||||
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
|
||||
async with async_session_maker() as session:
|
||||
yield session
|
||||
|
||||
|
||||
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
|
||||
yield SQLAlchemyUserDatabase(session, User)
|
||||
@@ -1,4 +0,0 @@
|
||||
#TODO: Implement OAuth2/Open ID Connect
|
||||
|
||||
|
||||
|
||||
@@ -1,53 +0,0 @@
|
||||
from typing import Annotated
|
||||
|
||||
import bcrypt
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from sqlmodel import select
|
||||
|
||||
from auth import Token, create_access_token, router
|
||||
from database import DbSessionDependency
|
||||
from database.users import User
|
||||
|
||||
|
||||
def verify_password(plain_password, hashed_password):
|
||||
return bcrypt.checkpw(
|
||||
bytes(plain_password, encoding="utf-8"),
|
||||
bytes(hashed_password, encoding="utf-8"),
|
||||
)
|
||||
|
||||
|
||||
def get_password_hash(password: str) -> str:
|
||||
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
||||
|
||||
|
||||
def authenticate_user(db: DbSessionDependency, email: str, password: str) -> bool | User:
|
||||
"""
|
||||
|
||||
:param email: email of the USER
|
||||
:param password: PASSWORD of the USER
|
||||
:return: if authentication succeeds, returns the USER object with added name and lastname, otherwise or if the USER doesn't exist returns False
|
||||
"""
|
||||
user: User | None = db.exec(select(User).where(User.email == email)).first()
|
||||
if not user:
|
||||
return False
|
||||
if not verify_password(password, user.hashed_password):
|
||||
return False
|
||||
return user
|
||||
|
||||
|
||||
@router.post("/token")
|
||||
async def login_for_access_token(
|
||||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
|
||||
db: DbSessionDependency,
|
||||
) -> Token:
|
||||
user = authenticate_user(db,form_data.username, form_data.password)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Incorrect email or PASSWORD",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
# id needs to be converted because a UUID object isn't json serializable
|
||||
access_token = create_access_token(data={"sub": user.id.__str__()})
|
||||
return Token(access_token=access_token, token_type="bearer")
|
||||
15
backend/src/auth/schemas.py
Normal file
15
backend/src/auth/schemas.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import uuid
|
||||
|
||||
from fastapi_users import schemas
|
||||
|
||||
|
||||
class UserRead(schemas.BaseUser[uuid.UUID]):
|
||||
pass
|
||||
|
||||
|
||||
class UserCreate(schemas.BaseUserCreate):
|
||||
pass
|
||||
|
||||
|
||||
class UserUpdate(schemas.BaseUserUpdate):
|
||||
pass
|
||||
64
backend/src/auth/users.py
Normal file
64
backend/src/auth/users.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import uuid
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import Depends, Request
|
||||
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models
|
||||
from fastapi_users.authentication import (
|
||||
AuthenticationBackend,
|
||||
BearerTransport,
|
||||
CookieTransport, JWTStrategy,
|
||||
)
|
||||
from fastapi_users.db import SQLAlchemyUserDatabase
|
||||
|
||||
import auth.config
|
||||
from auth.db import User, get_user_db
|
||||
|
||||
config = auth.config.AuthConfig()
|
||||
SECRET = config.token_secret
|
||||
LIFETIME = config.session_lifetime
|
||||
|
||||
|
||||
# TODO: implement on_xxx methods
|
||||
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
|
||||
reset_password_token_secret = SECRET
|
||||
verification_token_secret = SECRET
|
||||
|
||||
async def on_after_register(self, user: User, request: Optional[Request] = None):
|
||||
print(f"User {user.id} has registered.")
|
||||
|
||||
async def on_after_forgot_password(
|
||||
self, user: User, token: str, request: Optional[Request] = None
|
||||
):
|
||||
print(f"User {user.id} has forgot their password. Reset token: {token}")
|
||||
|
||||
async def on_after_request_verify(
|
||||
self, user: User, token: str, request: Optional[Request] = None
|
||||
):
|
||||
print(f"Verification requested for user {user.id}. Verification token: {token}")
|
||||
|
||||
|
||||
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
|
||||
yield UserManager(user_db)
|
||||
|
||||
|
||||
def get_jwt_strategy() -> JWTStrategy[models.UP, models.ID]:
|
||||
return JWTStrategy(secret=SECRET, lifetime_seconds=LIFETIME)
|
||||
|
||||
|
||||
bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
|
||||
cookie_transport = CookieTransport(cookie_max_age=LIFETIME)
|
||||
|
||||
bearer_auth_backend = AuthenticationBackend(
|
||||
name="jwt",
|
||||
transport=bearer_transport,
|
||||
get_strategy=get_jwt_strategy,
|
||||
)
|
||||
cookie_auth_backend = AuthenticationBackend(
|
||||
name="cookie",
|
||||
transport=cookie_transport,
|
||||
get_strategy=get_jwt_strategy,
|
||||
)
|
||||
|
||||
fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [bearer_auth_backend, cookie_auth_backend])
|
||||
|
||||
current_active_user = fastapi_users.current_user(active=True)
|
||||
Reference in New Issue
Block a user