4 Commits

Author SHA1 Message Date
Sebastiaan
0d6ef073df [WIP] Link hike to places 2025-11-01 01:03:34 +01:00
Sebastiaan
84d75e21ca Add base for route 2025-10-31 14:22:21 +01:00
Sebastiaan
23d7d63103 Implement basic hike functions 2025-10-24 17:48:02 +02:00
Sebastiaan
f958856e95 Add base to store flags as strings and numbes 2025-10-24 17:46:02 +02:00
20 changed files with 2028 additions and 9 deletions

View File

@@ -10,6 +10,9 @@ from app.api.routes import (
private,
users,
utils,
hikes,
routes,
places,
)
from app.core.config import settings
@@ -25,6 +28,10 @@ api_router.include_router(associations.router)
api_router.include_router(divisions.router)
api_router.include_router(members.router)
api_router.include_router(hikes.router)
api_router.include_router(routes.router)
api_router.include_router(places.router)
if settings.ENVIRONMENT == "local":
api_router.include_router(private.router)

View File

@@ -0,0 +1,179 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.hike import (
Hike,
HikeCreate,
HikeUpdate,
HikePublic,
HikesPublic,
)
from app.models.route import (
Route,
RoutesPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/hikes", tags=[ApiTags.HIKES])
# region # Hikes ########################################################
@router.get("/", response_model=HikesPublic)
def read_hikes(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all hikes.
"""
if current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Hike)
count = session.exec(count_statement).one()
statement = select(Hike).offset(skip).limit(limit)
hikes = session.exec(statement).all()
return HikesPublic(data=hikes, count=count)
return HikesPublic(data=[], count=0)
@router.get("/{id}", response_model=HikePublic)
def read_hike(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get hike by ID.
"""
hike = session.get(Hike, id)
if not hike:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return hike
@router.post("/", response_model=HikePublic)
def create_hike(
*, session: SessionDep, current_user: CurrentUser, hike_in: HikeCreate
) -> Any:
"""
Create new hike.
"""
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
hike = Hike.create(create_obj=hike_in, session=session)
return hike
@router.put("/{id}", response_model=HikePublic)
def update_hike(
*, session: SessionDep, current_user: CurrentUser, id: RowId, hike_in: HikeUpdate
) -> Any:
"""
Update a hike.
"""
hike = session.get(Hike, id)
if not hike:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
hike = Hike.update(db_obj=hike, in_obj=hike_in, session=session)
return hike
@router.delete("/{id}")
def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a hike.
"""
hike = session.get(Hike, id)
if not hike:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
session.delete(hike)
session.commit()
return Message(message="Hike deleted successfully")
# endregion
# region # Hike / Routes #######################################################
@router.get("/{hike_id}/routes/", response_model=RoutesPublic)
def read_hike_route(
session: SessionDep,
current_user: CurrentUser,
hike_id: RowId,
skip: int = 0,
limit: int = 100,
) -> Any:
"""
Retrieve all hike routes.
"""
hike = session.get(Hike, hike_id)
if not hike:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found"
)
if not current_user.has_permission(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=(PermissionRight.MANAGE_HIKES | PermissionRight.READ),
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
data_query = select(Route).where(
Route.hike_id == hike_id,
)
count = session.exec(select(func.count()).select_from(data_query.subquery())).one()
data = session.exec(data_query.offset(skip).limit(limit)).all()
return RoutesPublic(data=data, count=count)
# endregion

View File

@@ -0,0 +1,148 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.place import (
Place,
PlaceCreate,
PlaceUpdate,
PlacePublic,
PlacesPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/places", tags=[ApiTags.PLACES])
# region # Places ########################################################
@router.get("/", response_model=PlacesPublic)
def read_places(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all places.
"""
if current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Place)
count = session.exec(count_statement).one()
statement = select(Place).offset(skip).limit(limit)
places = session.exec(statement).all()
return PlacesPublic(data=places, count=count)
return PlacesPublic(data=[], count=0)
@router.get("/{id}", response_model=PlacePublic)
def read_place(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get place by ID.
"""
place = session.get(Place, id)
if not place:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
)
if not current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
return place
@router.post("/", response_model=PlacePublic)
def create_place(
*, session: SessionDep, current_user: CurrentUser, place_in: PlaceCreate
) -> Any:
"""
Create new place.
"""
if not current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
place = Place.create(create_obj=place_in, session=session)
return place
@router.put("/{id}", response_model=PlacePublic)
def update_place(
*, session: SessionDep, current_user: CurrentUser, id: RowId, place_in: PlaceUpdate
) -> Any:
"""
Update a place.
"""
place = session.get(Place, id)
if not place:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
)
if not current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
place = Place.update(db_obj=place, in_obj=place_in, session=session)
return place
@router.delete("/{id}")
def delete_place(session: SessionDep, current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a place.
"""
place = session.get(Place, id)
if not place:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
)
if not current_user.has_permissions(
module=PermissionModule.PLACE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
session.delete(place)
session.commit()
return Message(message="Place deleted successfully")
# endregion

View File

@@ -0,0 +1,148 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.route import (
Route,
RouteCreate,
RouteUpdate,
RoutePublic,
RoutesPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/routes", tags=[ApiTags.ROUTES])
# region # Routes ########################################################
@router.get("/", response_model=RoutesPublic)
def read_routes(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all routes.
"""
if current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Route)
count = session.exec(count_statement).one()
statement = select(Route).offset(skip).limit(limit)
routes = session.exec(statement).all()
return RoutesPublic(data=routes, count=count)
return RoutesPublic(data=[], count=0)
@router.get("/{id}", response_model=RoutePublic)
def read_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get route by ID.
"""
route = session.get(Route, id)
if not route:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
)
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
return route
@router.post("/", response_model=RoutePublic)
def create_route(
*, session: SessionDep, current_user: CurrentUser, route_in: RouteCreate
) -> Any:
"""
Create new route.
"""
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
route = Route.create(create_obj=route_in, session=session)
return route
@router.put("/{id}", response_model=RoutePublic)
def update_route(
*, session: SessionDep, current_user: CurrentUser, id: RowId, route_in: RouteUpdate
) -> Any:
"""
Update a route.
"""
route = session.get(Route, id)
if not route:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
)
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
route = Route.update(db_obj=route, in_obj=route_in, session=session)
return route
@router.delete("/{id}")
def delete_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a route.
"""
route = session.get(Route, id)
if not route:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
)
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
session.delete(route)
session.commit()
return Message(message="Route deleted successfully")
# endregion

View File

@@ -25,11 +25,23 @@ from app.models.user import (
UserCreate,
)
from app.models.member import (
Member, MemberCreate,
Member,
MemberCreate,
)
from app.models.apikey import (
ApiKey,
)
from app.models.hike import (
Hike,
HikeTeamLink,
)
from app.models.route import (
Route,
RoutePart,
)
from app.models.place import (
Place,
)
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))

View File

@@ -4,6 +4,7 @@ from enum import Enum, IntFlag # Python 3.11 >= StrEnum
from enum import auto as auto_enum
from uuid import UUID as RowId
from sqlalchemy import TypeDecorator, Integer, VARCHAR
from sqlmodel import SQLModel
from sqlalchemy.orm import declared_attr
@@ -39,8 +40,85 @@ class DocumentedStrEnum(str, Enum):
class DocumentedIntFlag(IntFlag):
# TODO: Build DB sport to proper store flags and make it possible to store all mutations
pass
@property
def names(self) -> str:
"""
Returns a comma-separated string of all active flag names.
"""
# Exclude 0-value flags
return ",".join([flag.name for flag in type(self) if flag in self and flag.value != 0])
def __str__(self) -> str:
# Default string conversion uses the names
return self.names
# Optional: for Pydantic compatibility
def __get_pydantic_json__(self) -> str:
return self.names
class DocumentedIntFlagType(TypeDecorator):
impl = Integer
cache_ok = True
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
super().__init__(*args, **kwargs)
self.enum_class = enum_class
def process_bind_param(self, value, dialect):
"""
Convert IntFlag to integer before storing
"""
if value is None:
return None
if isinstance(value, self.enum_class):
return int(value)
if isinstance(value, int):
return value
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
def process_result_value(self, value, dialect):
"""
Convert integer from DB back to IntFlag
"""
if value is None:
return None
return self.enum_class(value)
class DocumentedStrFlagType(TypeDecorator):
impl = VARCHAR
cache_ok = True
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
super().__init__(*args, **kwargs)
self.enum_class = enum_class
def process_bind_param(self, value, dialect):
"""
Convert IntFlag to comma-separated string of names for storing in DB.
"""
if value is None:
return None
if isinstance(value, self.enum_class):
return str(value)
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
def process_result_value(self, value, dialect):
"""
Convert comma-separated string of names from DB back to IntFlag.
"""
if value is None or value == "":
return self.enum_class(0)
names = value.split(",")
result = self.enum_class(0)
for name in names:
try:
result |= self.enum_class[name]
except KeyError:
raise ValueError(f"Invalid flag name '{name}' for {self.enum_class.__name__}")
return result
# #############################################################################
@@ -60,6 +138,10 @@ class ApiTags(DocumentedStrEnum):
DIVISIONS = "Divisions"
MEMBERS = "Members"
HIKES = "Hikes"
ROUTES = "Routes"
PLACES = "Places"
# endregion

276
backend/app/models/hike.py Normal file
View File

@@ -0,0 +1,276 @@
from typing import TYPE_CHECKING
from sqlmodel import (
Session,
Relationship,
Field,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedStrEnum,
DocumentedIntFlag,
DocumentedStrFlagType,
auto_enum,
RowId,
)
if TYPE_CHECKING:
from .route import Route
from .team import Team
# region # Hike ################################################################
class HikeTeamPage(DocumentedIntFlag):
ROUTE = auto_enum() # Route steps info
QUESTIONS = auto_enum() # Visible questions
VISITED_PLACES = auto_enum() # Places the team has been
CURRENT_PLACE = auto_enum() # Place where the team currently is
UNVISITED_PLACES = auto_enum() # Places the team still neat to visit
PLACES = VISITED_PLACES | CURRENT_PLACE | UNVISITED_PLACES # All the places
TRAVEL_TIME = auto_enum() # Total travel time
TRAVEL_FREE_TIME = auto_enum() # Total travel time
PLACE_TIME = auto_enum() # Total place time
CALCULATE_TIME = auto_enum() # Total time calculated based on hike settings
# TODO: Think about time between oter teams
PLACE_POINTS = auto_enum() # Points scored on a place
TOTAL_PLACE_POINTS = auto_enum() # All points got on all places
ASSIST_POINTS = auto_enum() # Minus points for assistants
# TODO: Think about place in classement
ASSIST_LOG = auto_enum() # Assisted items
ASSIST_LATTER = auto_enum() # ???
# ##############################################################################
class HikeTimeCalculation(DocumentedIntFlag):
TRAVEL_TIME = auto_enum() # Time traveling
# TODO: Think about time groups (in this model we have 2, free and non free)
TRAVEL_FREE_TIME = auto_enum() # Time that is excluded from traveling but not at a place
PLACE_TIME = auto_enum() # Time checked in at a place
TOTAL_TIME = TRAVEL_TIME | TRAVEL_FREE_TIME | PLACE_TIME
# ##############################################################################
class HikeVisitLogType(DocumentedStrEnum):
FIRST_VISIT = auto_enum()
LAST_VISIT = auto_enum()
LONGEST_VISIT = auto_enum()
SHORTEST_VISIT = auto_enum()
EACH_VISIT = auto_enum()
DISABLE_VISIT_LOGING = auto_enum()
# ##############################################################################
class HikeBase(
mixin.Name,
mixin.Contact,
BaseSQLModel,
):
tracker_interval: int | None = Field(
default=None,
nullable=True,
description="Is GPS button available, value will be the interval",
)
is_multi_day: bool = Field(
default=False,
nullable=False,
description="Show datetime in stead of time only",
)
team_page: HikeTeamPage = Field(
default=HikeTeamPage.PLACES | HikeTeamPage.ROUTE | HikeTeamPage.QUESTIONS,
nullable=False,
description="Show all the places of the route inside the teams page",
sa_type=DocumentedStrFlagType(HikeTeamPage),
)
time_calculation: HikeTimeCalculation = Field(
default=HikeTimeCalculation.TRAVEL_TIME,
nullable=False,
description="Wath should we calculate inside the total time",
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
)
visit_log_type: HikeVisitLogType = Field(
default=HikeVisitLogType.FIRST_VISIT,
nullable=False,
)
min_time_points: int | None = Field(
default=0,
ge=0,
# TODO: le: max_time_points
description="Min points for time",
)
max_time_points: int | None = Field(
default=100,
ge=0, # TODO: ge > min_time_points
description="Max points for time",
)
max_question_points: int | None = Field(
default=None,
description="None: Calculate from answers (no max), Positive: Set as max for dynamic range",
)
# Properties to receive via API on creation
class HikeCreate(HikeBase):
pass
# Properties to receive via API on update, all are optional
class HikeUpdate(HikeBase):
is_multi_day: bool | None = Field(default=None) # type: ignore
team_page: HikeTeamPage | None = Field(default=None) # type: ignore
time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore
visit_log_type: HikeVisitLogType | None = Field(default=None) # type: ignore
class Hike(mixin.RowId, HikeBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
routes: list["Route"] = Relationship(back_populates="hike", cascade_delete=True)
teams: list["HikeTeamLink"] = Relationship(back_populates="hike", cascade_delete=True)
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: HikeCreate) -> "Hike":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Hike", in_obj: HikeUpdate
) -> "Hike":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class HikePublic(mixin.RowIdPublic, HikeBase):
pass
class HikesPublic(BaseSQLModel):
data: list[HikePublic]
count: int
# endregion
# region # Hike / Team #########################################################
class HikeTeamLinkBase(
BaseSQLModel,
):
hike_id: RowId = Field(
nullable=False,
foreign_key="hike.id",
)
team_id: RowId = Field(
nullable=False,
foreign_key="team.id",
)
route_id: RowId = Field(
nullable=False,
foreign_key="route.id",
)
start_place_id: RowId | None = Field(
default=None,
nullable=True,
foreign_key="place.id",
)
# Properties to receive via API on creation
class HikeTeamLinkCreate(HikeTeamLinkBase):
pass
# Properties to receive via API on update, all are optional
class HikeTeamLinkUpdate(HikeTeamLinkBase):
hike_id: RowId | None = Field(default=None, nullable=True) # type: ignore
team_id: RowId | None = Field(default=None, nullable=True) # type: ignore
route_id: RowId | None = Field(default=None, nullable=True) # type: ignore
class HikeTeamLink(mixin.RowId, HikeTeamLinkBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
team: "Team" = Relationship(back_populates="hike_links")
hike: "Hike" = Relationship(back_populates="teams")
route: "Route" = Relationship(back_populates="teams")
# start_place: "Place" = Relationship(back_populates="teams")
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: HikeTeamLinkCreate) -> "HikeTeamLink":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "HikeTeamLink", in_obj: HikeTeamLinkUpdate
) -> "HikeTeamLink":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class HikeTeamLinkPublic(mixin.RowIdPublic, HikeTeamLinkBase):
pass
class HikeTeamLinksPublic(BaseSQLModel):
data: list[HikeTeamLinkPublic]
count: int
# endregion

View File

@@ -1,5 +1,6 @@
import uuid
from datetime import datetime, date
from decimal import Decimal
from pydantic import BaseModel, EmailStr
from sqlmodel import (
@@ -14,6 +15,10 @@ class Name(BaseModel):
name: str | None = Field(default=None, nullable=False, unique=False, max_length=255)
class NameOveride(BaseModel):
name_overide: str | None = Field(default=None, nullable=True, unique=False, max_length=255)
class FullName(BaseModel):
full_name: str | None = Field(default=None, nullable=True, max_length=255)
@@ -31,7 +36,12 @@ class ShortName(BaseModel):
class ShortNameUpdate(ShortName):
short_name: str | None = Field(default=None, max_length=8)
#TODO: Waarom is deze verplicht ???
short_name: str | None = Field(default=None, nullable=True, max_length=8)
class ShortNameOveride(ShortName):
short_name_overide: str | None = Field(default=None, nullable=True, max_length=8)
class Contact(BaseModel):
@@ -120,3 +130,17 @@ class Birthday(BaseModel):
class Created(BaseModel):
created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info))
created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL")
class Location(BaseModel):
latitude: Decimal | None = Field(default=None, nullable=True, max_digits=11, decimal_places=8, description="decimal degrees")
longitude: Decimal | None = Field(default=None, nullable=True, max_digits=11, decimal_places=8, description="decimal degrees")
class LocationWithRadius(Location):
radius: int | None = Field(default=None, nullable=True, description="Radius in meters")
class QuestionInfo:
question_file: str | None = Field(default=None, nullable=True, max_length=255)
answer_file: str | None = Field(default=None, nullable=True, max_length=255)

179
backend/app/models/place.py Normal file
View File

@@ -0,0 +1,179 @@
from datetime import timedelta
from typing import TYPE_CHECKING
from sqlalchemy import Interval
from sqlmodel import (
Session,
Relationship,
Field,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedStrEnum,
auto_enum,
)
if TYPE_CHECKING:
from .route import Route, RoutePart
# region # Place ###############################################################
class PlaceType(DocumentedStrEnum):
START = auto_enum() # Only check in (make GPS available) (TODO: determine based on route)
PLACE = auto_enum() # Check in / Check out (subtract time between in&out)
TAG = auto_enum() # Instant in&out at the same time
FINISH = auto_enum() # Only check out (and disable GPS) (TODO: determine based on route)
# ##############################################################################
class VisitedCountType(DocumentedStrEnum):
NONE = auto_enum()
ONE_VISIT = auto_enum()
EACH_VISIT = auto_enum()
# ##############################################################################
class PlaceBase(
mixin.Name,
mixin.ShortName,
mixin.Contact,
mixin.Description,
mixin.LocationWithRadius,
mixin.QuestionInfo,
BaseSQLModel,
):
place_type: PlaceType = Field(
default=PlaceType.PLACE,
nullable=False,
)
max_points: int | None = Field(
default=None,
ge=0,
description="Max points for this place, None will disable scoring at this place",
)
visited_points: int | None = Field(
default=None,
ge=0,
description="Visited points for this place, None will disable this function",
)
visited_count_type: VisitedCountType = Field(
default=VisitedCountType.NONE,
)
skipped_penalty_points: int | None = Field(
default=None,
ge=0,
description="Skipped penalty for this place, amount will be subtracted, None will disable this function",
)
place_time: timedelta | None = Field(
default=None,
nullable=True,
description="Time for question at this place during testing.",
sa_type=Interval,
)
# Properties to receive via API on creation
class PlaceCreate(PlaceBase):
pass
# Properties to receive via API on update, all are optional
class PlaceUpdate(
PlaceBase,
mixin.ShortNameUpdate,
):
place_type: PlaceType | None = Field(default=None) # type: ignore
visited_count_type: VisitedCountType | None = Field(default=None) # type: ignore
class Place(mixin.RowId, PlaceBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
routes: list["Route"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "or_("
"Place.id==Route.start_id,"
"Place.id==Route.finish_id,"
"Place.id==RoutePart.place_id,"
"Place.id==RoutePart.to_place_id,"
")",
"foreign_keys": "["
"Route.start_id,"
"Route.finish_id,"
"RoutePart.place_id,"
"RoutePart.to_place_id"
"]",
"viewonly": True,
},
)
next_places: list["Place"] = Relationship(
back_populates="previous_place",
sa_relationship_kwargs={
"secondary": "route_part",
"primaryjoin": "Place.id == RoutePart.place_id",
"secondaryjoin": "Place.id == RoutePart.to_place_id",
"viewonly": True,
},
)
previous_place: list["Place"] = Relationship(
back_populates="next_places",
sa_relationship_kwargs={
"secondary": "route_part",
"primaryjoin": "Place.id == RoutePart.to_place_id",
"secondaryjoin": "Place.id == RoutePart.place_id",
"viewonly": True,
},
)
route_parts: list["RoutePart"] = Relationship(back_populates="place", sa_relationship_kwargs={"foreign_keys": "[RoutePart.place_id]"})
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: PlaceCreate) -> "Place":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Place", in_obj: PlaceUpdate
) -> "Place":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class PlacePublic(mixin.RowIdPublic, PlaceBase):
pass
class PlacesPublic(BaseSQLModel):
data: list[PlacePublic]
count: int
# endregion

283
backend/app/models/route.py Normal file
View File

@@ -0,0 +1,283 @@
from datetime import timedelta
from typing import TYPE_CHECKING
from sqlalchemy import Interval
from sqlmodel import (
Session,
Relationship,
Field,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedStrEnum,
DocumentedStrFlagType,
auto_enum,
RowId,
)
from .hike import (
Hike,
HikeTimeCalculation,
HikeTeamLink,
)
if TYPE_CHECKING:
from .place import Place
# region # Route ###############################################################
class RouteType(DocumentedStrEnum):
START_FINISH = auto_enum() # Start at the start and end at the finish
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
# ##############################################################################
class RouteBase(
mixin.Name,
mixin.Contact,
BaseSQLModel,
):
route_type: RouteType = Field(
default=RouteType.START_FINISH,
nullable=False,
)
time_calculation_override: HikeTimeCalculation | None = Field(
default=None,
nullable=True,
description="Should this route be calculated different then the main hike",
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
)
min_time: timedelta | None = Field(
default=None,
description="Min time correction, None = min of all, positive is used as 0:00, negative is subtracted from min of all time",
sa_type=Interval,
)
max_time: timedelta | None = Field(
default=None,
description="Max time correction, None = max of all, positive is used as max, negative is subtracted from max of all time",
sa_type=Interval,
)
hike_id: RowId = Field(
nullable=False,
foreign_key="hike.id",
)
start_id: RowId | None = Field(
default=None,
nullable=True,
foreign_key="place.id",
ondelete="SET NULL",
description="Start place.id of the route, None for random in CIRCULAR or CIRCULAR_BACK_TO_START",
)
finish_id: RowId | None = Field(
default=None,
nullable=True,
foreign_key="place.id",
ondelete="SET NULL",
description="Finish place.id of the route, None for random or decremented",
)
# Properties to receive via API on creation
class RouteCreate(RouteBase):
pass
# Properties to receive via API on update, all are optional
class RouteUpdate(RouteBase):
route_type: RouteType | None = Field(default=None) # type: ignore
hike_id: RowId | None = Field(default=None) # type: ignore
class Route(mixin.RowId, RouteBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
hike: "Hike" = Relationship(back_populates="routes")
teams: list["HikeTeamLink"] = Relationship(back_populates="route")
start: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[Route.start_id]"})
finish: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[Route.finish_id]"})
places: list["Place"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "or_("
"Place.id==Route.start_id,"
"Place.id==Route.finish_id,"
"Place.id==RoutePart.place_id,"
"Place.id==RoutePart.to_place_id,"
")",
"foreign_keys": "["
"Route.start_id,"
"Route.finish_id,"
"RoutePart.place_id,"
"RoutePart.to_place_id"
"]",
"viewonly": True,
},
)
route_parts: list["RoutePart"] = Relationship(back_populates="route")
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: RouteCreate) -> "Route":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Route", in_obj: RouteUpdate
) -> "Route":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class RoutePublic(mixin.RowIdPublic, RouteBase):
pass
class RoutesPublic(BaseSQLModel):
data: list[RoutePublic]
count: int
# endregion
# region # Route / Place link ##################################################
class RoutePartBase(
mixin.NameOveride,
mixin.ShortNameOveride,
BaseSQLModel,
):
route_id: RowId = Field(
nullable=False,
foreign_key="route.id",
)
place_id: RowId = Field(
nullable=False,
foreign_key="place.id",
)
to_place_id: RowId = Field(
nullable=False,
foreign_key="place.id",
)
travel_time: timedelta | None = Field(
default=None,
nullable=True,
description="Time to travel during test walk, only used for information",
sa_type=Interval,
)
travel_distance: int | None = Field(
default=None,
nullable=True,
ge=0,
description="Distance in meters to travel, only used for information",
)
# TODO: Convert to time groups
free_walk_time: bool | None = Field(
default=False,
description="If true, travel time is not add to calculated for travel time",
)
# Properties to receive via API on creation
class RoutePartBaseCreate(RoutePartBase):
pass
# Properties to receive via API on update, all are optional
class RoutePartBaseUpdate(RoutePartBase):
route_id: RowId | None = Field(default=None) # type: ignore
place_id: RowId | None = Field(default=None) # type: ignore
to_place_id: RowId | None = Field(default=None) # type: ignore
free_walk_time: bool | None = Field(default=None) # type: ignore
class RoutePart(mixin.RowId, RoutePartBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
route: "Route" = Relationship(back_populates="route_parts")
place: "Place" = Relationship(back_populates="route_parts", sa_relationship_kwargs={"foreign_keys": "[RoutePart.place_id]"})
place_to: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[RoutePart.to_place_id]"})
places: list["Place"] = Relationship(
sa_relationship_kwargs={
"primaryjoin": "or_("
"Place.id==RoutePart.place_id,"
"Place.id==RoutePart.to_place_id,"
")",
"foreign_keys": "["
"RoutePart.place_id,"
"RoutePart.to_place_id"
"]",
"viewonly": True,
},
)
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: RouteCreate) -> "RoutePart":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "RoutePart", in_obj: RouteUpdate
) -> "RoutePart":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class RoutePartPublic(mixin.RowIdPublic, RoutePartBase):
pass
class RoutePartsPublic(BaseSQLModel):
data: list[RoutePartPublic]
count: int
# endregion

View File

@@ -16,6 +16,7 @@ if TYPE_CHECKING:
from .event import Event
from .division import DivisionTeamLink
from .member import MemberTeamLink
from .hike import HikeTeamLink
# region # Team ################################################################
@@ -51,6 +52,7 @@ class Team(mixin.RowId, TeamBase, table=True):
event: "Event" = Relationship(back_populates="teams")
division_link: "DivisionTeamLink" = Relationship(back_populates="team", cascade_delete=True)
member_links: list["MemberTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
hike_links: list["HikeTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
# --- CRUD actions ---------------------------------------------------------
@classmethod

View File

@@ -32,6 +32,10 @@ class PermissionModule(DocumentedStrEnum):
DIVISION = auto_enum()
MEMBER = auto_enum()
HIKE = auto_enum()
ROUTE = auto_enum()
PLACE = auto_enum()
class PermissionPart(DocumentedStrEnum):
ADMIN = auto_enum()
@@ -49,6 +53,8 @@ class PermissionRight(DocumentedIntFlag):
MANAGE_DIVISIONS = auto_enum()
MANAGE_MEMBERS = auto_enum()
MANAGE_HIKES = auto_enum()
ADMIN = ( CREATE
| READ
| UPDATE
@@ -57,6 +63,7 @@ class PermissionRight(DocumentedIntFlag):
| MANAGE_TEAMS
| MANAGE_DIVISIONS
| MANAGE_MEMBERS
| MANAGE_HIKES
)

View File

@@ -0,0 +1,223 @@
import uuid
from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.hike import create_random_hike
from app.tests.utils.route import create_random_route
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "RSW Maasdelta 2026",
"contact": "Sebas",
}
response = client.post(
f"{settings.API_V1_STR}/hikes/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
assert "id" in content
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "No permissions",
"contact": "No permissions",
}
response = client.post(
f"{settings.API_V1_STR}/hikes/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(hike.id)
assert content["name"] == hike.name
assert content["contact"] == hike.contact
def test_read_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found"
def test_read_hike_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_hike(db)
create_random_hike(db)
response = client.get(
f"{settings.API_V1_STR}/hikes/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_hike(db)
create_random_hike(db)
response = client.get(
f"{settings.API_V1_STR}/hikes/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 0
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) == 0
def test_update_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
data = {
"name": "Updated name",
"contact": "Updated contact",
}
response = client.put(
f"{settings.API_V1_STR}/hikes/{hike.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(hike.id)
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
def test_update_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"name": "Not found",
"contact": "Not found",
}
response = client.put(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found"
def test_update_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
data = {
"name": "No permissions",
"contact": "No permissions",
}
response = client.put(
f"{settings.API_V1_STR}/hikes/{hike.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_delete_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
response = client.delete(
f"{settings.API_V1_STR}/hikes/{hike.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Hike deleted successfully"
def test_delete_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found"
def test_delete_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
response = client.delete(
f"{settings.API_V1_STR}/hikes/{hike.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_hike_routes(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
hike = create_random_hike(db)
create_random_route(db, hike=hike)
create_random_route(db, hike=hike)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_hike_routes_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
response = client.get(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}/routes",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found"
def test_read_hike_routes_no_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
hike = create_random_hike(db)
create_random_route(db, hike=hike)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,217 @@
import uuid
from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.place import create_random_place
from app.models.place import PlaceType, VisitedCountType
def test_create_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
data = {
"place_type": PlaceType.PLACE,
"name": "Post 1",
"short_name": "1",
"contact": "Sebas",
"description": "Post met renspel",
"question_file": None,
"answer_file": None,
"place_time": "PT10M",
"latitude": None,
"longitude": None,
"radius": None,
"max_points": None,
"visited_points": 10,
"visited_count_type": VisitedCountType.ONE_VISIT,
"skipped_penalty_points": 50,
}
response = client.post(
f"{settings.API_V1_STR}/places/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["name"] == data["name"]
assert content["place_type"] == data["place_type"]
assert content["name"] == data["name"]
assert content["short_name"] == data["short_name"]
assert content["contact"] == data["contact"]
assert content["description"] == data["description"]
assert content["question_file"] == data["question_file"]
assert content["answer_file"] == data["answer_file"]
assert content["latitude"] == data["latitude"]
assert content["longitude"] == data["longitude"]
assert content["radius"] == data["radius"]
assert content["max_points"] == data["max_points"]
assert content["visited_points"] == data["visited_points"]
assert content["visited_count_type"] == data["visited_count_type"]
assert content["skipped_penalty_points"] == data["skipped_penalty_points"]
assert content["place_time"] == data["place_time"]
assert "id" in content
def test_create_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"place_type": PlaceType.PLACE,
"name": "No permissions",
"short_name": "No perm",
"visited_count_type": VisitedCountType.ONE_VISIT,
}
response = client.post(
f"{settings.API_V1_STR}/places/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_place(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
place = create_random_place(db)
response = client.get(
f"{settings.API_V1_STR}/places/{place.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(place.id)
assert content["name"] == place.name
assert content["contact"] == place.contact
def test_read_place_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Place not found"
def test_read_place_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
response = client.get(
f"{settings.API_V1_STR}/places/{place.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_places(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_place(db)
create_random_place(db)
response = client.get(
f"{settings.API_V1_STR}/places/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_places_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_place(db)
create_random_place(db)
response = client.get(
f"{settings.API_V1_STR}/places/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 0
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) == 0
def test_update_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
data = {
"name": "Updated name",
"short_name": "4", # TODO: Fix Shortname
}
response = client.put(
f"{settings.API_V1_STR}/places/{place.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(place.id)
assert content["name"] == data["name"]
assert content["short_name"] == data["short_name"]
def test_update_place_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"name": "Not found",
"short_name": "5", # TODO: Fix Shortname
}
response = client.put(
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Place not found"
def test_update_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
data = {
"name": "No permissions",
"short_name": "6", # TODO: Fix Shortname
}
response = client.put(
f"{settings.API_V1_STR}/places/{place.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_delete_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
response = client.delete(
f"{settings.API_V1_STR}/places/{place.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Place deleted successfully"
def test_delete_place_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Place not found"
def test_delete_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
place = create_random_place(db)
response = client.delete(
f"{settings.API_V1_STR}/places/{place.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,187 @@
import uuid
from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.hike import create_random_hike
from app.tests.utils.route import create_random_route
def test_create_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
data = {
"name": "Roete A",
"contact": "Sebas",
"hike_id": str(hike.id),
}
response = client.post(
f"{settings.API_V1_STR}/routes/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
assert content["hike_id"] == data["hike_id"]
assert "id" in content
def test_create_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "No permissions",
"contact": "No permissions",
"hike_id": str(uuid.uuid4()),
}
response = client.post(
f"{settings.API_V1_STR}/routes/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_route(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
route = create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(route.id)
assert content["name"] == route.name
assert content["contact"] == route.contact
def test_read_route_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Route not found"
def test_read_route_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_routes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_route(db)
create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_routes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_route(db)
create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 0
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) == 0
def test_update_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
data = {
"name": "Updated name",
"contact": "Updated contact",
}
response = client.put(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(route.id)
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
def test_update_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"name": "Not found",
"contact": "Not found",
}
response = client.put(
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Route not found"
def test_update_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
data = {
"name": "No permissions",
"contact": "No permissions",
}
response = client.put(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_delete_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
response = client.delete(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Route deleted successfully"
def test_delete_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Route not found"
def test_delete_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
response = client.delete(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,12 @@
from sqlmodel import Session
from app.models.hike import Hike, HikeCreate
from app.tests.utils.utils import random_lower_string
def create_random_hike(db: Session, name: str = None) -> Hike:
if not name:
name = random_lower_string()
hike_in = HikeCreate(name=name)
return Hike.create(session=db, create_obj=hike_in)

View File

@@ -0,0 +1,14 @@
from sqlmodel import Session
from app.models.place import Place, PlaceCreate
from app.tests.utils.utils import random_lower_string, random_lower_short_string
def create_random_place(db: Session, name: str = None) -> Place:
if not name:
name = random_lower_string()
short_name = random_lower_short_string()
place_in = PlaceCreate(name=name, short_name=short_name)
return Place.create(session=db, create_obj=place_in)

View File

@@ -0,0 +1,17 @@
from sqlmodel import Session
from app.models.hike import Hike
from app.models.route import Route, RouteCreate
from app.tests.utils.utils import random_lower_string
from app.tests.utils.hike import create_random_hike
def create_random_route(db: Session, name: str = None, hike: Hike = None) -> Route:
if not name:
name = random_lower_string()
if not hike:
hike = create_random_hike(db=db)
route_in = RouteCreate(name=name, hike_id=hike.id)
return Route.create(session=db, create_obj=route_in)

View File

@@ -6,15 +6,12 @@ from app.models.event import Event
from app.models.team import Team, TeamCreate
from app.tests.utils.event import create_random_event
from app.tests.utils.utils import random_lower_string
from app.tests.utils.utils import random_lower_string, random_lower_short_string
def random_short_name() -> str:
return str(random.Random().randrange(1, 200))
def create_random_team(db: Session, event: Event | None = None) -> Team:
name = random_lower_string()
short_name = random_short_name()
short_name = random_lower_short_string()
if not event:
event = create_random_event(db)

View File

@@ -10,6 +10,10 @@ def random_lower_string() -> str:
return "".join(random.choices(string.ascii_lowercase, k=32))
def random_lower_short_string() -> str:
return str(random.Random().randrange(1, 8))
def random_email() -> str:
return f"{random_lower_string()}@{random_lower_string()}.com"
@@ -24,3 +28,4 @@ def get_superuser_token_headers(client: TestClient) -> dict[str, str]:
a_token = tokens["access_token"]
headers = {"Authorization": f"Bearer {a_token}"}
return headers