mirror of
https://github.com/maxdorninger/MediaManager.git
synced 2026-04-17 15:13:24 +02:00
reformat code
This commit is contained in:
@@ -1,12 +1,12 @@
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Annotated
|
||||
|
||||
import jwt
|
||||
from fastapi import Depends, HTTPException, status, APIRouter
|
||||
from fastapi.security import OAuth2, OAuth2AuthorizationCodeBearer
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from jwt.exceptions import InvalidTokenError
|
||||
from pydantic import BaseModel
|
||||
|
||||
import database
|
||||
from database import UserInternal
|
||||
|
||||
@@ -33,31 +33,33 @@ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/token")
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserInternal:
|
||||
credentials_exception = HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Could not validate credentials",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
log.debug("token: "+ token)
|
||||
log.debug("token: " + token)
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
log.debug("jwt payload: "+payload.__str__())
|
||||
log.debug("jwt payload: " + payload.__str__())
|
||||
user_uid: str = payload.get("sub")
|
||||
log.debug("jwt payload sub (user uid): "+user_uid)
|
||||
log.debug("jwt payload sub (user uid): " + user_uid)
|
||||
if user_uid is None:
|
||||
raise credentials_exception
|
||||
token_data = TokenData(uid=user_uid)
|
||||
except InvalidTokenError:
|
||||
log.warning("received invalid token: "+token)
|
||||
log.warning("received invalid token: " + token)
|
||||
raise credentials_exception
|
||||
user = database.get_user(uid=token_data.uid)
|
||||
if user is None:
|
||||
log.debug("user not found")
|
||||
raise credentials_exception
|
||||
log.debug("received user: "+user.__str__())
|
||||
log.debug("received user: " + user.__str__())
|
||||
return user
|
||||
|
||||
|
||||
def create_access_token(data: dict, expires_delta: timedelta | None = None):
|
||||
to_encode = data.copy()
|
||||
if expires_delta:
|
||||
|
||||
@@ -1,37 +1,4 @@
|
||||
from os import environ
|
||||
|
||||
from fastapi import Depends, APIRouter
|
||||
from fastapi.openapi.models import OAuthFlows, OAuthFlowAuthorizationCode
|
||||
from fastapi.security import OpenIdConnect, OAuth2AuthorizationCodeBearer
|
||||
from pydantic import BaseModel
|
||||
|
||||
from auth import router
|
||||
|
||||
|
||||
#TODO: Implement OAuth2/Open ID Connect
|
||||
class Settings(BaseModel):
|
||||
OAUTH2_AUTHORIZATION_URL: str
|
||||
OAUTH2_TOKEN_URL: str
|
||||
OAUTH2_SCOPE: str
|
||||
|
||||
@property
|
||||
def oauth2_flows(self) -> OAuthFlows:
|
||||
return OAuthFlows(
|
||||
authorizationCode=OAuthFlowAuthorizationCode(
|
||||
authorizationUrl=self.OAUTH2_AUTHORIZATION_URL,
|
||||
tokenUrl=self.OAUTH2_TOKEN_URL,
|
||||
scopes={self.OAUTH2_SCOPE: "Access to this API"},
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
oauth2 = OAuth2AuthorizationCodeBearer(
|
||||
authorizationUrl="/authorize",
|
||||
tokenUrl="/token",
|
||||
)
|
||||
|
||||
@router.get("/foo")
|
||||
async def bar(token = Depends()):
|
||||
return token
|
||||
|
||||
|
||||
|
||||
@@ -2,16 +2,14 @@ from datetime import timedelta
|
||||
from typing import Annotated
|
||||
|
||||
import bcrypt
|
||||
from fastapi import Depends, HTTPException, status, APIRouter
|
||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
|
||||
import database
|
||||
from auth import ACCESS_TOKEN_EXPIRE_MINUTES, create_access_token, Token, router
|
||||
from database import UserInternal
|
||||
|
||||
|
||||
|
||||
|
||||
def verify_password(plain_password, hashed_password):
|
||||
return bcrypt.checkpw(
|
||||
bytes(plain_password, encoding="utf-8"),
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
import logging
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from logging import getLogger
|
||||
from uuid import uuid4
|
||||
|
||||
import psycopg
|
||||
from annotated_types.test_cases import cases
|
||||
from psycopg.rows import dict_row
|
||||
from pydantic import BaseModel
|
||||
|
||||
@@ -108,7 +106,7 @@ def create_user(user: UserInternal) -> bool:
|
||||
return False
|
||||
|
||||
log.info("User inserted successfully")
|
||||
log.debug(f"Inserted following User: "+ user.model_dump())
|
||||
log.debug(f"Inserted following User: " + user.model_dump())
|
||||
return True
|
||||
|
||||
|
||||
@@ -124,9 +122,9 @@ def get_user(email: str = None, uid: str = None) -> UserInternal | None:
|
||||
with PgDatabase() as db:
|
||||
if email is not None and uid is None:
|
||||
result = db.connection.execute(
|
||||
"SELECT id, name, lastname, email, hashed_password FROM users WHERE email=%s",
|
||||
(email,)
|
||||
).fetchone()
|
||||
"SELECT id, name, lastname, email, hashed_password FROM users WHERE email=%s",
|
||||
(email,)
|
||||
).fetchone()
|
||||
if uid is not None:
|
||||
result = db.connection.execute(
|
||||
"SELECT id, name, lastname, email, hashed_password FROM users WHERE id=%s",
|
||||
@@ -135,6 +133,7 @@ def get_user(email: str = None, uid: str = None) -> UserInternal | None:
|
||||
|
||||
if result is None:
|
||||
return None
|
||||
user = UserInternal(id = result["id"], name = result["name"], lastname = result["lastname"], email = result["email"], hashed_password = result["hashed_password"])
|
||||
user = UserInternal(id=result["id"], name=result["name"], lastname=result["lastname"], email=result["email"],
|
||||
hashed_password=result["hashed_password"])
|
||||
log.debug(f"Retrieved User succesfully: {user.model_dump()} ")
|
||||
return user
|
||||
|
||||
@@ -1,21 +1,19 @@
|
||||
import logging
|
||||
import pprint
|
||||
import sys
|
||||
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
|
||||
from routers import users
|
||||
from auth import password
|
||||
from routers import users
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG,format="%(asctime)s - %(levelname)s - module: %(name)s - %(funcName)s(): %(message)s", stream=sys.stdout)
|
||||
logging.basicConfig(level=logging.DEBUG,
|
||||
format="%(asctime)s - %(levelname)s - module: %(name)s - %(funcName)s(): %(message)s",
|
||||
stream=sys.stdout)
|
||||
|
||||
app = FastAPI(root_path="/api/v1")
|
||||
app.include_router(users.router, tags=["users"])
|
||||
app.include_router(password.router, tags=["authentication"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host="127.0.0.1", port=5049)
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter
|
||||
from fastapi import Depends
|
||||
from pydantic import BaseModel
|
||||
@@ -19,15 +17,17 @@ router = APIRouter(
|
||||
class Message(BaseModel):
|
||||
message: str
|
||||
|
||||
|
||||
class CreateUser(User):
|
||||
""""
|
||||
The Usermodel, but with an additional non-hashed password. attribute
|
||||
"""
|
||||
password: str
|
||||
|
||||
@router.post("/",status_code=201, responses = {
|
||||
409: {"model": Message, "description": "User with provided email already exists"},
|
||||
201:{"model": UserInternal, "description": "User created successfully"}
|
||||
|
||||
@router.post("/", status_code=201, responses={
|
||||
409: {"model": Message, "description": "User with provided email already exists"},
|
||||
201: {"model": UserInternal, "description": "User created successfully"}
|
||||
})
|
||||
async def create_user(
|
||||
user: CreateUser = Depends(CreateUser),
|
||||
@@ -35,14 +35,13 @@ async def create_user(
|
||||
internal_user = UserInternal(name=user.name, lastname=user.lastname, email=user.email,
|
||||
hashed_password=get_password_hash(user.password))
|
||||
if database.create_user(internal_user):
|
||||
log.info("Created new user",internal_user.model_dump())
|
||||
log.info("Created new user", internal_user.model_dump())
|
||||
return internal_user
|
||||
else:
|
||||
log.warning("Failed to create new user, User with this email already exists,",internal_user.model_dump())
|
||||
log.warning("Failed to create new user, User with this email already exists,", internal_user.model_dump())
|
||||
return JSONResponse(status_code=409, content={"message": "User with this email already exists"})
|
||||
|
||||
|
||||
|
||||
@router.get("/me")
|
||||
async def read_users_me(
|
||||
current_user: UserInternal = Depends(get_current_user),
|
||||
|
||||
Reference in New Issue
Block a user