mirror of
https://github.com/maxdorninger/MediaManager.git
synced 2026-04-24 17:55:34 +02:00
format files
This commit is contained in:
@@ -3,12 +3,15 @@ from sqlalchemy.exc import (
|
||||
IntegrityError,
|
||||
SQLAlchemyError,
|
||||
)
|
||||
from sqlalchemy.orm import Session, joinedload
|
||||
from sqlalchemy.orm import Session
|
||||
import logging
|
||||
|
||||
from media_manager.exceptions import NotFoundError, MediaAlreadyExists
|
||||
from media_manager.notification.models import Notification
|
||||
from media_manager.notification.schemas import NotificationId, Notification as NotificationSchema
|
||||
from media_manager.notification.schemas import (
|
||||
NotificationId,
|
||||
Notification as NotificationSchema,
|
||||
)
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -18,7 +21,7 @@ class NotificationRepository:
|
||||
self.db = db
|
||||
|
||||
def get_notification(self, id: NotificationId) -> NotificationSchema:
|
||||
result= self.db.get(Notification, id)
|
||||
result = self.db.get(Notification, id)
|
||||
|
||||
if not result:
|
||||
raise NotFoundError
|
||||
@@ -27,10 +30,17 @@ class NotificationRepository:
|
||||
|
||||
def get_unread_notifications(self) -> list[NotificationSchema]:
|
||||
try:
|
||||
stmt = select(Notification).where(Notification.read == False).order_by(Notification.timestamp.desc()) # noqa: E712
|
||||
stmt = (
|
||||
select(Notification)
|
||||
.where(Notification.read == False)
|
||||
.order_by(Notification.timestamp.desc())
|
||||
) # noqa: E712
|
||||
results = self.db.execute(stmt).scalars().all()
|
||||
log.info(f"Successfully retrieved {len(results)} unread notifications.")
|
||||
return [NotificationSchema.model_validate(notification) for notification in results]
|
||||
return [
|
||||
NotificationSchema.model_validate(notification)
|
||||
for notification in results
|
||||
]
|
||||
except SQLAlchemyError as e:
|
||||
log.error(f"Database error while retrieving unread notifications: {e}")
|
||||
raise
|
||||
@@ -40,7 +50,10 @@ class NotificationRepository:
|
||||
stmt = select(Notification).order_by(Notification.timestamp.desc())
|
||||
results = self.db.execute(stmt).scalars().all()
|
||||
log.info(f"Successfully retrieved {len(results)} notifications.")
|
||||
return [NotificationSchema.model_validate(notification) for notification in results]
|
||||
return [
|
||||
NotificationSchema.model_validate(notification)
|
||||
for notification in results
|
||||
]
|
||||
except SQLAlchemyError as e:
|
||||
log.error(f"Database error while retrieving notifications: {e}")
|
||||
raise
|
||||
@@ -51,7 +64,9 @@ class NotificationRepository:
|
||||
self.db.commit()
|
||||
except IntegrityError as e:
|
||||
log.error(f"Could not save notification, Error: {e}")
|
||||
raise MediaAlreadyExists(f"Notification with id {notification.id} already exists.")
|
||||
raise MediaAlreadyExists(
|
||||
f"Notification with id {notification.id} already exists."
|
||||
)
|
||||
return
|
||||
|
||||
def mark_notification_as_read(self, id: NotificationId) -> None:
|
||||
|
||||
Reference in New Issue
Block a user