Add events to make it posible to do some basic admin and rights

This commit is contained in:
Sebastiaan
2025-06-07 20:58:40 +02:00
parent 2b865aa249
commit 8db7a0453d
19 changed files with 718 additions and 69 deletions

2
.gitignore vendored
View File

@@ -4,3 +4,5 @@ node_modules/
/playwright-report/
/blob-report/
/playwright/.cache/
/scratches

View File

@@ -1,6 +1,12 @@
from fastapi import APIRouter
from app.api.routes import login, private, users, utils
from app.api.routes import (
events,
login,
private,
users,
utils,
)
from app.core.config import settings
api_router = APIRouter()
@@ -9,5 +15,8 @@ api_router.include_router(users.router)
api_router.include_router(utils.router)
api_router.include_router(events.router)
if settings.ENVIRONMENT == "local":
api_router.include_router(private.router)

View File

@@ -0,0 +1,229 @@
from typing import Any
from fastapi import APIRouter, HTTPException
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import Message, RowId
from app.models.event import (
Event,
EventCreate,
EventPublic,
EventsPublic,
EventUpdate,
EventUserLink,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
PermissionRightObject,
User,
)
router = APIRouter(prefix="/events", tags=["events"])
# region # Events ##############################################################
@router.get("/", response_model=EventsPublic)
def read_events(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve events.
"""
if current_user.has_permission(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Event)
count = session.exec(count_statement).one()
statement = select(Event).offset(skip).limit(limit)
events = session.exec(statement).all()
else:
count_statement = (
select(func.count())
.select_from(Event)
.where(
EventUserLink.user_id == current_user.id,
(EventUserLink.rights & PermissionRight.READ) == PermissionRight.READ,
)
)
count = session.exec(count_statement).one()
statement = (
select(Event)
.where(
EventUserLink.user_id == current_user.id,
(EventUserLink.rights & PermissionRight.READ) == PermissionRight.READ,
)
.offset(skip)
.limit(limit)
)
events = session.exec(statement).all()
return EventsPublic(data=events, count=count)
@router.get("/{id}", response_model=EventPublic)
def read_event(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get event by ID.
"""
event = session.get(Event, id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permission(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
) and (event.user_has_rights(user=current_user, rights=PermissionRight.READ)):
raise HTTPException(status_code=400, detail="Not enough permissions")
return event
@router.post("/", response_model=EventPublic)
def create_event(
*, session: SessionDep, current_user: CurrentUser, event_in: EventCreate
) -> Any:
"""
Create new event.
"""
if not current_user.has_permission(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(status_code=403, detail="Not enough permissions")
event = Event.create(create_obj=event_in, session=session)
event.add_user(user=current_user, rights=PermissionRight.ADMIN, session=session)
return event
@router.put("/{id}", response_model=EventPublic)
def update_event(
*,
session: SessionDep,
current_user: CurrentUser,
id: RowId,
event_in: EventUpdate,
) -> Any:
"""
Update an event.
"""
event = session.get(Event, id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permission(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
) and (event.user_has_rights(user=current_user, rights=PermissionRight.UPDATE)):
raise HTTPException(status_code=400, detail="Not enough permissions")
return Event.update(db_obj=event, in_obj=event_in, session=session)
@router.delete("/{id}")
def delete_event(
session: SessionDep,
current_user: CurrentUser,
id: RowId,
) -> Message:
"""
Delete an event.
"""
event = session.get(Event, id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permission(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
) and (event.user_has_rights(user=current_user, rights=PermissionRight.DELETE)):
raise HTTPException(status_code=400, detail="Not enough permissions")
session.delete(event)
session.commit()
return Message(message="Event deleted successfully")
# endregion
# region # Events / Users ######################################################
@router.post("/{id}/users/{user_id}", tags=["users"])
def add_user_to_event(
session: SessionDep,
current_user: CurrentUser,
id: RowId,
user_id: RowId,
rights_in: PermissionRightObject,
) -> Message:
"""
Add or update a user to an event.
"""
event = session.get(Event, id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permission(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS,
) and (
event.user_has_rights(
user=current_user, rights=(PermissionRight.MANAGE_USERS | rights_in.rights)
)
):
raise HTTPException(status_code=400, detail="Not enough permissions")
user = session.get(User, user_id)
if not event:
raise HTTPException(status_code=404, detail="User not found")
event.add_user(user=user, rights=rights_in.rights, session=session)
return Message(
message="User added successfully"
) # TODO: Return event or event_users
@router.delete("/{id}/users/{user_id}", tags=["users"])
def remove_user_from_event(
session: SessionDep, current_user: CurrentUser, id: RowId, user_id: RowId
) -> Message:
"""
Remove a user from an event.
"""
event = session.get(Event, id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permission(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS,
) and not event.user_has_rights(
user=current_user, rights=PermissionRight.MANAGE_USERS
):
raise HTTPException(status_code=403, detail="Not enough permissions")
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
event.remove_user(user=user, session=session)
return Message(
message="User removed successfully"
) # TODO: Return event or event_users
# endregion

View File

@@ -9,9 +9,9 @@ from app.api.deps import CurrentUser, SessionDep, get_current_system_admin
from app.core import security
from app.core.config import settings
from app.core.security import get_password_hash
from app.models.base import Message
from app.models.user import User, NewPassword, Token, UserPublic
from app.models.apikey import ApiKey
from app.models.base import Message
from app.models.user import NewPassword, Token, User, UserPublic
from app.utils import (
generate_password_reset_token,
generate_reset_password_email,

View File

@@ -2,7 +2,7 @@ import uuid
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import col, delete, func, select
from sqlmodel import func, select
from app.api.deps import (
CurrentUser,
@@ -11,8 +11,18 @@ from app.api.deps import (
)
from app.core.config import settings
from app.core.security import get_password_hash, verify_password
from app.models.apikey import (
ApiKey,
ApiKeyCreate,
ApiKeyGenerate,
ApiKeyPublic,
ApiKeysPublic,
)
from app.models.base import Message, RowId
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
UpdatePassword,
User,
UserCreate,
@@ -21,16 +31,6 @@ from app.models.user import (
UsersPublic,
UserUpdate,
UserUpdateMe,
PermissionModule,
PermissionPart,
PermissionRight,
)
from app.models.apikey import (
ApiKey,
ApiKeyCreate,
ApiKeyPublic,
ApiKeysPublic,
ApiKeyGenerate,
)
from app.utils import generate_new_account_email, send_email
@@ -239,7 +239,11 @@ def read_user_by_id(
user = session.get(User, user_id)
if user == current_user:
return user
if not current_user.has_permission(module=PermissionModule.USER, part=PermissionPart.ADMIN, rights=PermissionRight.READ):
if not current_user.has_permission(
module=PermissionModule.USER,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(
status_code=403,
detail="The user doesn't have enough privileges",

View File

@@ -38,9 +38,9 @@ class Settings(BaseSettings):
FRONTEND_HOST: str = "http://localhost:5173"
ENVIRONMENT: Literal["local", "staging", "production"] = "local"
BACKEND_CORS_ORIGINS: Annotated[list[AnyUrl] | str, BeforeValidator(parse_cors)] = (
[]
)
BACKEND_CORS_ORIGINS: Annotated[
list[AnyUrl] | str, BeforeValidator(parse_cors)
] = []
@computed_field # type: ignore[prop-decorator]
@property

View File

@@ -1,19 +1,20 @@
from sqlmodel import Session, create_engine, select
from app.core.config import settings
from app.models.event import (
Event,
EventCreate,
)
from app.models.user import (
User,
UserCreate,
Role,
Permission,
PermissionModule,
PermissionPart,
PermissionRight,
Role,
User,
UserCreate,
)
from app.models.apikey import ApiKey
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
@@ -101,5 +102,20 @@ def init_db(session: Session) -> None:
)
user = User.create(session=session, create_obj=user_in)
user.add_role(db_obj=system_admin_role, session=session)
session.commit()
event = session.exec(
select(Event).where(Event.contact == settings.FIRST_SUPERUSER)
).first()
if not event:
event_in = EventCreate(
contact=settings.FIRST_SUPERUSER,
is_active=True,
name="Admins first event",
)
event = Event.create(session=session, create_obj=event_in)
event.add_user(user, PermissionRight.ADMIN, session=session)
session.commit()
# endregion

View File

@@ -1,17 +1,14 @@
import random
from typing import TYPE_CHECKING
from sqlmodel import Session, Field, Relationship, select
from sqlmodel import Field, Relationship, Session, select
from .base import (
RowId,
BaseSQLModel,
)
from . import mixin
from .base import (
BaseSQLModel,
RowId,
)
from .user import User
# region # API Keys for access ###################################################
@@ -21,6 +18,7 @@ class ApiKeyBase(mixin.IsActive, mixin.Name, BaseSQLModel):
foreign_key="user.id", nullable=False, ondelete="CASCADE"
)
# Properties to receive via API on creation
class ApiKeyCreate(ApiKeyBase):
pass

View File

@@ -1,10 +1,9 @@
from enum import IntFlag, Enum # Python 3.11 >= StrEnum
from enum import Enum, IntFlag # Python 3.11 >= StrEnum
from enum import auto as auto_enum
from uuid import UUID as RowId
from sqlmodel import SQLModel
from uuid import UUID as RowId
__all__ = [
"RowId",
"DocumentedStrEnum",

161
backend/app/models/event.py Normal file
View File

@@ -0,0 +1,161 @@
from sqlmodel import (
Field,
Relationship,
Session,
select,
)
from . import mixin
from .base import (
BaseSQLModel,
RowId,
)
from .user import (
PermissionRight,
User,
)
# region # Event ###############################################################
# Event auth
class EventUserLink(BaseSQLModel, table=True):
event_id: RowId = Field(
foreign_key="event.id",
primary_key=True,
nullable=False,
ondelete="CASCADE",
)
user_id: RowId = Field(
foreign_key="user.id",
primary_key=True,
nullable=False,
ondelete="CASCADE",
)
rights: PermissionRight = Field(default=PermissionRight.READ, nullable=False)
event: "Event" = Relationship(back_populates="user_links")
user: "User" = Relationship(back_populates="event_links")
# ##############################################################################
# Shared properties
class EventBase(
mixin.Name,
mixin.Contact,
mixin.StartEndDate,
mixin.IsActive,
mixin.Contact,
BaseSQLModel,
):
pass
# Properties to receive via API on creation
class EventCreate(EventBase):
pass
# Properties to receive via API on update, all are optional
class EventUpdate(EventBase):
pass
# Database model, database table inferred from class name
class Event(mixin.RowId, EventBase, table=True):
# --- database only items --------------------------------------------------
# --- back_populates links -------------------------------------------------
# --- many-to-many links ---------------------------------------------------
user_links: list[EventUserLink] = Relationship(back_populates="event")
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: EventCreate) -> "Event":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Event", in_obj: EventUpdate
) -> "Event":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
def add_user(
self,
user: User,
rights: PermissionRight = PermissionRight.READ,
*,
session: Session,
) -> "Event":
to_add = next((add for add in self.user_links if add.user == user), None)
if to_add:
to_add.rights = rights
session.add(to_add)
else:
self.user_links.append(EventUserLink(event=self, user=user, rights=rights))
session.add(self.user_links[-1])
session.commit()
return self
def remove_user(self, user: User, *, session: Session) -> "Event":
to_remove = next(
(remove for remove in self.user_links if remove.user == user), None
)
if to_remove:
statement = select(EventUserLink).where(
EventUserLink.event_id == self.id, EventUserLink.user_id == user.id
)
link_to_remove = session.exec(statement).first()
if link_to_remove:
session.delete(link_to_remove)
session.commit()
return self
def user_has_rights(
self,
user: User,
rights: PermissionRight | None = None,
) -> bool:
return any(
(
link.user == user
and link.rights
and (not rights or (link.rights & rights) == rights)
)
for link in self.user_links
)
# Properties to return via API, id is always required
class EventPublic(mixin.RowIdPublic, EventBase):
pass
class EventsPublic(BaseSQLModel):
data: list[EventPublic]
count: int
# endregion

View File

@@ -1,7 +1,10 @@
import uuid
from datetime import datetime
from pydantic import EmailStr, BaseModel
from sqlmodel import Field
from pydantic import BaseModel, EmailStr
from sqlmodel import (
Field,
)
from .base import RowId as RowIdType
@@ -14,6 +17,10 @@ class FullName(BaseModel):
full_name: str | None = Field(default=None, nullable=True, max_length=255)
class Contact(BaseModel):
contact: str | None = Field(default=None, nullable=True, max_length=255)
class IsActive(BaseModel):
is_active: bool | None = Field(default=True, nullable=False)
@@ -64,3 +71,8 @@ class RowIdPublic(RowId):
class Description(BaseModel):
description: str | None = Field(default=None, nullable=True, max_length=512)
class StartEndDate:
start_at: datetime | None = Field(default=None, nullable=True)
end_at: datetime | None = Field(default=None, nullable=True)

View File

@@ -1,21 +1,22 @@
from typing import TYPE_CHECKING
from pydantic import EmailStr
from sqlmodel import Session, Field, Relationship, select
from sqlmodel import Field, Relationship, Session, select
from app.core.security import get_password_hash, verify_password
from .base import (
RowId,
DocumentedStrEnum,
DocumentedIntFlag,
auto_enum,
BaseSQLModel,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedIntFlag,
DocumentedStrEnum,
RowId,
auto_enum,
)
if TYPE_CHECKING:
from .apikey import ApiKey
from .event import EventUserLink
# region # User ################################################################
@@ -24,6 +25,7 @@ if TYPE_CHECKING:
class PermissionModule(DocumentedStrEnum):
SYSTEM = auto_enum()
USER = auto_enum()
EVENT = auto_enum()
class PermissionPart(DocumentedStrEnum):
@@ -37,7 +39,13 @@ class PermissionRight(DocumentedIntFlag):
UPDATE = auto_enum()
DELETE = auto_enum()
ADMIN = CREATE | READ | UPDATE | DELETE
MANAGE_USERS = auto_enum()
ADMIN = CREATE | READ | UPDATE | DELETE | MANAGE_USERS
class PermissionRightObject(BaseSQLModel):
rights: PermissionRight | None = Field(default=PermissionRight.READ, nullable=False)
# ##############################################################################
@@ -108,6 +116,7 @@ class User(mixin.RowId, UserBase, table=True):
# --- many-to-many links ---------------------------------------------------
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
event_links: list["EventUserLink"] = Relationship(back_populates="user")
# --- CRUD actions ---------------------------------------------------------
@classmethod
@@ -155,26 +164,40 @@ class User(mixin.RowId, UserBase, table=True):
return None
return db_obj
def add_role(self, *, name: str = None, id: RowId = None, db_obj: "Role" = None, session: Session) -> "User":
def add_role(
self,
*,
name: str = None,
id: RowId = None,
db_obj: "Role" = None,
session: Session,
) -> "User":
db_obj = Role.get(name=name, id=id, db_obj=db_obj, session=session)
to_add = next((add for add in self.roles if add == db_obj), None)
if not to_add:
self.roles.append(db_obj)
session.add(self)
session.commit()
return self
def remove_role(self, *, name: str = None, id: RowId = None, db_obj: "Role" = None, session: Session) -> "User":
def remove_role(
self,
*,
name: str = None,
id: RowId = None,
db_obj: "Role" = None,
session: Session,
) -> "User":
db_obj = Role.get(name=name, id=id, db_obj=db_obj, session=session)
to_remove = next((remove for remove in self.roles if remove == db_obj), None)
if to_remove:
statement = select(UserRoleLink).where(
UserRoleLink.user_id == self.id,
UserRoleLink.role_id == db_obj.id
)
UserRoleLink.user_id == self.id, UserRoleLink.role_id == db_obj.id
)
link_to_remove = session.exec(statement).first()
if link_to_remove:
@@ -290,7 +313,14 @@ class Role(
return db_obj
@classmethod
def get(cls, *, name: str = None, id: RowId = None, db_obj: "Role" = None, session: Session) -> "Role":
def get(
cls,
*,
name: str = None,
id: RowId = None,
db_obj: "Role" = None,
session: Session,
) -> "Role":
if db_obj:
pass
elif name:

View File

@@ -0,0 +1,178 @@
import uuid
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.event import create_random_event
def test_event(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {"name": "Foo", "contact": "Someone"}
response = client.post(
f"{settings.API_V1_STR}/events/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
assert "id" in content
assert "is_active" in content
assert "start_at" in content
assert "end_at" in content
def test_read_event(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
response = client.get(
f"{settings.API_V1_STR}/events/{event.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["name"] == event.name
assert content["contact"] == event.contact
assert content["id"] == str(event.id)
assert content["is_active"] == str(event.is_active)
assert content["start_at"] == str(event.start_at)
assert content["end_at"] == str(event.end_at)
def test_read_event_not_found(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
response = client.get(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Event not found"
def test_read_event_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
item = create_random_event(db)
response = client.get(
f"{settings.API_V1_STR}/events/{item.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
content = response.json()
assert content["detail"] == "Not enough permissions"
def test_read_events(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
create_random_event(db)
create_random_event(db)
response = client.get(
f"{settings.API_V1_STR}/events/",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert len(content["data"]) >= 2
def test_update_event(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
data = {"name": "Updated name", "contact": "Updated contact"}
response = client.put(
f"{settings.API_V1_STR}/events/{event.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
assert content["id"] == str(event.id)
assert "is_active" == str(event.is_active)
assert "start_at" == str(event.start_at)
assert "end_at" == str(event.end_at)
def test_update_event_not_found(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
data = {"name": "Updated name", "contact": "Updated contact"}
response = client.put(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Event not found"
def test_update_event_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
data = {"name": "Updated name", "contact": "Updated contact"}
response = client.put(
f"{settings.API_V1_STR}/items/{event.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 400
content = response.json()
assert content["detail"] == "Not enough permissions"
def test_delete_event(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
response = client.delete(
f"{settings.API_V1_STR}/events/{event.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["message"] == "Event deleted successfully"
def test_delete_event_not_found(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
response = client.delete(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Event not found"
def test_delete_event_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
response = client.delete(
f"{settings.API_V1_STR}/events/{event.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
content = response.json()
assert content["detail"] == "Not enough permissions"
# TODO: Add user (super, less rights, own rights, more rights) (*** user without rights)
# TODO: Edit user rights (super, less rights, own rights, more rights) (*** user without rights)
# TODO: Remove user (*** user without rights)
# TODO: Remove own user (is allowed)
# TODO: Remove not linked user
# TODO: Remove event when no rights
# TODO: Remove event when READ rights

View File

@@ -5,8 +5,8 @@ from sqlmodel import Session
from app.core.config import settings
from app.core.security import verify_password
from app.models.user import User, UserCreate
from app.models.apikey import ApiKey, ApiKeyCreate
from app.models.user import User, UserCreate
from app.tests.utils.user import user_authentication_headers
from app.tests.utils.utils import random_email, random_lower_string
from app.utils import generate_password_reset_token

View File

@@ -3,11 +3,11 @@ from sqlmodel import Session
from app.core.security import verify_password
from app.models.user import (
PermissionModule,
PermissionPart,
User,
UserCreate,
UserUpdate,
PermissionModule,
PermissionPart,
)
from app.tests.utils.utils import random_email, random_lower_string

View File

@@ -24,10 +24,10 @@ def test_init_successful_connection() -> None:
except Exception:
connection_successful = False
assert (
connection_successful
), "The database connection should be successful and not raise an exception."
assert connection_successful, (
"The database connection should be successful and not raise an exception."
)
assert session_mock.exec.called_once_with(
select(1)
), "The session should execute a select statement once."
assert session_mock.exec.called_once_with(select(1)), (
"The session should execute a select statement once."
)

View File

@@ -24,10 +24,10 @@ def test_init_successful_connection() -> None:
except Exception:
connection_successful = False
assert (
connection_successful
), "The database connection should be successful and not raise an exception."
assert connection_successful, (
"The database connection should be successful and not raise an exception."
)
assert session_mock.exec.called_once_with(
select(1)
), "The session should execute a select statement once."
assert session_mock.exec.called_once_with(select(1)), (
"The session should execute a select statement once."
)

View File

@@ -0,0 +1,11 @@
from sqlmodel import Session
from app.models.event import Event, EventCreate
from app.tests.utils.utils import random_email, random_lower_string
def create_random_event(db: Session) -> Event:
name = random_lower_string()
contact = random_email()
event_in = EventCreate(name=name, contact=contact)
return Event.create(session=db, create_obj=event_in)

View File

@@ -1,8 +1,8 @@
from fastapi.testclient import TestClient
from sqlmodel import Session, select
from sqlmodel import Session
from app.core.config import settings
from app.models.user import User, UserCreate, UserUpdate, Role
from app.models.user import User, UserCreate, UserUpdate
from app.tests.utils.utils import random_email, random_lower_string