mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 12:45:42 +02:00
140 lines
4.7 KiB
Python
140 lines
4.7 KiB
Python
"""
|
|
Modelli User e RefreshToken.
|
|
"""
|
|
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy import (
|
|
Boolean,
|
|
DateTime,
|
|
Enum,
|
|
ForeignKey,
|
|
Index,
|
|
Integer,
|
|
String,
|
|
Text,
|
|
UniqueConstraint,
|
|
func,
|
|
)
|
|
from sqlalchemy.dialects.postgresql import INET, UUID
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
|
|
|
from app.database import Base
|
|
|
|
UserRole = Enum(
|
|
"super_admin", "admin", "supervisor", "operator", "readonly",
|
|
name="user_role",
|
|
create_type=False, # creato dalla migrazione Alembic
|
|
)
|
|
|
|
|
|
class User(Base):
|
|
__tablename__ = "users"
|
|
|
|
id: Mapped[uuid.UUID] = mapped_column(
|
|
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
|
|
)
|
|
tenant_id: Mapped[uuid.UUID] = mapped_column(
|
|
UUID(as_uuid=True), ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False
|
|
)
|
|
email: Mapped[str] = mapped_column(String(255), nullable=False)
|
|
password_hash: Mapped[str] = mapped_column(Text, nullable=False)
|
|
full_name: Mapped[str] = mapped_column(String(255), nullable=False)
|
|
role: Mapped[str] = mapped_column(UserRole, nullable=False, default="operator")
|
|
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
|
|
|
|
# 2FA TOTP
|
|
totp_secret: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
totp_enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
|
|
|
|
# Sicurezza accesso
|
|
last_login_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
|
failed_login_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
|
|
locked_until: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
|
|
|
created_at: Mapped[datetime] = mapped_column(
|
|
DateTime(timezone=True), nullable=False, server_default=func.now()
|
|
)
|
|
updated_at: Mapped[datetime] = mapped_column(
|
|
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
|
|
)
|
|
|
|
# Relazioni
|
|
tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="users") # noqa: F821
|
|
refresh_tokens: Mapped[list["RefreshToken"]] = relationship(
|
|
"RefreshToken", back_populates="user", cascade="all, delete-orphan"
|
|
)
|
|
mailbox_permissions: Mapped[list["MailboxPermission"]] = relationship( # noqa: F821
|
|
"MailboxPermission",
|
|
back_populates="user",
|
|
foreign_keys="[MailboxPermission.user_id]",
|
|
cascade="all, delete-orphan",
|
|
)
|
|
|
|
__table_args__ = (
|
|
UniqueConstraint("tenant_id", "email", name="uq_user_email_tenant"),
|
|
Index("idx_users_tenant", "tenant_id"),
|
|
Index("idx_users_email", "email"),
|
|
)
|
|
|
|
@property
|
|
def is_admin(self) -> bool:
|
|
return self.role in ("super_admin", "admin")
|
|
|
|
@property
|
|
def is_super_admin(self) -> bool:
|
|
return self.role == "super_admin"
|
|
|
|
@property
|
|
def is_supervisor(self) -> bool:
|
|
"""Ruolo supervisor: lettura implicita su tutte le caselle, senza poteri di gestione."""
|
|
return self.role == "supervisor"
|
|
|
|
@property
|
|
def is_supervisor_or_admin(self) -> bool:
|
|
"""True per super_admin, admin e supervisor (accesso in lettura a tutto il tenant)."""
|
|
return self.role in ("super_admin", "admin", "supervisor")
|
|
|
|
def __repr__(self) -> str:
|
|
return f"<User {self.email!r} role={self.role!r}>"
|
|
|
|
|
|
class RefreshToken(Base):
|
|
__tablename__ = "refresh_tokens"
|
|
|
|
id: Mapped[uuid.UUID] = mapped_column(
|
|
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
|
|
)
|
|
user_id: Mapped[uuid.UUID] = mapped_column(
|
|
UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
|
|
)
|
|
token_hash: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
|
|
issued_at: Mapped[datetime] = mapped_column(
|
|
DateTime(timezone=True), nullable=False, server_default=func.now()
|
|
)
|
|
expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
|
|
revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
|
user_agent: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
ip_address: Mapped[str | None] = mapped_column(INET, nullable=True)
|
|
|
|
# Relazioni
|
|
user: Mapped["User"] = relationship("User", back_populates="refresh_tokens")
|
|
|
|
__table_args__ = (
|
|
Index("idx_rt_user", "user_id"),
|
|
Index(
|
|
"idx_rt_expires",
|
|
"expires_at",
|
|
postgresql_where="revoked_at IS NULL",
|
|
),
|
|
)
|
|
|
|
@property
|
|
def is_valid(self) -> bool:
|
|
from datetime import UTC
|
|
return self.revoked_at is None and self.expires_at > datetime.now(UTC)
|
|
|
|
def __repr__(self) -> str:
|
|
return f"<RefreshToken user_id={self.user_id!r}>"
|