Add base for route

This commit is contained in:
Sebastiaan
2025-10-31 13:42:16 +01:00
parent 23d7d63103
commit 84d75e21ca
11 changed files with 582 additions and 20 deletions

View File

@@ -11,6 +11,7 @@ from app.api.routes import (
users,
utils,
hikes,
routes,
)
from app.core.config import settings
@@ -27,6 +28,7 @@ api_router.include_router(divisions.router)
api_router.include_router(members.router)
api_router.include_router(hikes.router)
api_router.include_router(routes.router)
if settings.ENVIRONMENT == "local":

View File

@@ -16,6 +16,10 @@ from app.models.hike import (
HikePublic,
HikesPublic,
)
from app.models.route import (
Route,
RoutesPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
@@ -129,4 +133,47 @@ def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Mes
session.commit()
return Message(message="Hike deleted successfully")
# endregion
# region # Hike / Routes #######################################################
@router.get("/{hike_id}/routes/", response_model=RoutesPublic)
def read_hike_route(
session: SessionDep,
current_user: CurrentUser,
hike_id: RowId,
skip: int = 0,
limit: int = 100,
) -> Any:
"""
Retrieve all hike routes.
"""
hike = session.get(Hike, hike_id)
if not hike:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found"
)
if not current_user.has_permission(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=(PermissionRight.MANAGE_HIKES | PermissionRight.READ),
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
data_query = select(Route).where(
Route.hike_id == hike_id,
)
count = session.exec(select(func.count()).select_from(data_query.subquery())).one()
data = session.exec(data_query.offset(skip).limit(limit)).all()
return RoutesPublic(data=data, count=count)
# endregion

View File

@@ -0,0 +1,148 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.route import (
Route,
RouteCreate,
RouteUpdate,
RoutePublic,
RoutesPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/routes", tags=[ApiTags.ROUTES])
# region # Routes ########################################################
@router.get("/", response_model=RoutesPublic)
def read_routes(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all routes.
"""
if current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Route)
count = session.exec(count_statement).one()
statement = select(Route).offset(skip).limit(limit)
routes = session.exec(statement).all()
return RoutesPublic(data=routes, count=count)
return RoutesPublic(data=[], count=0)
@router.get("/{id}", response_model=RoutePublic)
def read_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get route by ID.
"""
route = session.get(Route, id)
if not route:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
)
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
return route
@router.post("/", response_model=RoutePublic)
def create_route(
*, session: SessionDep, current_user: CurrentUser, route_in: RouteCreate
) -> Any:
"""
Create new route.
"""
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
route = Route.create(create_obj=route_in, session=session)
return route
@router.put("/{id}", response_model=RoutePublic)
def update_route(
*, session: SessionDep, current_user: CurrentUser, id: RowId, route_in: RouteUpdate
) -> Any:
"""
Update a route.
"""
route = session.get(Route, id)
if not route:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
)
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
route = Route.update(db_obj=route, in_obj=route_in, session=session)
return route
@router.delete("/{id}")
def delete_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a route.
"""
route = session.get(Route, id)
if not route:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
)
if not current_user.has_permissions(
module=PermissionModule.ROUTE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
session.delete(route)
session.commit()
return Message(message="Route deleted successfully")
# endregion

View File

@@ -25,7 +25,8 @@ from app.models.user import (
UserCreate,
)
from app.models.member import (
Member, MemberCreate,
Member,
MemberCreate,
)
from app.models.apikey import (
ApiKey,
@@ -33,6 +34,9 @@ from app.models.apikey import (
from app.models.hike import (
Hike,
)
from app.models.route import (
Route,
)
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))

View File

@@ -139,6 +139,7 @@ class ApiTags(DocumentedStrEnum):
MEMBERS = "Members"
HIKES = "Hikes"
ROUTES = "Routes"
# endregion

View File

@@ -1,3 +1,4 @@
from datetime import timedelta
from typing import TYPE_CHECKING
from sqlmodel import (
@@ -16,10 +17,11 @@ from .base import (
)
if TYPE_CHECKING:
from .event import Event
from .route import Route
# region # Hike ################################################################
class HikeTeamPage(DocumentedIntFlag):
ROUTE = auto_enum() # Route steps info
QUESTIONS = auto_enum() # Visible questions
@@ -101,7 +103,7 @@ class HikeBase(
max_time_points: int | None = Field(
default=100,
ge=0, # TODO: ge > min_time_points
ge=0, # TODO: ge > min_time_points
description="Max points for time",
)
@@ -129,6 +131,7 @@ class Hike(mixin.RowId, HikeBase, table=True):
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
routes: list["Route"] = Relationship(back_populates="hike", cascade_delete=True)
# --- CRUD actions ---------------------------------------------------------
@classmethod
@@ -164,18 +167,3 @@ class HikesPublic(BaseSQLModel):
# endregion
# region # Hike / Route ########################################################
class RouteType(DocumentedStrEnum):
START_FINISH = auto_enum() # Start at the start and end at the finish
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
# ##############################################################################
# endregion

120
backend/app/models/route.py Normal file
View File

@@ -0,0 +1,120 @@
from datetime import timedelta
from sqlmodel import (
Session,
Relationship,
Field,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedStrEnum,
DocumentedStrFlagType,
auto_enum,
RowId,
)
from .hike import (
Hike,
HikeTimeCalculation,
)
# region # Route ###############################################################
class RouteType(DocumentedStrEnum):
START_FINISH = auto_enum() # Start at the start and end at the finish
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
# ##############################################################################
class RouteBase(
mixin.Name,
mixin.Contact,
BaseSQLModel,
):
route_type: RouteType = Field(
default=RouteType.START_FINISH,
nullable=False,
)
time_calculation_override: HikeTimeCalculation | None = Field(
default=None,
nullable=True,
description="Should this route be calculated different then the main hike",
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
)
min_time: Interval | None = Field(
default=None,
description="Min time correction, None = min of all, positive is used as 0:00, negative is subtracted from min of all time",
)
max_time: Interval | None = Field(
default=None,
description="Max time correction, None = max of all, positive is used as max, negative is subtracted from max of all time",
)
hike_id: RowId = Field(
nullable=False,
foreign_key="hike.id",
)
# Properties to receive via API on creation
class RouteCreate(RouteBase):
pass
# Properties to receive via API on update, all are optional
class RouteUpdate(RouteBase):
route_type: RouteType | None = Field(default=None) # type: ignore
hike_id: RowId | None = Field(default=None) # type: ignore
class Route(mixin.RowId, RouteBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
hike: "Hike" = Relationship(back_populates="routes")
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: RouteCreate) -> "Route":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Route", in_obj: RouteUpdate
) -> "Route":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class RoutePublic(mixin.RowIdPublic, RouteBase):
pass
class RoutesPublic(BaseSQLModel):
data: list[RoutePublic]
count: int
# endregion

View File

@@ -33,6 +33,7 @@ class PermissionModule(DocumentedStrEnum):
MEMBER = auto_enum()
HIKE = auto_enum()
ROUTE = auto_enum()
class PermissionPart(DocumentedStrEnum):
@@ -51,6 +52,8 @@ class PermissionRight(DocumentedIntFlag):
MANAGE_DIVISIONS = auto_enum()
MANAGE_MEMBERS = auto_enum()
MANAGE_HIKES = auto_enum()
ADMIN = ( CREATE
| READ
| UPDATE
@@ -59,6 +62,7 @@ class PermissionRight(DocumentedIntFlag):
| MANAGE_TEAMS
| MANAGE_DIVISIONS
| MANAGE_MEMBERS
| MANAGE_HIKES
)

View File

@@ -6,6 +6,7 @@ from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.hike import create_random_hike
from app.tests.utils.route import create_random_route
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
@@ -27,8 +28,8 @@ def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str]
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "RSW Maasdelta 2026",
"contact": "Sebas",
"name": "No permissions",
"contact": "No permissions",
}
response = client.post(
f"{settings.API_V1_STR}/hikes/",
@@ -177,3 +178,46 @@ def test_delete_hike_no_permissions(client: TestClient, normal_user_token_header
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_hike_routes(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
hike = create_random_hike(db)
create_random_route(db, hike=hike)
create_random_route(db, hike=hike)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_hike_routes_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
response = client.get(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}/routes",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found"
def test_read_hike_routes_no_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
hike = create_random_hike(db)
create_random_route(db, hike=hike)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,187 @@
import uuid
from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.hike import create_random_hike
from app.tests.utils.route import create_random_route
def test_create_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
data = {
"name": "Roete A",
"contact": "Sebas",
"hike_id": str(hike.id),
}
response = client.post(
f"{settings.API_V1_STR}/routes/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
assert content["hike_id"] == data["hike_id"]
assert "id" in content
def test_create_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "No permissions",
"contact": "No permissions",
"hike_id": str(uuid.uuid4()),
}
response = client.post(
f"{settings.API_V1_STR}/routes/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_route(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
route = create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(route.id)
assert content["name"] == route.name
assert content["contact"] == route.contact
def test_read_route_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Route not found"
def test_read_route_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_routes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_route(db)
create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_routes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_route(db)
create_random_route(db)
response = client.get(
f"{settings.API_V1_STR}/routes/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 0
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) == 0
def test_update_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
data = {
"name": "Updated name",
"contact": "Updated contact",
}
response = client.put(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(route.id)
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
def test_update_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"name": "Not found",
"contact": "Not found",
}
response = client.put(
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Route not found"
def test_update_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
data = {
"name": "No permissions",
"contact": "No permissions",
}
response = client.put(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_delete_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
response = client.delete(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Route deleted successfully"
def test_delete_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Route not found"
def test_delete_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
route = create_random_route(db)
response = client.delete(
f"{settings.API_V1_STR}/routes/{route.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,17 @@
from sqlmodel import Session
from app.models.hike import Hike
from app.models.route import Route, RouteCreate
from app.tests.utils.utils import random_lower_string
from app.tests.utils.hike import create_random_hike
def create_random_route(db: Session, name: str = None, hike: Hike = None) -> Hike:
if not name:
name = random_lower_string()
if not hike:
hike = create_random_hike(db=db)
route_in = RouteCreate(name=name, hike_id=hike.id)
return Route.create(session=db, create_obj=route_in)