Implement basic hike functions
This commit is contained in:
@@ -10,6 +10,7 @@ from app.api.routes import (
|
||||
private,
|
||||
users,
|
||||
utils,
|
||||
hikes,
|
||||
)
|
||||
from app.core.config import settings
|
||||
|
||||
@@ -25,6 +26,8 @@ api_router.include_router(associations.router)
|
||||
api_router.include_router(divisions.router)
|
||||
api_router.include_router(members.router)
|
||||
|
||||
api_router.include_router(hikes.router)
|
||||
|
||||
|
||||
if settings.ENVIRONMENT == "local":
|
||||
api_router.include_router(private.router)
|
||||
|
||||
132
backend/app/api/routes/hikes.py
Normal file
132
backend/app/api/routes/hikes.py
Normal file
@@ -0,0 +1,132 @@
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
from sqlmodel import func, select
|
||||
|
||||
from app.api.deps import CurrentUser, SessionDep
|
||||
from app.models.base import (
|
||||
ApiTags,
|
||||
Message,
|
||||
RowId,
|
||||
)
|
||||
from app.models.hike import (
|
||||
Hike,
|
||||
HikeCreate,
|
||||
HikeUpdate,
|
||||
HikePublic,
|
||||
HikesPublic,
|
||||
)
|
||||
from app.models.user import (
|
||||
PermissionModule,
|
||||
PermissionPart,
|
||||
PermissionRight,
|
||||
)
|
||||
|
||||
router = APIRouter(prefix="/hikes", tags=[ApiTags.HIKES])
|
||||
|
||||
|
||||
# region # Hikes ########################################################
|
||||
|
||||
@router.get("/", response_model=HikesPublic)
|
||||
def read_hikes(
|
||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
||||
) -> Any:
|
||||
"""
|
||||
Retrieve all hikes.
|
||||
"""
|
||||
|
||||
if current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
):
|
||||
count_statement = select(func.count()).select_from(Hike)
|
||||
count = session.exec(count_statement).one()
|
||||
statement = select(Hike).offset(skip).limit(limit)
|
||||
hikes = session.exec(statement).all()
|
||||
return HikesPublic(data=hikes, count=count)
|
||||
|
||||
return HikesPublic(data=[], count=0)
|
||||
|
||||
|
||||
@router.get("/{id}", response_model=HikePublic)
|
||||
def read_hike(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
||||
"""
|
||||
Get hike by ID.
|
||||
"""
|
||||
hike = session.get(Hike, id)
|
||||
if not hike:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
return hike
|
||||
|
||||
|
||||
@router.post("/", response_model=HikePublic)
|
||||
def create_hike(
|
||||
*, session: SessionDep, current_user: CurrentUser, hike_in: HikeCreate
|
||||
) -> Any:
|
||||
"""
|
||||
Create new hike.
|
||||
"""
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.CREATE,
|
||||
):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
hike = Hike.create(create_obj=hike_in, session=session)
|
||||
return hike
|
||||
|
||||
|
||||
@router.put("/{id}", response_model=HikePublic)
|
||||
def update_hike(
|
||||
*, session: SessionDep, current_user: CurrentUser, id: RowId, hike_in: HikeUpdate
|
||||
) -> Any:
|
||||
"""
|
||||
Update a hike.
|
||||
"""
|
||||
hike = session.get(Hike, id)
|
||||
if not hike:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.UPDATE,
|
||||
):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
hike = Hike.update(db_obj=hike, in_obj=hike_in, session=session)
|
||||
return hike
|
||||
|
||||
|
||||
@router.delete("/{id}")
|
||||
def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
|
||||
"""
|
||||
Delete a hike.
|
||||
"""
|
||||
hike = session.get(Hike, id)
|
||||
if not hike:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.DELETE,
|
||||
):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
session.delete(hike)
|
||||
session.commit()
|
||||
return Message(message="Hike deleted successfully")
|
||||
|
||||
# endregion
|
||||
@@ -30,6 +30,9 @@ from app.models.member import (
|
||||
from app.models.apikey import (
|
||||
ApiKey,
|
||||
)
|
||||
from app.models.hike import (
|
||||
Hike,
|
||||
)
|
||||
|
||||
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
|
||||
|
||||
|
||||
@@ -138,6 +138,8 @@ class ApiTags(DocumentedStrEnum):
|
||||
DIVISIONS = "Divisions"
|
||||
MEMBERS = "Members"
|
||||
|
||||
HIKES = "Hikes"
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
|
||||
181
backend/app/models/hike.py
Normal file
181
backend/app/models/hike.py
Normal file
@@ -0,0 +1,181 @@
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlmodel import (
|
||||
Session,
|
||||
Relationship,
|
||||
Field,
|
||||
)
|
||||
|
||||
from . import mixin
|
||||
from .base import (
|
||||
BaseSQLModel,
|
||||
DocumentedStrEnum,
|
||||
DocumentedIntFlag,
|
||||
DocumentedStrFlagType,
|
||||
auto_enum,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .event import Event
|
||||
|
||||
# region # Hike ################################################################
|
||||
|
||||
class HikeTeamPage(DocumentedIntFlag):
|
||||
ROUTE = auto_enum() # Route steps info
|
||||
QUESTIONS = auto_enum() # Visible questions
|
||||
|
||||
VISITED_PLACES = auto_enum() # Places the team has been
|
||||
CURRENT_PLACE = auto_enum() # Place where the team currently is
|
||||
UNVISITED_PLACES = auto_enum() # Places the team still neat to visit
|
||||
|
||||
PLACES = VISITED_PLACES | CURRENT_PLACE | UNVISITED_PLACES # All the places
|
||||
|
||||
TRAVEL_TIME = auto_enum() # Total travel time
|
||||
TRAVEL_FREE_TIME = auto_enum() # Total travel time
|
||||
PLACE_TIME = auto_enum() # Total place time
|
||||
CALCULATE_TIME = auto_enum() # Total time calculated based on hike settings
|
||||
# TODO: Think about time between oter teams
|
||||
|
||||
PLACE_POINTS = auto_enum() # Points scored on a place
|
||||
TOTAL_PLACE_POINTS = auto_enum() # All points got on all places
|
||||
ASSIST_POINTS = auto_enum() # Minus points for assistants
|
||||
# TODO: Think about place in classement
|
||||
|
||||
ASSIST_LOG = auto_enum() # Assisted items
|
||||
ASSIST_LATTER = auto_enum() # ???
|
||||
|
||||
|
||||
# ##############################################################################
|
||||
|
||||
|
||||
class HikeTimeCalculation(DocumentedIntFlag):
|
||||
TRAVEL_TIME = auto_enum() # Time traveling
|
||||
|
||||
# TODO: Think about time groups (in this model we have 2, free and non free)
|
||||
TRAVEL_FREE_TIME = auto_enum() # Time that is excluded from traveling but not at a place
|
||||
PLACE_TIME = auto_enum() # Time checked in at a place
|
||||
|
||||
TOTAL_TIME = TRAVEL_TIME | TRAVEL_FREE_TIME | PLACE_TIME
|
||||
|
||||
|
||||
# ##############################################################################
|
||||
|
||||
|
||||
class HikeBase(
|
||||
mixin.Name,
|
||||
mixin.Contact,
|
||||
BaseSQLModel,
|
||||
):
|
||||
tracker_interval: int | None = Field(
|
||||
default=None,
|
||||
nullable=True,
|
||||
description="Is GPS button available, value will be the interval",
|
||||
)
|
||||
|
||||
is_multi_day: bool = Field(
|
||||
default=False,
|
||||
nullable=False,
|
||||
description="Show datetime in stead of time only",
|
||||
)
|
||||
|
||||
team_page: HikeTeamPage = Field(
|
||||
default=HikeTeamPage.PLACES | HikeTeamPage.ROUTE | HikeTeamPage.QUESTIONS,
|
||||
nullable=False,
|
||||
description="Show all the places of the route inside the teams page",
|
||||
sa_type=DocumentedStrFlagType(HikeTeamPage),
|
||||
)
|
||||
|
||||
time_calculation: HikeTimeCalculation = Field(
|
||||
default=HikeTimeCalculation.TRAVEL_TIME,
|
||||
nullable=False,
|
||||
description="Wath should we calculate inside the total time",
|
||||
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
|
||||
)
|
||||
|
||||
min_time_points: int | None = Field(
|
||||
default=0,
|
||||
ge=0,
|
||||
# TODO: le: max_time_points
|
||||
description="Min points for time",
|
||||
)
|
||||
|
||||
max_time_points: int | None = Field(
|
||||
default=100,
|
||||
ge=0, # TODO: ge > min_time_points
|
||||
description="Max points for time",
|
||||
)
|
||||
|
||||
max_question_points: int | None = Field(
|
||||
default=None,
|
||||
description="None: Calculate from answers (no max), Positive: Set as max for dynamic range",
|
||||
)
|
||||
|
||||
|
||||
# Properties to receive via API on creation
|
||||
class HikeCreate(HikeBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties to receive via API on update, all are optional
|
||||
class HikeUpdate(HikeBase):
|
||||
is_multi_day: bool | None = Field(default=None) # type: ignore
|
||||
team_page: HikeTeamPage | None = Field(default=None) # type: ignore
|
||||
time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore
|
||||
|
||||
|
||||
class Hike(mixin.RowId, HikeBase, table=True):
|
||||
# --- database only items --------------------------------------------------
|
||||
|
||||
# --- read only items ------------------------------------------------------
|
||||
|
||||
# --- back_populates links -------------------------------------------------
|
||||
|
||||
# --- CRUD actions ---------------------------------------------------------
|
||||
@classmethod
|
||||
def create(cls, *, session: Session, create_obj: HikeCreate) -> "Hike":
|
||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
||||
|
||||
db_obj = cls.model_validate(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
@classmethod
|
||||
def update(
|
||||
cls, *, session: Session, db_obj: "Hike", in_obj: HikeUpdate
|
||||
) -> "Hike":
|
||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
||||
db_obj.sqlmodel_update(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
|
||||
# Properties to return via API, id is always required
|
||||
class HikePublic(mixin.RowIdPublic, HikeBase):
|
||||
pass
|
||||
|
||||
|
||||
class HikesPublic(BaseSQLModel):
|
||||
data: list[HikePublic]
|
||||
count: int
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
|
||||
# region # Hike / Route ########################################################
|
||||
|
||||
|
||||
class RouteType(DocumentedStrEnum):
|
||||
START_FINISH = auto_enum() # Start at the start and end at the finish
|
||||
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
|
||||
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
|
||||
|
||||
|
||||
# ##############################################################################
|
||||
|
||||
|
||||
# endregion
|
||||
@@ -32,6 +32,8 @@ class PermissionModule(DocumentedStrEnum):
|
||||
DIVISION = auto_enum()
|
||||
MEMBER = auto_enum()
|
||||
|
||||
HIKE = auto_enum()
|
||||
|
||||
|
||||
class PermissionPart(DocumentedStrEnum):
|
||||
ADMIN = auto_enum()
|
||||
|
||||
179
backend/app/tests/api/routes/test_hike.py
Normal file
179
backend/app/tests/api/routes/test_hike.py
Normal file
@@ -0,0 +1,179 @@
|
||||
import uuid
|
||||
|
||||
from fastapi import status
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel import Session
|
||||
|
||||
from app.core.config import settings
|
||||
from app.tests.utils.hike import create_random_hike
|
||||
|
||||
|
||||
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
data = {
|
||||
"name": "RSW Maasdelta 2026",
|
||||
"contact": "Sebas",
|
||||
}
|
||||
response = client.post(
|
||||
f"{settings.API_V1_STR}/hikes/",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["name"] == data["name"]
|
||||
assert content["contact"] == data["contact"]
|
||||
assert "id" in content
|
||||
|
||||
|
||||
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
data = {
|
||||
"name": "RSW Maasdelta 2026",
|
||||
"contact": "Sebas",
|
||||
}
|
||||
response = client.post(
|
||||
f"{settings.API_V1_STR}/hikes/",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["id"] == str(hike.id)
|
||||
assert content["name"] == hike.name
|
||||
assert content["contact"] == hike.contact
|
||||
|
||||
|
||||
def test_read_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Hike not found"
|
||||
|
||||
|
||||
def test_read_hike_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
create_random_hike(db)
|
||||
create_random_hike(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert "count" in content
|
||||
assert content["count"] >= 2
|
||||
assert "data" in content
|
||||
assert isinstance(content["data"], list)
|
||||
assert len(content["data"]) <= content["count"]
|
||||
|
||||
|
||||
def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
create_random_hike(db)
|
||||
create_random_hike(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert "count" in content
|
||||
assert content["count"] == 0
|
||||
assert "data" in content
|
||||
assert isinstance(content["data"], list)
|
||||
assert len(content["data"]) == 0
|
||||
|
||||
|
||||
def test_update_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
data = {
|
||||
"name": "Updated name",
|
||||
"contact": "Updated contact",
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["id"] == str(hike.id)
|
||||
assert content["name"] == data["name"]
|
||||
assert content["contact"] == data["contact"]
|
||||
|
||||
|
||||
def test_update_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||
data = {
|
||||
"name": "Not found",
|
||||
"contact": "Not found",
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Hike not found"
|
||||
|
||||
|
||||
def test_update_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
data = {
|
||||
"name": "No permissions",
|
||||
"contact": "No permissions",
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_delete_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.json()["message"] == "Hike deleted successfully"
|
||||
|
||||
|
||||
def test_delete_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Hike not found"
|
||||
|
||||
|
||||
def test_delete_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
12
backend/app/tests/utils/hike.py
Normal file
12
backend/app/tests/utils/hike.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from sqlmodel import Session
|
||||
|
||||
from app.models.hike import Hike, HikeCreate
|
||||
from app.tests.utils.utils import random_lower_string
|
||||
|
||||
|
||||
def create_random_hike(db: Session, name: str = None) -> Hike:
|
||||
if not name:
|
||||
name = random_lower_string()
|
||||
|
||||
hike_in = HikeCreate(name=name)
|
||||
return Hike.create(session=db, create_obj=hike_in)
|
||||
Reference in New Issue
Block a user