2 Commits

Author SHA1 Message Date
Sebastiaan
0d6ef073df [WIP] Link hike to places 2025-11-01 01:03:34 +01:00
Sebastiaan
84d75e21ca Add base for route 2025-10-31 14:22:21 +01:00
19 changed files with 1444 additions and 17 deletions

View File

@@ -11,6 +11,8 @@ from app.api.routes import (
users, users,
utils, utils,
hikes, hikes,
routes,
places,
) )
from app.core.config import settings from app.core.config import settings
@@ -27,6 +29,8 @@ api_router.include_router(divisions.router)
api_router.include_router(members.router) api_router.include_router(members.router)
api_router.include_router(hikes.router) api_router.include_router(hikes.router)
api_router.include_router(routes.router)
api_router.include_router(places.router)
if settings.ENVIRONMENT == "local": if settings.ENVIRONMENT == "local":

View File

@@ -16,6 +16,10 @@ from app.models.hike import (
HikePublic, HikePublic,
HikesPublic, HikesPublic,
) )
from app.models.route import (
Route,
RoutesPublic,
)
from app.models.user import ( from app.models.user import (
PermissionModule, PermissionModule,
PermissionPart, PermissionPart,
@@ -129,4 +133,47 @@ def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Mes
session.commit() session.commit()
return Message(message="Hike deleted successfully") return Message(message="Hike deleted successfully")
# endregion
# region # Hike / Routes #######################################################
@router.get("/{hike_id}/routes/", response_model=RoutesPublic)
def read_hike_route(
session: SessionDep,
current_user: CurrentUser,
hike_id: RowId,
skip: int = 0,
limit: int = 100,
) -> Any:
"""
Retrieve all hike routes.
"""
hike = session.get(Hike, hike_id)
if not hike:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found"
)
if not current_user.has_permission(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=(PermissionRight.MANAGE_HIKES | PermissionRight.READ),
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
data_query = select(Route).where(
Route.hike_id == hike_id,
)
count = session.exec(select(func.count()).select_from(data_query.subquery())).one()
data = session.exec(data_query.offset(skip).limit(limit)).all()
return RoutesPublic(data=data, count=count)
# endregion # endregion

View File

@@ -0,0 +1,148 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.place import (
Place,
PlaceCreate,
PlaceUpdate,
PlacePublic,
PlacesPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/places", tags=[ApiTags.PLACES])
# region # Places ########################################################
@router.get("/", response_model=PlacesPublic)
def read_places(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all places.
"""
if current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Place)
count = session.exec(count_statement).one()
statement = select(Place).offset(skip).limit(limit)
places = session.exec(statement).all()
return PlacesPublic(data=places, count=count)
return PlacesPublic(data=[], count=0)
@router.get("/{id}", response_model=PlacePublic)
def read_place(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get place by ID.
"""
place = session.get(Place, id)
if not place:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
)
if not current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
return place
@router.post("/", response_model=PlacePublic)
def create_place(
*, session: SessionDep, current_user: CurrentUser, place_in: PlaceCreate
) -> Any:
"""
Create new place.
"""
if not current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
place = Place.create(create_obj=place_in, session=session)
return place
@router.put("/{id}", response_model=PlacePublic)
def update_place(
*, session: SessionDep, current_user: CurrentUser, id: RowId, place_in: PlaceUpdate
) -> Any:
"""
Update a place.
"""
place = session.get(Place, id)
if not place:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
)
if not current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
place = Place.update(db_obj=place, in_obj=place_in, session=session)
return place
@router.delete("/{id}")
def delete_place(session: SessionDep, current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a place.
"""
place = session.get(Place, id)
if not place:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
)
if not current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
session.delete(place)
session.commit()
return Message(message="Place deleted successfully")
# endregion

View File

@@ -0,0 +1,148 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.route import (
Route,
RouteCreate,
RouteUpdate,
RoutePublic,
RoutesPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/routes", tags=[ApiTags.ROUTES])
# region # Routes ########################################################
@router.get("/", response_model=RoutesPublic)
def read_routes(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all routes.
"""
if current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Route)
count = session.exec(count_statement).one()
statement = select(Route).offset(skip).limit(limit)
routes = session.exec(statement).all()
return RoutesPublic(data=routes, count=count)
return RoutesPublic(data=[], count=0)
@router.get("/{id}", response_model=RoutePublic)
def read_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get route by ID.
"""
route = session.get(Route, id)
if not route:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
)
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
return route
@router.post("/", response_model=RoutePublic)
def create_route(
*, session: SessionDep, current_user: CurrentUser, route_in: RouteCreate
) -> Any:
"""
Create new route.
"""
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
route = Route.create(create_obj=route_in, session=session)
return route
@router.put("/{id}", response_model=RoutePublic)
def update_route(
*, session: SessionDep, current_user: CurrentUser, id: RowId, route_in: RouteUpdate
) -> Any:
"""
Update a route.
"""
route = session.get(Route, id)
if not route:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
)
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
route = Route.update(db_obj=route, in_obj=route_in, session=session)
return route
@router.delete("/{id}")
def delete_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a route.
"""
route = session.get(Route, id)
if not route:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
)
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
session.delete(route)
session.commit()
return Message(message="Route deleted successfully")
# endregion

View File

@@ -25,13 +25,22 @@ from app.models.user import (
UserCreate, UserCreate,
) )
from app.models.member import ( from app.models.member import (
Member, MemberCreate, Member,
MemberCreate,
) )
from app.models.apikey import ( from app.models.apikey import (
ApiKey, ApiKey,
) )
from app.models.hike import ( from app.models.hike import (
Hike, Hike,
HikeTeamLink,
)
from app.models.route import (
Route,
RoutePart,
)
from app.models.place import (
Place,
) )
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI)) engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))

View File

@@ -139,6 +139,8 @@ class ApiTags(DocumentedStrEnum):
MEMBERS = "Members" MEMBERS = "Members"
HIKES = "Hikes" HIKES = "Hikes"
ROUTES = "Routes"
PLACES = "Places"
# endregion # endregion

View File

@@ -13,13 +13,16 @@ from .base import (
DocumentedIntFlag, DocumentedIntFlag,
DocumentedStrFlagType, DocumentedStrFlagType,
auto_enum, auto_enum,
RowId,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from .event import Event from .route import Route
from .team import Team
# region # Hike ################################################################ # region # Hike ################################################################
class HikeTeamPage(DocumentedIntFlag): class HikeTeamPage(DocumentedIntFlag):
ROUTE = auto_enum() # Route steps info ROUTE = auto_enum() # Route steps info
QUESTIONS = auto_enum() # Visible questions QUESTIONS = auto_enum() # Visible questions
@@ -61,6 +64,21 @@ class HikeTimeCalculation(DocumentedIntFlag):
# ############################################################################## # ##############################################################################
class HikeVisitLogType(DocumentedStrEnum):
FIRST_VISIT = auto_enum()
LAST_VISIT = auto_enum()
LONGEST_VISIT = auto_enum()
SHORTEST_VISIT = auto_enum()
EACH_VISIT = auto_enum()
DISABLE_VISIT_LOGING = auto_enum()
# ##############################################################################
class HikeBase( class HikeBase(
mixin.Name, mixin.Name,
mixin.Contact, mixin.Contact,
@@ -92,6 +110,11 @@ class HikeBase(
sa_type=DocumentedStrFlagType(HikeTimeCalculation), sa_type=DocumentedStrFlagType(HikeTimeCalculation),
) )
visit_log_type: HikeVisitLogType = Field(
default=HikeVisitLogType.FIRST_VISIT,
nullable=False,
)
min_time_points: int | None = Field( min_time_points: int | None = Field(
default=0, default=0,
ge=0, ge=0,
@@ -121,6 +144,7 @@ class HikeUpdate(HikeBase):
is_multi_day: bool | None = Field(default=None) # type: ignore is_multi_day: bool | None = Field(default=None) # type: ignore
team_page: HikeTeamPage | None = Field(default=None) # type: ignore team_page: HikeTeamPage | None = Field(default=None) # type: ignore
time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore
visit_log_type: HikeVisitLogType | None = Field(default=None) # type: ignore
class Hike(mixin.RowId, HikeBase, table=True): class Hike(mixin.RowId, HikeBase, table=True):
@@ -129,6 +153,8 @@ class Hike(mixin.RowId, HikeBase, table=True):
# --- read only items ------------------------------------------------------ # --- read only items ------------------------------------------------------
# --- back_populates links ------------------------------------------------- # --- back_populates links -------------------------------------------------
routes: list["Route"] = Relationship(back_populates="hike", cascade_delete=True)
teams: list["HikeTeamLink"] = Relationship(back_populates="hike", cascade_delete=True)
# --- CRUD actions --------------------------------------------------------- # --- CRUD actions ---------------------------------------------------------
@classmethod @classmethod
@@ -165,17 +191,86 @@ class HikesPublic(BaseSQLModel):
# endregion # endregion
# region # Hike / Team #########################################################
# region # Hike / Route ######################################################## class HikeTeamLinkBase(
BaseSQLModel,
):
hike_id: RowId = Field(
nullable=False,
foreign_key="hike.id",
)
team_id: RowId = Field(
nullable=False,
foreign_key="team.id",
)
route_id: RowId = Field(
nullable=False,
foreign_key="route.id",
)
start_place_id: RowId | None = Field(
default=None,
nullable=True,
foreign_key="place.id",
)
class RouteType(DocumentedStrEnum): # Properties to receive via API on creation
START_FINISH = auto_enum() # Start at the start and end at the finish class HikeTeamLinkCreate(HikeTeamLinkBase):
CIRCULAR = auto_enum() # Start some ware, finish at the last new place pass
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
# ############################################################################## # Properties to receive via API on update, all are optional
class HikeTeamLinkUpdate(HikeTeamLinkBase):
hike_id: RowId | None = Field(default=None, nullable=True) # type: ignore
team_id: RowId | None = Field(default=None, nullable=True) # type: ignore
route_id: RowId | None = Field(default=None, nullable=True) # type: ignore
class HikeTeamLink(mixin.RowId, HikeTeamLinkBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
team: "Team" = Relationship(back_populates="hike_links")
hike: "Hike" = Relationship(back_populates="teams")
route: "Route" = Relationship(back_populates="teams")
# start_place: "Place" = Relationship(back_populates="teams")
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: HikeTeamLinkCreate) -> "HikeTeamLink":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "HikeTeamLink", in_obj: HikeTeamLinkUpdate
) -> "HikeTeamLink":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class HikeTeamLinkPublic(mixin.RowIdPublic, HikeTeamLinkBase):
pass
class HikeTeamLinksPublic(BaseSQLModel):
data: list[HikeTeamLinkPublic]
count: int
# endregion # endregion

View File

@@ -1,5 +1,6 @@
import uuid import uuid
from datetime import datetime, date from datetime import datetime, date
from decimal import Decimal
from pydantic import BaseModel, EmailStr from pydantic import BaseModel, EmailStr
from sqlmodel import ( from sqlmodel import (
@@ -14,6 +15,10 @@ class Name(BaseModel):
name: str | None = Field(default=None, nullable=False, unique=False, max_length=255) name: str | None = Field(default=None, nullable=False, unique=False, max_length=255)
class NameOveride(BaseModel):
name_overide: str | None = Field(default=None, nullable=True, unique=False, max_length=255)
class FullName(BaseModel): class FullName(BaseModel):
full_name: str | None = Field(default=None, nullable=True, max_length=255) full_name: str | None = Field(default=None, nullable=True, max_length=255)
@@ -31,7 +36,12 @@ class ShortName(BaseModel):
class ShortNameUpdate(ShortName): class ShortNameUpdate(ShortName):
short_name: str | None = Field(default=None, max_length=8) #TODO: Waarom is deze verplicht ???
short_name: str | None = Field(default=None, nullable=True, max_length=8)
class ShortNameOveride(ShortName):
short_name_overide: str | None = Field(default=None, nullable=True, max_length=8)
class Contact(BaseModel): class Contact(BaseModel):
@@ -120,3 +130,17 @@ class Birthday(BaseModel):
class Created(BaseModel): class Created(BaseModel):
created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info)) created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info))
created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL") created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL")
class Location(BaseModel):
latitude: Decimal | None = Field(default=None, nullable=True, max_digits=11, decimal_places=8, description="decimal degrees")
longitude: Decimal | None = Field(default=None, nullable=True, max_digits=11, decimal_places=8, description="decimal degrees")
class LocationWithRadius(Location):
radius: int | None = Field(default=None, nullable=True, description="Radius in meters")
class QuestionInfo:
question_file: str | None = Field(default=None, nullable=True, max_length=255)
answer_file: str | None = Field(default=None, nullable=True, max_length=255)

179
backend/app/models/place.py Normal file
View File

@@ -0,0 +1,179 @@
from datetime import timedelta
from typing import TYPE_CHECKING
from sqlalchemy import Interval
from sqlmodel import (
Session,
Relationship,
Field,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedStrEnum,
auto_enum,
)
if TYPE_CHECKING:
from .route import Route, RoutePart
# region # Place ###############################################################
class PlaceType(DocumentedStrEnum):
START = auto_enum() # Only check in (make GPS available) (TODO: determine based on route)
PLACE = auto_enum() # Check in / Check out (subtract time between in&out)
TAG = auto_enum() # Instant in&out at the same time
FINISH = auto_enum() # Only check out (and disable GPS) (TODO: determine based on route)
# ##############################################################################
class VisitedCountType(DocumentedStrEnum):
NONE = auto_enum()
ONE_VISIT = auto_enum()
EACH_VISIT = auto_enum()
# ##############################################################################
class PlaceBase(
mixin.Name,
mixin.ShortName,
mixin.Contact,
mixin.Description,
mixin.LocationWithRadius,
mixin.QuestionInfo,
BaseSQLModel,
):
place_type: PlaceType = Field(
default=PlaceType.PLACE,
nullable=False,
)
max_points: int | None = Field(
default=None,
ge=0,
description="Max points for this place, None will disable scoring at this place",
)
visited_points: int | None = Field(
default=None,
ge=0,
description="Visited points for this place, None will disable this function",
)
visited_count_type: VisitedCountType = Field(
default=VisitedCountType.NONE,
)
skipped_penalty_points: int | None = Field(
default=None,
ge=0,
description="Skipped penalty for this place, amount will be subtracted, None will disable this function",
)
place_time: timedelta | None = Field(
default=None,
nullable=True,
description="Time for question at this place during testing.",
sa_type=Interval,
)
# Properties to receive via API on creation
class PlaceCreate(PlaceBase):
pass
# Properties to receive via API on update, all are optional
class PlaceUpdate(
PlaceBase,
mixin.ShortNameUpdate,
):
place_type: PlaceType | None = Field(default=None) # type: ignore
visited_count_type: VisitedCountType | None = Field(default=None) # type: ignore
class Place(mixin.RowId, PlaceBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
routes: list["Route"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "or_("
"Place.id==Route.start_id,"
"Place.id==Route.finish_id,"
"Place.id==RoutePart.place_id,"
"Place.id==RoutePart.to_place_id,"
")",
"foreign_keys": "["
"Route.start_id,"
"Route.finish_id,"
"RoutePart.place_id,"
"RoutePart.to_place_id"
"]",
"viewonly": True,
},
)
next_places: list["Place"] = Relationship(
back_populates="previous_place",
sa_relationship_kwargs={
"secondary": "route_part",
"primaryjoin": "Place.id == RoutePart.place_id",
"secondaryjoin": "Place.id == RoutePart.to_place_id",
"viewonly": True,
},
)
previous_place: list["Place"] = Relationship(
back_populates="next_places",
sa_relationship_kwargs={
"secondary": "route_part",
"primaryjoin": "Place.id == RoutePart.to_place_id",
"secondaryjoin": "Place.id == RoutePart.place_id",
"viewonly": True,
},
)
route_parts: list["RoutePart"] = Relationship(back_populates="place", sa_relationship_kwargs={"foreign_keys": "[RoutePart.place_id]"})
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: PlaceCreate) -> "Place":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Place", in_obj: PlaceUpdate
) -> "Place":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class PlacePublic(mixin.RowIdPublic, PlaceBase):
pass
class PlacesPublic(BaseSQLModel):
data: list[PlacePublic]
count: int
# endregion

283
backend/app/models/route.py Normal file
View File

@@ -0,0 +1,283 @@
from datetime import timedelta
from typing import TYPE_CHECKING
from sqlalchemy import Interval
from sqlmodel import (
Session,
Relationship,
Field,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedStrEnum,
DocumentedStrFlagType,
auto_enum,
RowId,
)
from .hike import (
Hike,
HikeTimeCalculation,
HikeTeamLink,
)
if TYPE_CHECKING:
from .place import Place
# region # Route ###############################################################
class RouteType(DocumentedStrEnum):
START_FINISH = auto_enum() # Start at the start and end at the finish
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
# ##############################################################################
class RouteBase(
mixin.Name,
mixin.Contact,
BaseSQLModel,
):
route_type: RouteType = Field(
default=RouteType.START_FINISH,
nullable=False,
)
time_calculation_override: HikeTimeCalculation | None = Field(
default=None,
nullable=True,
description="Should this route be calculated different then the main hike",
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
)
min_time: timedelta | None = Field(
default=None,
description="Min time correction, None = min of all, positive is used as 0:00, negative is subtracted from min of all time",
sa_type=Interval,
)
max_time: timedelta | None = Field(
default=None,
description="Max time correction, None = max of all, positive is used as max, negative is subtracted from max of all time",
sa_type=Interval,
)
hike_id: RowId = Field(
nullable=False,
foreign_key="hike.id",
)
start_id: RowId | None = Field(
default=None,
nullable=True,
foreign_key="place.id",
ondelete="SET NULL",
description="Start place.id of the route, None for random in CIRCULAR or CIRCULAR_BACK_TO_START",
)
finish_id: RowId | None = Field(
default=None,
nullable=True,
foreign_key="place.id",
ondelete="SET NULL",
description="Finish place.id of the route, None for random or decremented",
)
# Properties to receive via API on creation
class RouteCreate(RouteBase):
pass
# Properties to receive via API on update, all are optional
class RouteUpdate(RouteBase):
route_type: RouteType | None = Field(default=None) # type: ignore
hike_id: RowId | None = Field(default=None) # type: ignore
class Route(mixin.RowId, RouteBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
hike: "Hike" = Relationship(back_populates="routes")
teams: list["HikeTeamLink"] = Relationship(back_populates="route")
start: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[Route.start_id]"})
finish: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[Route.finish_id]"})
places: list["Place"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "or_("
"Place.id==Route.start_id,"
"Place.id==Route.finish_id,"
"Place.id==RoutePart.place_id,"
"Place.id==RoutePart.to_place_id,"
")",
"foreign_keys": "["
"Route.start_id,"
"Route.finish_id,"
"RoutePart.place_id,"
"RoutePart.to_place_id"
"]",
"viewonly": True,
},
)
route_parts: list["RoutePart"] = Relationship(back_populates="route")
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: RouteCreate) -> "Route":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Route", in_obj: RouteUpdate
) -> "Route":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class RoutePublic(mixin.RowIdPublic, RouteBase):
pass
class RoutesPublic(BaseSQLModel):
data: list[RoutePublic]
count: int
# endregion
# region # Route / Place link ##################################################
class RoutePartBase(
mixin.NameOveride,
mixin.ShortNameOveride,
BaseSQLModel,
):
route_id: RowId = Field(
nullable=False,
foreign_key="route.id",
)
place_id: RowId = Field(
nullable=False,
foreign_key="place.id",
)
to_place_id: RowId = Field(
nullable=False,
foreign_key="place.id",
)
travel_time: timedelta | None = Field(
default=None,
nullable=True,
description="Time to travel during test walk, only used for information",
sa_type=Interval,
)
travel_distance: int | None = Field(
default=None,
nullable=True,
ge=0,
description="Distance in meters to travel, only used for information",
)
# TODO: Convert to time groups
free_walk_time: bool | None = Field(
default=False,
description="If true, travel time is not add to calculated for travel time",
)
# Properties to receive via API on creation
class RoutePartBaseCreate(RoutePartBase):
pass
# Properties to receive via API on update, all are optional
class RoutePartBaseUpdate(RoutePartBase):
route_id: RowId | None = Field(default=None) # type: ignore
place_id: RowId | None = Field(default=None) # type: ignore
to_place_id: RowId | None = Field(default=None) # type: ignore
free_walk_time: bool | None = Field(default=None) # type: ignore
class RoutePart(mixin.RowId, RoutePartBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
route: "Route" = Relationship(back_populates="route_parts")
place: "Place" = Relationship(back_populates="route_parts", sa_relationship_kwargs={"foreign_keys": "[RoutePart.place_id]"})
place_to: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[RoutePart.to_place_id]"})
places: list["Place"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "or_("
"Place.id==RoutePart.place_id,"
"Place.id==RoutePart.to_place_id,"
")",
"foreign_keys": "["
"RoutePart.place_id,"
"RoutePart.to_place_id"
"]",
"viewonly": True,
},
)
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: RouteCreate) -> "RoutePart":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "RoutePart", in_obj: RouteUpdate
) -> "RoutePart":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class RoutePartPublic(mixin.RowIdPublic, RoutePartBase):
pass
class RoutePartsPublic(BaseSQLModel):
data: list[RoutePartPublic]
count: int
# endregion

View File

@@ -16,6 +16,7 @@ if TYPE_CHECKING:
from .event import Event from .event import Event
from .division import DivisionTeamLink from .division import DivisionTeamLink
from .member import MemberTeamLink from .member import MemberTeamLink
from .hike import HikeTeamLink
# region # Team ################################################################ # region # Team ################################################################
@@ -51,6 +52,7 @@ class Team(mixin.RowId, TeamBase, table=True):
event: "Event" = Relationship(back_populates="teams") event: "Event" = Relationship(back_populates="teams")
division_link: "DivisionTeamLink" = Relationship(back_populates="team", cascade_delete=True) division_link: "DivisionTeamLink" = Relationship(back_populates="team", cascade_delete=True)
member_links: list["MemberTeamLink"] = Relationship(back_populates="team", cascade_delete=True) member_links: list["MemberTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
hike_links: list["HikeTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
# --- CRUD actions --------------------------------------------------------- # --- CRUD actions ---------------------------------------------------------
@classmethod @classmethod

View File

@@ -33,6 +33,8 @@ class PermissionModule(DocumentedStrEnum):
MEMBER = auto_enum() MEMBER = auto_enum()
HIKE = auto_enum() HIKE = auto_enum()
ROUTE = auto_enum()
PLACE = auto_enum()
class PermissionPart(DocumentedStrEnum): class PermissionPart(DocumentedStrEnum):
@@ -51,6 +53,8 @@ class PermissionRight(DocumentedIntFlag):
MANAGE_DIVISIONS = auto_enum() MANAGE_DIVISIONS = auto_enum()
MANAGE_MEMBERS = auto_enum() MANAGE_MEMBERS = auto_enum()
MANAGE_HIKES = auto_enum()
ADMIN = ( CREATE ADMIN = ( CREATE
| READ | READ
| UPDATE | UPDATE
@@ -59,6 +63,7 @@ class PermissionRight(DocumentedIntFlag):
| MANAGE_TEAMS | MANAGE_TEAMS
| MANAGE_DIVISIONS | MANAGE_DIVISIONS
| MANAGE_MEMBERS | MANAGE_MEMBERS
| MANAGE_HIKES
) )

View File

@@ -6,6 +6,7 @@ from sqlmodel import Session
from app.core.config import settings from app.core.config import settings
from app.tests.utils.hike import create_random_hike from app.tests.utils.hike import create_random_hike
from app.tests.utils.route import create_random_route
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None: def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
@@ -27,8 +28,8 @@ def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str]
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None: def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = { data = {
"name": "RSW Maasdelta 2026", "name": "No permissions",
"contact": "Sebas", "contact": "No permissions",
} }
response = client.post( response = client.post(
f"{settings.API_V1_STR}/hikes/", f"{settings.API_V1_STR}/hikes/",
@@ -177,3 +178,46 @@ def test_delete_hike_no_permissions(client: TestClient, normal_user_token_header
) )
assert response.status_code == status.HTTP_403_FORBIDDEN assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
def test_read_hike_routes(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
hike = create_random_hike(db)
create_random_route(db, hike=hike)
create_random_route(db, hike=hike)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_hike_routes_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
response = client.get(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}/routes",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found"
def test_read_hike_routes_no_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
hike = create_random_hike(db)
create_random_route(db, hike=hike)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,217 @@
import uuid
from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.place import create_random_place
from app.models.place import PlaceType, VisitedCountType
def test_create_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
data = {
"place_type": PlaceType.PLACE,
"name": "Post 1",
"short_name": "1",
"contact": "Sebas",
"description": "Post met renspel",
"question_file": None,
"answer_file": None,
"place_time": "PT10M",
"latitude": None,
"longitude": None,
"radius": None,
"max_points": None,
"visited_points": 10,
"visited_count_type": VisitedCountType.ONE_VISIT,
"skipped_penalty_points": 50,
}
response = client.post(
f"{settings.API_V1_STR}/places/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["name"] == data["name"]
assert content["place_type"] == data["place_type"]
assert content["name"] == data["name"]
assert content["short_name"] == data["short_name"]
assert content["contact"] == data["contact"]
assert content["description"] == data["description"]
assert content["question_file"] == data["question_file"]
assert content["answer_file"] == data["answer_file"]
assert content["latitude"] == data["latitude"]
assert content["longitude"] == data["longitude"]
assert content["radius"] == data["radius"]
assert content["max_points"] == data["max_points"]
assert content["visited_points"] == data["visited_points"]
assert content["visited_count_type"] == data["visited_count_type"]
assert content["skipped_penalty_points"] == data["skipped_penalty_points"]
assert content["place_time"] == data["place_time"]
assert "id" in content
def test_create_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"place_type": PlaceType.PLACE,
"name": "No permissions",
"short_name": "No perm",
"visited_count_type": VisitedCountType.ONE_VISIT,
}
response = client.post(
f"{settings.API_V1_STR}/places/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_place(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
place = create_random_place(db)
response = client.get(
f"{settings.API_V1_STR}/places/{place.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(place.id)
assert content["name"] == place.name
assert content["contact"] == place.contact
def test_read_place_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Place not found"
def test_read_place_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
response = client.get(
f"{settings.API_V1_STR}/places/{place.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_places(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_place(db)
create_random_place(db)
response = client.get(
f"{settings.API_V1_STR}/places/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_places_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_place(db)
create_random_place(db)
response = client.get(
f"{settings.API_V1_STR}/places/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 0
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) == 0
def test_update_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
data = {
"name": "Updated name",
"short_name": "4", # TODO: Fix Shortname
}
response = client.put(
f"{settings.API_V1_STR}/places/{place.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(place.id)
assert content["name"] == data["name"]
assert content["short_name"] == data["short_name"]
def test_update_place_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"name": "Not found",
"short_name": "5", # TODO: Fix Shortname
}
response = client.put(
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Place not found"
def test_update_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
data = {
"name": "No permissions",
"short_name": "6", # TODO: Fix Shortname
}
response = client.put(
f"{settings.API_V1_STR}/places/{place.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_delete_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
response = client.delete(
f"{settings.API_V1_STR}/places/{place.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Place deleted successfully"
def test_delete_place_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Place not found"
def test_delete_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
response = client.delete(
f"{settings.API_V1_STR}/places/{place.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,187 @@
import uuid
from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.hike import create_random_hike
from app.tests.utils.route import create_random_route
def test_create_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
data = {
"name": "Roete A",
"contact": "Sebas",
"hike_id": str(hike.id),
}
response = client.post(
f"{settings.API_V1_STR}/routes/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
assert content["hike_id"] == data["hike_id"]
assert "id" in content
def test_create_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "No permissions",
"contact": "No permissions",
"hike_id": str(uuid.uuid4()),
}
response = client.post(
f"{settings.API_V1_STR}/routes/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_route(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
route = create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(route.id)
assert content["name"] == route.name
assert content["contact"] == route.contact
def test_read_route_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Route not found"
def test_read_route_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_routes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_route(db)
create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_routes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_route(db)
create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 0
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) == 0
def test_update_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
data = {
"name": "Updated name",
"contact": "Updated contact",
}
response = client.put(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(route.id)
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
def test_update_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"name": "Not found",
"contact": "Not found",
}
response = client.put(
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Route not found"
def test_update_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
data = {
"name": "No permissions",
"contact": "No permissions",
}
response = client.put(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_delete_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
response = client.delete(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Route deleted successfully"
def test_delete_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Route not found"
def test_delete_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
response = client.delete(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,14 @@
from sqlmodel import Session
from app.models.place import Place, PlaceCreate
from app.tests.utils.utils import random_lower_string, random_lower_short_string
def create_random_place(db: Session, name: str = None) -> Place:
if not name:
name = random_lower_string()
short_name = random_lower_short_string()
place_in = PlaceCreate(name=name, short_name=short_name)
return Place.create(session=db, create_obj=place_in)

View File

@@ -0,0 +1,17 @@
from sqlmodel import Session
from app.models.hike import Hike
from app.models.route import Route, RouteCreate
from app.tests.utils.utils import random_lower_string
from app.tests.utils.hike import create_random_hike
def create_random_route(db: Session, name: str = None, hike: Hike = None) -> Route:
if not name:
name = random_lower_string()
if not hike:
hike = create_random_hike(db=db)
route_in = RouteCreate(name=name, hike_id=hike.id)
return Route.create(session=db, create_obj=route_in)

View File

@@ -6,15 +6,12 @@ from app.models.event import Event
from app.models.team import Team, TeamCreate from app.models.team import Team, TeamCreate
from app.tests.utils.event import create_random_event from app.tests.utils.event import create_random_event
from app.tests.utils.utils import random_lower_string from app.tests.utils.utils import random_lower_string, random_lower_short_string
def random_short_name() -> str:
return str(random.Random().randrange(1, 200))
def create_random_team(db: Session, event: Event | None = None) -> Team: def create_random_team(db: Session, event: Event | None = None) -> Team:
name = random_lower_string() name = random_lower_string()
short_name = random_short_name() short_name = random_lower_short_string()
if not event: if not event:
event = create_random_event(db) event = create_random_event(db)

View File

@@ -10,6 +10,10 @@ def random_lower_string() -> str:
return "".join(random.choices(string.ascii_lowercase, k=32)) return "".join(random.choices(string.ascii_lowercase, k=32))
def random_lower_short_string() -> str:
return str(random.Random().randrange(1, 8))
def random_email() -> str: def random_email() -> str:
return f"{random_lower_string()}@{random_lower_string()}.com" return f"{random_lower_string()}@{random_lower_string()}.com"
@@ -24,3 +28,4 @@ def get_superuser_token_headers(client: TestClient) -> dict[str, str]:
a_token = tokens["access_token"] a_token = tokens["access_token"]
headers = {"Authorization": f"Bearer {a_token}"} headers = {"Authorization": f"Bearer {a_token}"}
return headers return headers