add config.py to access configuration more easily

This commit is contained in:
maxDorninger
2025-02-23 12:49:24 +01:00
parent 58c23eb69c
commit de5047c269
4 changed files with 46 additions and 24 deletions

View File

@@ -7,6 +7,7 @@ from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from pydantic import BaseModel
import config
import database
import database.users
from database.users import UserInternal
@@ -19,14 +20,6 @@ class Token(BaseModel):
class TokenData(BaseModel):
uid: str | None = None
# to get a string like this run:
# openssl rand -hex 32
# TODO: remove secrets from files
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
log = logging.getLogger(__name__)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/token")
@@ -42,7 +35,7 @@ async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserInternal:
)
log.debug("token: " + token)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
payload = jwt.decode(token, config.auth.jwt_signing_key, algorithms=[config.auth.jwt_algorithm])
log.debug("jwt payload: " + payload.__str__())
user_uid: str = payload.get("sub")
log.debug("jwt payload sub (user uid): " + user_uid)
@@ -65,7 +58,7 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None):
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
expire = datetime.now(timezone.utc) + timedelta(minutes=config.auth.jwt_access_token_lifetime)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
encoded_jwt = jwt.encode(to_encode, config.auth.jwt_signing_key, algorithm=config.auth.jwt_algorithm)
return encoded_jwt

View File

@@ -1,4 +1,3 @@
from datetime import timedelta
from typing import Annotated
import bcrypt
@@ -6,8 +5,8 @@ from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
import database
from auth import ACCESS_TOKEN_EXPIRE_MINUTES, create_access_token, Token, router
from database import users
from auth import create_access_token, Token, router
from database import users
from database.users import UserInternal
@@ -51,8 +50,5 @@ async def login_for_access_token(
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.id}, expires_delta=access_token_expires
)
access_token = create_access_token(data={"sub": user.id})
return Token(access_token=access_token, token_type="bearer")

View File

@@ -0,0 +1,32 @@
import os
from typing import Literal
from pydantic import BaseModel
class DbConfig(BaseModel):
host: str = os.getenv("DB_HOST") or "localhost"
port: int = int(os.getenv("DB_PORT")) or 5432
user: str = os.getenv("DB_USERNAME") or "MediaManager"
password: str = os.getenv("DB_PASSWORD") or "MediaManager"
dbname: str = os.getenv("DB_NAME") or "MediaManager"
class IndexerConfig(BaseModel):
default_indexer: Literal["tmdb"] = os.getenv("INDEXER") or "tmdb"
default_indexer_api_key: str = os.getenv("INDEXER_API_KEY")
class AuthConfig(BaseModel):
# to get a signing key run:
# openssl rand -hex 32
jwt_signing_key: str = os.getenv("JWT_SIGNING_KEY")
jwt_signing_algorithm: str = "HS256"
jwt_access_token_lifetime: int = int(os.getenv("JWT_ACCESS_TOKEN_LIFETIME")) or 60*24*30
db: DbConfig = DbConfig()
indexer: IndexerConfig = IndexerConfig()
auth: AuthConfig = AuthConfig()
if __name__ == "__main__":
print(db.__str__())
print(indexer.__str__())
print(auth.__str__())

View File

@@ -1,9 +1,10 @@
import logging
import os
import psycopg
from psycopg.rows import dict_row
import config
log = logging.getLogger(__name__)
@@ -16,11 +17,11 @@ class PgDatabase:
def connect_to_database(self):
return self.driver.connect(
autocommit=True,
host=os.getenv("DB_HOST"),
port=os.getenv("DB_PORT"),
user=os.getenv("DB_USERNAME"),
password=os.getenv("DB_PASSWORD"),
dbname=os.getenv("DB_NAME"),
host=config.db.host,
port=config.db.port,
user=config.db.user,
password=config.db.password,
dbname=config.db.dbname,
row_factory=dict_row
)