Files
2026-03-27 14:43:42 +01:00

140 lines
4.7 KiB
Python

"""
User and RefreshToken models.
"""
import uuid
from datetime import datetime
from sqlalchemy import (
Boolean,
DateTime,
Enum,
ForeignKey,
Index,
Integer,
String,
Text,
UniqueConstraint,
func,
)
from sqlalchemy.dialects.postgresql import INET, UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
# Enumerated type used by the ``User.role`` column; the database type is
# named "user_role".
# create_type=False: the type is created by the Alembic migration, so it is
# not meant to be emitted from this declaration.
# NOTE(review): ``create_type`` is documented on the PostgreSQL-specific ENUM
# type -- confirm that the generic ``Enum`` used here honours it.
UserRole = Enum(
    "super_admin",
    "admin",
    "supervisor",
    "operator",
    "readonly",
    name="user_role",
    create_type=False,
)
class User(Base):
    """User account that belongs to a tenant.

    Maps the ``users`` table. The ``uq_user_email_tenant`` constraint makes
    the (tenant_id, email) pair unique.
    """

    __tablename__ = "users"
    __table_args__ = (
        UniqueConstraint("tenant_id", "email", name="uq_user_email_tenant"),
        Index("idx_users_tenant", "tenant_id"),
        Index("idx_users_email", "email"),
    )

    # Primary key: UUID generated on the Python side.
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    # Owning tenant; the foreign key is declared with ON DELETE CASCADE.
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("tenants.id", ondelete="CASCADE"),
        nullable=False,
    )
    email: Mapped[str] = mapped_column(String(255), nullable=False)
    password_hash: Mapped[str] = mapped_column(Text, nullable=False)
    full_name: Mapped[str] = mapped_column(String(255), nullable=False)
    # New users get the "operator" role unless one is given explicitly.
    role: Mapped[str] = mapped_column(
        UserRole,
        nullable=False,
        default="operator",
    )
    is_active: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=True,
    )

    # TOTP two-factor authentication
    totp_secret: Mapped[str | None] = mapped_column(Text, nullable=True)
    totp_enabled: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=False,
    )

    # Login security
    last_login_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )
    failed_login_count: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        default=0,
    )
    locked_until: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )

    # Timestamps: both default to the database's now(); updated_at is also
    # given now() again through ``onupdate``.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        onupdate=func.now(),
    )

    # Relationships
    tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="users")  # noqa: F821
    # Refresh tokens follow the user's lifecycle at the ORM level
    # (cascade "all, delete-orphan").
    refresh_tokens: Mapped[list["RefreshToken"]] = relationship(
        "RefreshToken",
        back_populates="user",
        cascade="all, delete-orphan",
    )
    # foreign_keys is spelled out explicitly -- presumably MailboxPermission
    # has more than one foreign key to users; verify against that model.
    mailbox_permissions: Mapped[list["MailboxPermission"]] = relationship(  # noqa: F821
        "MailboxPermission",
        back_populates="user",
        foreign_keys="[MailboxPermission.user_id]",
        cascade="all, delete-orphan",
    )

    @property
    def is_super_admin(self) -> bool:
        """True only when the role is ``super_admin``."""
        return self.role == "super_admin"

    @property
    def is_admin(self) -> bool:
        """True when the role is ``super_admin`` or ``admin``."""
        return self.is_super_admin or self.role == "admin"

    @property
    def is_supervisor(self) -> bool:
        """Supervisor role: implicit read access to every mailbox, without management powers."""
        return self.role == "supervisor"

    @property
    def is_supervisor_or_admin(self) -> bool:
        """True for super_admin, admin and supervisor (read access to the whole tenant)."""
        return self.is_admin or self.is_supervisor

    def __repr__(self) -> str:
        """Return a short debugging representation with e-mail and role."""
        email, role = self.email, self.role
        return f"<User {email!r} role={role!r}>"
class RefreshToken(Base):
    """Refresh-token record owned by a user.

    Maps the ``refresh_tokens`` table. A row carries the token hash, its
    issue/expiry/revocation timestamps and optional client metadata
    (user agent and IP address).
    """

    __tablename__ = "refresh_tokens"

    # Primary key: UUID generated on the Python side.
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    # Owning user; the foreign key is declared with ON DELETE CASCADE.
    user_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
    )
    # Declared unique, so two rows cannot share the same hash.
    token_hash: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
    # Defaults to the database's now() when the row is inserted.
    issued_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=func.now()
    )
    expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    # NULL while the token has not been revoked.
    revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    user_agent: Mapped[str | None] = mapped_column(Text, nullable=True)
    ip_address: Mapped[str | None] = mapped_column(INET, nullable=True)

    # Relationships
    user: Mapped["User"] = relationship("User", back_populates="refresh_tokens")

    __table_args__ = (
        Index("idx_rt_user", "user_id"),
        # Partial index: only rows that have not been revoked are indexed.
        Index(
            "idx_rt_expires",
            "expires_at",
            postgresql_where="revoked_at IS NULL",
        ),
    )

    @property
    def is_valid(self) -> bool:
        """Return True while the token is neither revoked nor expired.

        A token whose ``expires_at`` is not set (for example an instance that
        has not been populated yet) is reported as invalid instead of raising
        ``TypeError``. A naive ``expires_at`` is interpreted as UTC so that it
        can be compared with the timezone-aware current time.
        """
        # ``timezone.utc`` is the object ``datetime.UTC`` aliases, but it is
        # available on every Python 3 version (``UTC`` needs 3.11+).
        from datetime import timezone

        if self.revoked_at is not None:
            return False

        expires_at = self.expires_at
        if expires_at is None:
            return False

        # Comparing a naive datetime with an aware one raises TypeError.
        if expires_at.utcoffset() is None:
            expires_at = expires_at.replace(tzinfo=timezone.utc)

        return expires_at > datetime.now(timezone.utc)

    def __repr__(self) -> str:
        """Return a short debugging representation with the owning user id."""
        return f"<RefreshToken user_id={self.user_id!r}>"