add working auth and sessions with passwords, refactor variable names and make psycopg cursor return a dictionary instead of an unnamed tuple

This commit is contained in:
maxid
2025-02-22 18:15:08 +01:00
parent e38d2e872c
commit 8b6d31c6a0
7 changed files with 155 additions and 104 deletions

View File

@@ -1,45 +1,16 @@
from datetime import datetime, timedelta, timezone
from datetime import timedelta
from typing import Annotated
import bcrypt
import jwt
from fastapi import Depends, HTTPException, status, APIRouter
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from pydantic import BaseModel
import database
from auth import ACCESS_TOKEN_EXPIRE_MINUTES, create_access_token, Token, router
from database import UserInternal
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = APIRouter()
def verify_password(plain_password, hashed_password):
@@ -63,46 +34,15 @@ def authenticate_user(email: str, password: str) -> bool | UserInternal:
:param password: password of the user
:return: if authentication succeeds, returns the user object with added name and lastname, otherwise or if the user doesn't exist returns False
"""
user = database.get_user(email)
user = database.get_user(email=email)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return True
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = database.get_user(token_data.username)
if user is None:
raise credentials_exception
return user
@app.post("/token")
@router.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
@@ -111,11 +51,11 @@ async def login_for_access_token(
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
data={"sub": user.id}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")