renaming directory

This commit is contained in:
maxDorninger
2025-03-27 18:25:31 +01:00
parent 994ef66835
commit fb4a26ba24
27 changed files with 22 additions and 74 deletions

View File

@@ -0,0 +1,71 @@
import logging
from datetime import datetime, timedelta, timezone
import jwt
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from pydantic import BaseModel
from config import AuthConfig
from database import SessionDependency
from database.users import User
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
uid: str | None = None
log = logging.getLogger(__name__)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/token")
router = APIRouter()
async def get_current_user(db: SessionDependency, token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
config = AuthConfig()
log.debug("token: " + token)
try:
payload = jwt.decode(token, config.jwt_signing_key, algorithms=[config.jwt_signing_algorithm])
log.debug("jwt payload: " + payload.__str__())
user_uid: str = payload.get("sub")
log.debug("jwt payload sub (USER uid): " + user_uid)
if user_uid is None:
raise credentials_exception
token_data = TokenData(uid=user_uid)
except InvalidTokenError:
log.warning("received invalid token: " + token)
raise credentials_exception
user: User | None = db.get(User, token_data.uid)
if user is None:
log.debug("USER not found")
raise credentials_exception
log.debug("received USER: " + user.__str__())
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
config = AuthConfig()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=config.jwt_access_token_lifetime)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, config.jwt_signing_key, algorithm=config.jwt_signing_algorithm)
return encoded_jwt

4
backend/src/auth/oidc.py Normal file
View File

@@ -0,0 +1,4 @@
#TODO: Implement OAuth2/Open ID Connect

View File

@@ -0,0 +1,53 @@
from typing import Annotated
import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import select
from auth import Token, create_access_token, router
from database import SessionDependency
from database.users import User
def verify_password(plain_password, hashed_password):
return bcrypt.checkpw(
bytes(plain_password, encoding="utf-8"),
bytes(hashed_password, encoding="utf-8"),
)
def get_password_hash(password: str) -> str:
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def authenticate_user(db: SessionDependency, email: str, password: str) -> bool | User:
"""
:param email: email of the USER
:param password: PASSWORD of the USER
:return: if authentication succeeds, returns the USER object with added name and lastname, otherwise or if the USER doesn't exist returns False
"""
user: User | None = db.exec(select(User).where(User.email == email)).first()
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
@router.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: SessionDependency,
) -> Token:
user = authenticate_user(db,form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or PASSWORD",
headers={"WWW-Authenticate": "Bearer"},
)
# id needs to be converted because a UUID object isn't json serializable
access_token = create_access_token(data={"sub": user.id.__str__()})
return Token(access_token=access_token, token_type="bearer")