4 Commits

Author SHA1 Message Date
Sebastiaan
0df8587f78 Add base tests for post 2026-03-21 12:22:43 +01:00
Rick
0a51e6559e Added post definitions 2026-03-21 11:59:22 +01:00
Rick
ab0d0a4323 Added series model 2026-03-21 10:52:40 +01:00
Rick
076765e5c5 Added base file for serie
Some checks failed
Generate Client / generate-client (pull_request) Has been cancelled
Lint Backend / lint-backend (pull_request) Has been cancelled
Playwright Tests / changes (pull_request) Has been cancelled
Test Backend / test-backend (pull_request) Has been cancelled
Test Docker Compose / test-docker-compose (pull_request) Has been cancelled
Add to Project / Add to project (pull_request_target) Has been cancelled
Labels / labeler (pull_request_target) Has been cancelled
Playwright Tests / test-playwright (1, 4) (pull_request) Has been cancelled
Playwright Tests / test-playwright (2, 4) (pull_request) Has been cancelled
Playwright Tests / test-playwright (3, 4) (pull_request) Has been cancelled
Playwright Tests / test-playwright (4, 4) (pull_request) Has been cancelled
Playwright Tests / merge-playwright-reports (pull_request) Has been cancelled
Playwright Tests / alls-green-playwright (pull_request) Has been cancelled
Labels / check-labels (pull_request_target) Has been cancelled
Issue Manager / issue-manager (push) Has been cancelled
2025-11-08 12:19:34 +01:00
17 changed files with 885 additions and 465 deletions

View File

@@ -3,8 +3,9 @@
<component name="NewModuleRootManager"> <component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$"> <content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/backend" isTestSource="false" /> <sourceFolder url="file://$MODULE_DIR$/backend" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content> </content>
<orderEntry type="jdk" jdkName="Python 3.13" jdkType="Python SDK" /> <orderEntry type="jdk" jdkName="Python 3.14 (RSW_puntensysteem)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
<component name="PyDocumentationSettings"> <component name="PyDocumentationSettings">

2
.idea/misc.xml generated
View File

@@ -3,5 +3,5 @@
<component name="Black"> <component name="Black">
<option name="sdkName" value="Python 3.13" /> <option name="sdkName" value="Python 3.13" />
</component> </component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13" project-jdk-type="Python SDK" /> <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.14 (RSW_puntensysteem)" project-jdk-type="Python SDK" />
</project> </project>

View File

@@ -10,7 +10,7 @@ from app.api.routes import (
private, private,
users, users,
utils, utils,
hikes, series,
) )
from app.core.config import settings from app.core.config import settings
@@ -25,8 +25,7 @@ api_router.include_router(teams.router)
api_router.include_router(associations.router) api_router.include_router(associations.router)
api_router.include_router(divisions.router) api_router.include_router(divisions.router)
api_router.include_router(members.router) api_router.include_router(members.router)
api_router.include_router(series.router)
api_router.include_router(hikes.router)
if settings.ENVIRONMENT == "local": if settings.ENVIRONMENT == "local":

View File

@@ -1,132 +0,0 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.hike import (
Hike,
HikeCreate,
HikeUpdate,
HikePublic,
HikesPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/hikes", tags=[ApiTags.HIKES])
# region # Hikes ########################################################
@router.get("/", response_model=HikesPublic)
def read_hikes(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all hikes.
"""
if current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Hike)
count = session.exec(count_statement).one()
statement = select(Hike).offset(skip).limit(limit)
hikes = session.exec(statement).all()
return HikesPublic(data=hikes, count=count)
return HikesPublic(data=[], count=0)
@router.get("/{id}", response_model=HikePublic)
def read_hike(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get hike by ID.
"""
hike = session.get(Hike, id)
if not hike:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return hike
@router.post("/", response_model=HikePublic)
def create_hike(
*, session: SessionDep, current_user: CurrentUser, hike_in: HikeCreate
) -> Any:
"""
Create new hike.
"""
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
hike = Hike.create(create_obj=hike_in, session=session)
return hike
@router.put("/{id}", response_model=HikePublic)
def update_hike(
*, session: SessionDep, current_user: CurrentUser, id: RowId, hike_in: HikeUpdate
) -> Any:
"""
Update a hike.
"""
hike = session.get(Hike, id)
if not hike:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
hike = Hike.update(db_obj=hike, in_obj=hike_in, session=session)
return hike
@router.delete("/{id}")
def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a hike.
"""
hike = session.get(Hike, id)
if not hike:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
session.delete(hike)
session.commit()
return Message(message="Hike deleted successfully")
# endregion

View File

@@ -0,0 +1,176 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.serie import (
Serie,
SerieCreate,
SerieUpdate,
SeriePublic,
SeriesPublic,
)
from app.models.division import (
Division,
DivisionsPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/series", tags=[ApiTags.SERIES])
# region # Series ########################################################
@router.get("/", response_model=SeriesPublic)
def read_series(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all series.
"""
if current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Serie)
count = session.exec(count_statement).one()
statement = select(Serie).offset(skip).limit(limit)
series = session.exec(statement).all()
return SeriesPublic(data=series, count=count)
return SeriesPublic(data=[], count=0)
@router.get("/{id}", response_model=SeriePublic)
def read_serie(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get serie by ID.
"""
serie = session.get(Serie, id)
if not serie:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
if not current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return serie
@router.post("/", response_model=SeriePublic)
def create_serie(
*, session: SessionDep, current_user: CurrentUser, serie_in: SerieCreate
) -> Any:
"""
Create new serie.
"""
if not current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
serie = Serie.create(create_obj=serie_in, session=session)
return serie
@router.put("/{id}", response_model=SeriePublic)
def update_serie(
*, session: SessionDep, current_user: CurrentUser, id: RowId, serie_in: SerieUpdate
) -> Any:
"""
Update a serie.
"""
serie = session.get(Serie, id)
if not serie:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
if not current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
serie = Serie.update(db_obj=serie, in_obj=serie_in, session=session)
return serie
@router.delete("/{id}")
def delete_serie(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a serie.
"""
serie = session.get(Serie, id)
if not serie:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
if not current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
session.delete(serie)
session.commit()
return Message(message="Serie deleted successfully")
# endregion
# region # Series / Divisions ############################################
@router.get("/{series_id}/divisions/", response_model=DivisionsPublic)
def read_serie_division(
session: SessionDep, current_user: CurrentUser, series_id: RowId, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all serie divisions.
"""
serie = session.get(Serie, series_id)
if not serie:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
if not current_user.has_permission(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=(PermissionRight.MANAGE_DIVISIONS | PermissionRight.READ),
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
count_statement = (select(func.count())
.select_from(Division)
.where(Division.serie_id == serie.id)
)
count = session.exec(count_statement).one()
statement = (select(Division)
.where(Division.serie_id == serie.id)
.offset(skip)
.limit(limit)
)
divisions = session.exec(statement).all()
return DivisionsPublic(data=divisions, count=count)
# endregion

View File

@@ -30,8 +30,8 @@ from app.models.member import (
from app.models.apikey import ( from app.models.apikey import (
ApiKey, ApiKey,
) )
from app.models.hike import ( from app.models.serie import (
Hike, Serie,
) )
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI)) engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))

View File

@@ -4,7 +4,6 @@ from enum import Enum, IntFlag # Python 3.11 >= StrEnum
from enum import auto as auto_enum from enum import auto as auto_enum
from uuid import UUID as RowId from uuid import UUID as RowId
from sqlalchemy import TypeDecorator, Integer, VARCHAR
from sqlmodel import SQLModel from sqlmodel import SQLModel
from sqlalchemy.orm import declared_attr from sqlalchemy.orm import declared_attr
@@ -40,85 +39,8 @@ class DocumentedStrEnum(str, Enum):
class DocumentedIntFlag(IntFlag): class DocumentedIntFlag(IntFlag):
@property # TODO: Build DB sport to proper store flags and make it possible to store all mutations
def names(self) -> str: pass
"""
Returns a comma-separated string of all active flag names.
"""
# Exclude 0-value flags
return ",".join([flag.name for flag in type(self) if flag in self and flag.value != 0])
def __str__(self) -> str:
# Default string conversion uses the names
return self.names
# Optional: for Pydantic compatibility
def __get_pydantic_json__(self) -> str:
return self.names
class DocumentedIntFlagType(TypeDecorator):
impl = Integer
cache_ok = True
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
super().__init__(*args, **kwargs)
self.enum_class = enum_class
def process_bind_param(self, value, dialect):
"""
Convert IntFlag to integer before storing
"""
if value is None:
return None
if isinstance(value, self.enum_class):
return int(value)
if isinstance(value, int):
return value
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
def process_result_value(self, value, dialect):
"""
Convert integer from DB back to IntFlag
"""
if value is None:
return None
return self.enum_class(value)
class DocumentedStrFlagType(TypeDecorator):
impl = VARCHAR
cache_ok = True
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
super().__init__(*args, **kwargs)
self.enum_class = enum_class
def process_bind_param(self, value, dialect):
"""
Convert IntFlag to comma-separated string of names for storing in DB.
"""
if value is None:
return None
if isinstance(value, self.enum_class):
return str(value)
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
def process_result_value(self, value, dialect):
"""
Convert comma-separated string of names from DB back to IntFlag.
"""
if value is None or value == "":
return self.enum_class(0)
names = value.split(",")
result = self.enum_class(0)
for name in names:
try:
result |= self.enum_class[name]
except KeyError:
raise ValueError(f"Invalid flag name '{name}' for {self.enum_class.__name__}")
return result
# ############################################################################# # #############################################################################
@@ -137,9 +59,13 @@ class ApiTags(DocumentedStrEnum):
ASSOCIATIONS = "Associations" ASSOCIATIONS = "Associations"
DIVISIONS = "Divisions" DIVISIONS = "Divisions"
MEMBERS = "Members" MEMBERS = "Members"
SERIES = "Series"
HIKES = "Hikes"
class VisitedCountType(DocumentedStrEnum):
NONE = auto_enum()
ONE_VISIT = auto_enum()
EACH_VISIT = auto_enum()
# endregion # endregion

View File

@@ -1,181 +0,0 @@
from typing import TYPE_CHECKING
from sqlmodel import (
Session,
Relationship,
Field,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedStrEnum,
DocumentedIntFlag,
DocumentedStrFlagType,
auto_enum,
)
if TYPE_CHECKING:
from .event import Event
# region # Hike ################################################################
class HikeTeamPage(DocumentedIntFlag):
ROUTE = auto_enum() # Route steps info
QUESTIONS = auto_enum() # Visible questions
VISITED_PLACES = auto_enum() # Places the team has been
CURRENT_PLACE = auto_enum() # Place where the team currently is
UNVISITED_PLACES = auto_enum() # Places the team still neat to visit
PLACES = VISITED_PLACES | CURRENT_PLACE | UNVISITED_PLACES # All the places
TRAVEL_TIME = auto_enum() # Total travel time
TRAVEL_FREE_TIME = auto_enum() # Total travel time
PLACE_TIME = auto_enum() # Total place time
CALCULATE_TIME = auto_enum() # Total time calculated based on hike settings
# TODO: Think about time between oter teams
PLACE_POINTS = auto_enum() # Points scored on a place
TOTAL_PLACE_POINTS = auto_enum() # All points got on all places
ASSIST_POINTS = auto_enum() # Minus points for assistants
# TODO: Think about place in classement
ASSIST_LOG = auto_enum() # Assisted items
ASSIST_LATTER = auto_enum() # ???
# ##############################################################################
class HikeTimeCalculation(DocumentedIntFlag):
TRAVEL_TIME = auto_enum() # Time traveling
# TODO: Think about time groups (in this model we have 2, free and non free)
TRAVEL_FREE_TIME = auto_enum() # Time that is excluded from traveling but not at a place
PLACE_TIME = auto_enum() # Time checked in at a place
TOTAL_TIME = TRAVEL_TIME | TRAVEL_FREE_TIME | PLACE_TIME
# ##############################################################################
class HikeBase(
mixin.Name,
mixin.Contact,
BaseSQLModel,
):
tracker_interval: int | None = Field(
default=None,
nullable=True,
description="Is GPS button available, value will be the interval",
)
is_multi_day: bool = Field(
default=False,
nullable=False,
description="Show datetime in stead of time only",
)
team_page: HikeTeamPage = Field(
default=HikeTeamPage.PLACES | HikeTeamPage.ROUTE | HikeTeamPage.QUESTIONS,
nullable=False,
description="Show all the places of the route inside the teams page",
sa_type=DocumentedStrFlagType(HikeTeamPage),
)
time_calculation: HikeTimeCalculation = Field(
default=HikeTimeCalculation.TRAVEL_TIME,
nullable=False,
description="Wath should we calculate inside the total time",
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
)
min_time_points: int | None = Field(
default=0,
ge=0,
# TODO: le: max_time_points
description="Min points for time",
)
max_time_points: int | None = Field(
default=100,
ge=0, # TODO: ge > min_time_points
description="Max points for time",
)
max_question_points: int | None = Field(
default=None,
description="None: Calculate from answers (no max), Positive: Set as max for dynamic range",
)
# Properties to receive via API on creation
class HikeCreate(HikeBase):
pass
# Properties to receive via API on update, all are optional
class HikeUpdate(HikeBase):
is_multi_day: bool | None = Field(default=None) # type: ignore
team_page: HikeTeamPage | None = Field(default=None) # type: ignore
time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore
class Hike(mixin.RowId, HikeBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: HikeCreate) -> "Hike":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Hike", in_obj: HikeUpdate
) -> "Hike":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class HikePublic(mixin.RowIdPublic, HikeBase):
pass
class HikesPublic(BaseSQLModel):
data: list[HikePublic]
count: int
# endregion
# region # Hike / Route ########################################################
class RouteType(DocumentedStrEnum):
START_FINISH = auto_enum() # Start at the start and end at the finish
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
# ##############################################################################
# endregion

View File

@@ -6,7 +6,7 @@ from sqlmodel import (
Field, Field,
) )
from .base import RowId as RowIdType from .base import RowId as RowIdType, VisitedCountType
from ..core.config import settings from ..core.config import settings
@@ -120,3 +120,28 @@ class Birthday(BaseModel):
class Created(BaseModel): class Created(BaseModel):
created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info)) created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info))
created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL") created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL")
class QuestionInfo(BaseModel):
question_file: str | None = Field(default=None, nullable=True, max_length=255)
answer_file: str | None = Field(default=None, nullable=True, max_length=255)
class TeamAmmounts(BaseModel):
min_teams: int = Field(default=None, nullable=True, ge=1)
max_teams: int = Field(default=None, nullable=True, ge=1)
class MaxPoints(BaseModel):
max_points: int = Field(default=None, nullable=True, ge=0)
class VisitedPoints(BaseModel):
visited_points: int | None = Field(
default=None,
ge=0,
description="Visited points for this place, None will disable this function",
)
visited_count_type: VisitedCountType = Field(
default=VisitedCountType.NONE,
)

206
backend/app/models/post.py Normal file
View File

@@ -0,0 +1,206 @@
# app/models/post.py
from typing import TYPE_CHECKING
from sqlmodel import Field, Relationship, Session, select
from . import mixin
from .base import BaseSQLModel, RowId, DocumentedStrEnum, DocumentedIntFlag, auto_enum
from .user import PermissionRight, User
if TYPE_CHECKING:
from .team import Team
# region # Post ##############################################################
class PostType(DocumentedIntFlag):
POINTS = auto_enum() #allows for the ability to get points
OPTION = auto_enum() #allows groups to enter waiting list
POINTS_OPTIONS = POINTS | POINTS
VISITED = auto_enum() #only mark if a team has visited this post
CORRECT = auto_enum() #possible to fill in if correct (get max points)
class ReplayType(DocumentedStrEnum):
NOT_POSSIBLE = auto_enum()
ONLY_WHEN_NO_POINTS = auto_enum()
YES_BUT_FIRST_POINTS_COUNT = auto_enum()
YES_BUT_LAST_POINTS_COUNT = auto_enum()
YES_BUT_HIGHEST_POINTS_COUNT = auto_enum()
ALWAYS_AS_NEW = auto_enum() # add points together
# ##############################################################################
# Shared properties
class PostUserLinkBase(BaseSQLModel):
rights: PermissionRight = Field(default=PermissionRight.READ, nullable=False)
# Properties to receive via API on creation
class PostUserLinkCreate(PostUserLinkBase):
user_id: RowId = Field(nullable=False)
# Properties to receive via API on update, all are optional
class PostUserLinkUpdate(PostUserLinkBase):
pass
# Database model (link tussen post en user)
class PostUserLink(PostUserLinkBase, table=True):
post_id: RowId = Field(
foreign_key="post.id",
primary_key=True,
nullable=False,
ondelete="CASCADE",
)
user_id: RowId = Field(
foreign_key="user.id",
primary_key=True,
nullable=False,
ondelete="CASCADE",
)
post: "Post" = Relationship(back_populates="user_links")
user: "User" = Relationship(back_populates="post_links")
# Properties to return via API
class PostUserLinkPublic(PostUserLinkBase):
user_id: RowId
post_id: RowId
class PostUserLinksPublic(BaseSQLModel):
data: list[PostUserLinkPublic]
count: int
# Shared properties
class PostBase(
#common
mixin.Name, #post name
mixin.Contact, #name contact person
mixin.IsActive, #scouting active (unused)
#post specific
mixin.ShortName, #shortname for post
mixin.MaxPoints, #max obtainable points
mixin.VisitedPoints, #points obtained for visiting post
mixin.TeamAmmounts, #teams, min&max teams required
mixin.QuestionInfo, #field for questions
BaseSQLModel,
):
post_type: PostType = Field(default=PostType.POINTS, nullable=False)
replay: ReplayType = Field(default=ReplayType.NOT_POSSIBLE, nullable=False)
# Properties to receive via API on creation
class PostCreate(PostBase):
pass
# Properties to receive via API on update, all are optional
class PostUpdate(mixin.ShortNameUpdate, PostBase):
post_type: PostType | None = Field(default=None, nullable=True) # type: ignore
replay: ReplayType | None = Field(default=None, nullable=True) # type: ignore
# Database model
class Post(mixin.RowId, PostBase, table=True):
user_links: list["PostUserLink"] = Relationship(back_populates="post", cascade_delete=True)
#teams: list["Team"] = Relationship(back_populates="post", cascade_delete=True)
@classmethod
def create(cls, *, session: Session, create_obj: PostCreate) -> "Post":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(cls, *, session: Session, db_obj: "Post", in_obj: PostUpdate) -> "Post":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
def get_user_link(self, user: User) -> "PostUserLink | None":
return next((link for link in self.user_links if link.user == user), None)
def add_user(
self,
user: User,
rights: PermissionRight = PermissionRight.READ,
*,
session: Session,
) -> "PostUserLink | None":
existing = self.get_user_link(user=user)
if existing is None:
new_link = PostUserLink(post=self, user=user, rights=rights)
self.user_links.append(new_link)
session.add(new_link)
session.commit()
return new_link
return None
def update_user(
self,
user: User,
rights: PermissionRight = PermissionRight.READ,
*,
session: Session,
) -> "PostUserLink | None":
link = self.get_user_link(user=user)
if link:
link.rights = rights
session.add(link)
session.commit()
return link
return None
def remove_user(self, user: User, *, session: Session) -> None:
link = self.get_user_link(user=user)
if link:
session.delete(link)
session.commit()
def user_has_rights(
self,
user: User,
rights: PermissionRight | None = None,
) -> bool:
return any(
(
link.user == user
and link.rights
and (not rights or (link.rights & rights) == rights)
)
for link in self.user_links
)
def user_has_right(
self,
user: User,
rights: PermissionRight | None = None,
) -> bool:
return any(
(
link.user == user
and link.rights
and (not rights or (link.rights & rights))
)
for link in self.user_links
)
# API output models
class PostPublic(mixin.RowIdPublic, PostBase):
pass
class PostsPublic(BaseSQLModel):
data: list[PostPublic]
count: int
# endregion

173
backend/app/models/serie.py Normal file
View File

@@ -0,0 +1,173 @@
# app/models/serie.py
from typing import TYPE_CHECKING
from sqlmodel import Field, Relationship, Session, select
from . import mixin
from .base import BaseSQLModel, RowId
from .user import PermissionRight, User
if TYPE_CHECKING:
from .team import Team
# region # Serie ##############################################################
# Shared properties
class SerieUserLinkBase(BaseSQLModel):
rights: PermissionRight = Field(default=PermissionRight.READ, nullable=False)
# Properties to receive via API on creation
class SerieUserLinkCreate(SerieUserLinkBase):
user_id: RowId = Field(nullable=False)
# Properties to receive via API on update, all are optional
class SerieUserLinkUpdate(SerieUserLinkBase):
pass
# Database model (link tussen serie en user)
class SerieUserLink(SerieUserLinkBase, table=True):
serie_id: RowId = Field(
foreign_key="serie.id",
primary_key=True,
nullable=False,
ondelete="CASCADE",
)
user_id: RowId = Field(
foreign_key="user.id",
primary_key=True,
nullable=False,
ondelete="CASCADE",
)
serie: "Serie" = Relationship(back_populates="user_links")
user: "User" = Relationship(back_populates="serie_links")
# Properties to return via API
class SerieUserLinkPublic(SerieUserLinkBase):
user_id: RowId
serie_id: RowId
class SerieUserLinksPublic(BaseSQLModel):
data: list[SerieUserLinkPublic]
count: int
# Shared properties
class SerieBase(
mixin.Name,
mixin.Contact,
mixin.IsActive,
BaseSQLModel,
):
pass
# Properties to receive via API on creation
class SerieCreate(SerieBase):
pass
# Properties to receive via API on update, all are optional
class SerieUpdate(SerieBase):
pass
# Database model
class Serie(mixin.RowId, SerieBase, table=True):
user_links: list["SerieUserLink"] = Relationship(back_populates="serie", cascade_delete=True)
#teams: list["Team"] = Relationship(back_populates="serie", cascade_delete=True)
@classmethod
def create(cls, *, session: Session, create_obj: SerieCreate) -> "Serie":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(cls, *, session: Session, db_obj: "Serie", in_obj: SerieUpdate) -> "Serie":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
def get_user_link(self, user: User) -> "SerieUserLink | None":
return next((link for link in self.user_links if link.user == user), None)
def add_user(
self,
user: User,
rights: PermissionRight = PermissionRight.READ,
*,
session: Session,
) -> "SerieUserLink | None":
existing = self.get_user_link(user=user)
if existing is None:
new_link = SerieUserLink(serie=self, user=user, rights=rights)
self.user_links.append(new_link)
session.add(new_link)
session.commit()
return new_link
return None #TODO aanpassen??
def update_user(
self,
user: User,
rights: PermissionRight = PermissionRight.READ,
*,
session: Session,
) -> "SerieUserLink | None":
link = self.get_user_link(user=user)
if link:
link.rights = rights
session.add(link)
session.commit()
return link
return None
def remove_user(self, user: User, *, session: Session) -> None:
link = self.get_user_link(user=user)
if link:
session.delete(link)
session.commit()
def user_has_rights(
self,
user: User,
rights: PermissionRight | None = None,
) -> bool:
return any(
(
link.user == user
and link.rights
and (not rights or (link.rights & rights) == rights)
)
for link in self.user_links
)
def user_has_right(
self,
user: User,
rights: PermissionRight | None = None,
) -> bool:
return any(
(
link.user == user
and link.rights
and (not rights or (link.rights & rights))
)
for link in self.user_links
)
# API output models
class SeriePublic(mixin.RowIdPublic, SerieBase):
pass
class SeriesPublic(BaseSQLModel):
data: list[SeriePublic]
count: int
# endregion

View File

@@ -18,6 +18,7 @@ if TYPE_CHECKING:
from .apikey import ApiKey from .apikey import ApiKey
from .event import EventUserLink from .event import EventUserLink
from .member import Member from .member import Member
from .serie import SerieUserLink
# region # User ################################################################ # region # User ################################################################
@@ -31,8 +32,8 @@ class PermissionModule(DocumentedStrEnum):
ASSOCIATION = auto_enum() ASSOCIATION = auto_enum()
DIVISION = auto_enum() DIVISION = auto_enum()
MEMBER = auto_enum() MEMBER = auto_enum()
SERIE = auto_enum()
HIKE = auto_enum() POST = auto_enum()
class PermissionPart(DocumentedStrEnum): class PermissionPart(DocumentedStrEnum):
@@ -138,6 +139,7 @@ class User(mixin.RowId, UserBase, table=True):
# --- many-to-many links --------------------------------------------------- # --- many-to-many links ---------------------------------------------------
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink) roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
event_links: list["EventUserLink"] = Relationship(back_populates="user") event_links: list["EventUserLink"] = Relationship(back_populates="user")
serie_links: list["SerieUserLink"] = Relationship(back_populates="user")
# --- CRUD actions --------------------------------------------------------- # --- CRUD actions ---------------------------------------------------------
@classmethod @classmethod

View File

@@ -0,0 +1,213 @@
import uuid
from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.models.base import VisitedCountType
from app.tests.utils.post import create_random_post
from app.models.post import PostType, ReplayType
def test_create_post(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
data = {
"post_type": PostType.POINTS | PostType.VISITED | PostType.OPTION,
"replay": ReplayType.NOT_POSSIBLE,
"name": "Post 1",
"short_name": "1",
"contact": "Rick",
"description": "Post met renspel",
"question_file": None,
"answer_file": None,
"max_points": 5,
"visited_points": 5,
"visited_count_type": VisitedCountType.ONE_VISIT,
"min_teams": 1,
"max_teams": 2,
}
response = client.post(
f"{settings.API_V1_STR}/posts/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["post_type"] == data["post_type"]
assert content["replay"] == data["replay"]
assert content["name"] == data["name"]
assert content["short_name"] == data["short_name"]
assert content["contact"] == data["contact"]
assert content["description"] == data["description"]
assert content["question_file"] == data["question_file"]
assert content["answer_file"] == data["answer_file"]
assert content["max_points"] == data["max_points"]
assert content["visited_points"] == data["visited_points"]
assert content["visited_count_type"] == data["visited_count_type"]
assert content["min_teams"] == data["min_teams"]
assert content["max_teams"] == data["max_teams"]
assert "id" in content
def test_create_post_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"post_type": PostType.POINTS | PostType.VISITED | PostType.OPTION,
"replay": ReplayType.NOT_POSSIBLE,
"name": "No permissions",
"short_name": "No perm",
}
response = client.post(
f"{settings.API_V1_STR}/posts/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_post(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
post = create_random_post(db)
response = client.get(
f"{settings.API_V1_STR}/posts/{post.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(post.id)
assert content["name"] == post.name
assert content["contact"] == post.contact
def test_read_post_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/posts/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Post not found"
def test_read_post_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
post = create_random_post(db)
response = client.get(
f"{settings.API_V1_STR}/posts/{post.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_posts(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_post(db)
create_random_post(db)
response = client.get(
f"{settings.API_V1_STR}/posts/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_posts_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_post(db)
create_random_post(db)
response = client.get(
f"{settings.API_V1_STR}/posts/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 0
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) == 0
def test_update_post(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
post = create_random_post(db)
data = {
"name": "Updated name",
"short_name": "4",
}
response = client.put(
f"{settings.API_V1_STR}/posts/{post.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(post.id)
assert content["name"] == data["name"]
assert content["short_name"] == data["short_name"]
def test_update_post_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"name": "Not found",
"short_name": "5",
}
response = client.put(
f"{settings.API_V1_STR}/posts/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Post not found"
def test_update_post_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
post = create_random_post(db)
data = {
"name": "No permissions",
"short_name": "6",
}
response = client.put(
f"{settings.API_V1_STR}/posts/{post.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_delete_post(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
post = create_random_post(db)
response = client.delete(
f"{settings.API_V1_STR}/posts/{post.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Post deleted successfully"
def test_delete_post_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/posts/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Post not found"
def test_delete_post_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
post = create_random_post(db)
response = client.delete(
f"{settings.API_V1_STR}/posts/{post.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -5,16 +5,16 @@ from fastapi.testclient import TestClient
from sqlmodel import Session from sqlmodel import Session
from app.core.config import settings from app.core.config import settings
from app.tests.utils.hike import create_random_hike from app.tests.utils.serie import create_random_serie
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None: def test_create_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
data = { data = {
"name": "RSW Maasdelta 2026", "name": "Zondag spel",
"contact": "Sebas", "contact": "Rick",
} }
response = client.post( response = client.post(
f"{settings.API_V1_STR}/hikes/", f"{settings.API_V1_STR}/series/",
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
@@ -25,13 +25,13 @@ def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str]
assert "id" in content assert "id" in content
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None: def test_create_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = { data = {
"name": "RSW Maasdelta 2026", "name": "Zondag Spel",
"contact": "Sebas", "contact": "Rick",
} }
response = client.post( response = client.post(
f"{settings.API_V1_STR}/hikes/", f"{settings.API_V1_STR}/series/",
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
@@ -39,43 +39,43 @@ def test_create_hike_no_permissions(client: TestClient, normal_user_token_header
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
def test_read_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None: def test_read_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db) serie = create_random_serie(db)
response = client.get( response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}", f"{settings.API_V1_STR}/series/{serie.id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["id"] == str(hike.id) assert content["id"] == str(serie.id)
assert content["name"] == hike.name assert content["name"] == serie.name
assert content["contact"] == hike.contact assert content["contact"] == serie.contact
def test_read_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None: def test_read_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get( response = client.get(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}", f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found" assert response.json()["detail"] == "Serie not found"
def test_read_hike_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None: def test_read_serie_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db) serie = create_random_serie(db)
response = client.get( response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}", f"{settings.API_V1_STR}/series/{serie.id}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert response.status_code == status.HTTP_403_FORBIDDEN assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None: def test_read_series(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_hike(db) create_random_serie(db)
create_random_hike(db) create_random_serie(db)
response = client.get( response = client.get(
f"{settings.API_V1_STR}/hikes/", f"{settings.API_V1_STR}/series/",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
@@ -87,11 +87,11 @@ def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str],
assert len(content["data"]) <= content["count"] assert len(content["data"]) <= content["count"]
def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None: def test_read_series_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_hike(db) create_random_serie(db)
create_random_hike(db) create_random_serie(db)
response = client.get( response = client.get(
f"{settings.API_V1_STR}/hikes/", f"{settings.API_V1_STR}/series/",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
@@ -103,46 +103,46 @@ def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers
assert len(content["data"]) == 0 assert len(content["data"]) == 0
def test_update_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None: def test_update_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db) serie = create_random_serie(db)
data = { data = {
"name": "Updated name", "name": "Updated name",
"contact": "Updated contact", "contact": "Updated contact",
} }
response = client.put( response = client.put(
f"{settings.API_V1_STR}/hikes/{hike.id}", f"{settings.API_V1_STR}/series/{serie.id}",
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["id"] == str(hike.id) assert content["id"] == str(serie.id)
assert content["name"] == data["name"] assert content["name"] == data["name"]
assert content["contact"] == data["contact"] assert content["contact"] == data["contact"]
def test_update_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None: def test_update_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = { data = {
"name": "Not found", "name": "Not found",
"contact": "Not found", "contact": "Not found",
} }
response = client.put( response = client.put(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}", f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found" assert response.json()["detail"] == "Serie not found"
def test_update_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None: def test_update_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db) serie = create_random_serie(db)
data = { data = {
"name": "No permissions", "name": "No permissions",
"contact": "No permissions", "contact": "No permissions",
} }
response = client.put( response = client.put(
f"{settings.API_V1_STR}/hikes/{hike.id}", f"{settings.API_V1_STR}/series/{serie.id}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
@@ -150,29 +150,29 @@ def test_update_hike_no_permissions(client: TestClient, normal_user_token_header
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
def test_delete_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None: def test_delete_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db) serie = create_random_serie(db)
response = client.delete( response = client.delete(
f"{settings.API_V1_STR}/hikes/{hike.id}", f"{settings.API_V1_STR}/series/{serie.id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Hike deleted successfully" assert response.json()["message"] == "Serie deleted successfully"
def test_delete_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None: def test_delete_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete( response = client.delete(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}", f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == status.HTTP_404_NOT_FOUND assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found" assert response.json()["detail"] == "Serie not found"
def test_delete_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None: def test_delete_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db) serie = create_random_serie(db)
response = client.delete( response = client.delete(
f"{settings.API_V1_STR}/hikes/{hike.id}", f"{settings.API_V1_STR}/series/{serie.id}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert response.status_code == status.HTTP_403_FORBIDDEN assert response.status_code == status.HTTP_403_FORBIDDEN

View File

@@ -1,12 +0,0 @@
from sqlmodel import Session
from app.models.hike import Hike, HikeCreate
from app.tests.utils.utils import random_lower_string
def create_random_hike(db: Session, name: str = None) -> Hike:
if not name:
name = random_lower_string()
hike_in = HikeCreate(name=name)
return Hike.create(session=db, create_obj=hike_in)

View File

@@ -0,0 +1,12 @@
from sqlmodel import Session
from app.models.post import Post, PostCreate
from app.tests.utils.utils import random_lower_string
def create_random_post(db: Session, name: str = None) -> Post:
if not name:
name = random_lower_string()
post_in = PostCreate(name=name)
return Post.create(session=db, create_obj=post_in)

View File

@@ -0,0 +1,12 @@
from sqlmodel import Session
from app.models.serie import Serie, SerieCreate
from app.tests.utils.utils import random_lower_string
def create_random_serie(db: Session, name: str = None) -> Serie:
if not name:
name = random_lower_string()
serie_in = SerieCreate(name=name)
return Serie.create(session=db, create_obj=serie_in)