Compare commits
4 Commits
develop
...
feature/hi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d6ef073df | ||
|
|
84d75e21ca | ||
|
|
23d7d63103 | ||
|
|
f958856e95 |
@@ -10,6 +10,9 @@ from app.api.routes import (
|
||||
private,
|
||||
users,
|
||||
utils,
|
||||
hikes,
|
||||
routes,
|
||||
places,
|
||||
)
|
||||
from app.core.config import settings
|
||||
|
||||
@@ -25,6 +28,10 @@ api_router.include_router(associations.router)
|
||||
api_router.include_router(divisions.router)
|
||||
api_router.include_router(members.router)
|
||||
|
||||
api_router.include_router(hikes.router)
|
||||
api_router.include_router(routes.router)
|
||||
api_router.include_router(places.router)
|
||||
|
||||
|
||||
if settings.ENVIRONMENT == "local":
|
||||
api_router.include_router(private.router)
|
||||
|
||||
179
backend/app/api/routes/hikes.py
Normal file
179
backend/app/api/routes/hikes.py
Normal file
@@ -0,0 +1,179 @@
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
from sqlmodel import func, select
|
||||
|
||||
from app.api.deps import CurrentUser, SessionDep
|
||||
from app.models.base import (
|
||||
ApiTags,
|
||||
Message,
|
||||
RowId,
|
||||
)
|
||||
from app.models.hike import (
|
||||
Hike,
|
||||
HikeCreate,
|
||||
HikeUpdate,
|
||||
HikePublic,
|
||||
HikesPublic,
|
||||
)
|
||||
from app.models.route import (
|
||||
Route,
|
||||
RoutesPublic,
|
||||
)
|
||||
from app.models.user import (
|
||||
PermissionModule,
|
||||
PermissionPart,
|
||||
PermissionRight,
|
||||
)
|
||||
|
||||
router = APIRouter(prefix="/hikes", tags=[ApiTags.HIKES])
|
||||
|
||||
|
||||
# region # Hikes ########################################################
|
||||
|
||||
@router.get("/", response_model=HikesPublic)
|
||||
def read_hikes(
|
||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
||||
) -> Any:
|
||||
"""
|
||||
Retrieve all hikes.
|
||||
"""
|
||||
|
||||
if current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
):
|
||||
count_statement = select(func.count()).select_from(Hike)
|
||||
count = session.exec(count_statement).one()
|
||||
statement = select(Hike).offset(skip).limit(limit)
|
||||
hikes = session.exec(statement).all()
|
||||
return HikesPublic(data=hikes, count=count)
|
||||
|
||||
return HikesPublic(data=[], count=0)
|
||||
|
||||
|
||||
@router.get("/{id}", response_model=HikePublic)
|
||||
def read_hike(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
||||
"""
|
||||
Get hike by ID.
|
||||
"""
|
||||
hike = session.get(Hike, id)
|
||||
if not hike:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
return hike
|
||||
|
||||
|
||||
@router.post("/", response_model=HikePublic)
|
||||
def create_hike(
|
||||
*, session: SessionDep, current_user: CurrentUser, hike_in: HikeCreate
|
||||
) -> Any:
|
||||
"""
|
||||
Create new hike.
|
||||
"""
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.CREATE,
|
||||
):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
hike = Hike.create(create_obj=hike_in, session=session)
|
||||
return hike
|
||||
|
||||
|
||||
@router.put("/{id}", response_model=HikePublic)
|
||||
def update_hike(
|
||||
*, session: SessionDep, current_user: CurrentUser, id: RowId, hike_in: HikeUpdate
|
||||
) -> Any:
|
||||
"""
|
||||
Update a hike.
|
||||
"""
|
||||
hike = session.get(Hike, id)
|
||||
if not hike:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.UPDATE,
|
||||
):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
hike = Hike.update(db_obj=hike, in_obj=hike_in, session=session)
|
||||
return hike
|
||||
|
||||
|
||||
@router.delete("/{id}")
|
||||
def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
|
||||
"""
|
||||
Delete a hike.
|
||||
"""
|
||||
hike = session.get(Hike, id)
|
||||
if not hike:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.DELETE,
|
||||
):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
session.delete(hike)
|
||||
session.commit()
|
||||
return Message(message="Hike deleted successfully")
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
# region # Hike / Routes #######################################################
|
||||
|
||||
|
||||
@router.get("/{hike_id}/routes/", response_model=RoutesPublic)
|
||||
def read_hike_route(
|
||||
session: SessionDep,
|
||||
current_user: CurrentUser,
|
||||
hike_id: RowId,
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
) -> Any:
|
||||
"""
|
||||
Retrieve all hike routes.
|
||||
"""
|
||||
|
||||
hike = session.get(Hike, hike_id)
|
||||
if not hike:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found"
|
||||
)
|
||||
|
||||
if not current_user.has_permission(
|
||||
module=PermissionModule.HIKE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=(PermissionRight.MANAGE_HIKES | PermissionRight.READ),
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
||||
)
|
||||
|
||||
data_query = select(Route).where(
|
||||
Route.hike_id == hike_id,
|
||||
)
|
||||
|
||||
count = session.exec(select(func.count()).select_from(data_query.subquery())).one()
|
||||
data = session.exec(data_query.offset(skip).limit(limit)).all()
|
||||
|
||||
return RoutesPublic(data=data, count=count)
|
||||
|
||||
|
||||
# endregion
|
||||
148
backend/app/api/routes/places.py
Normal file
148
backend/app/api/routes/places.py
Normal file
@@ -0,0 +1,148 @@
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
from sqlmodel import func, select
|
||||
|
||||
from app.api.deps import CurrentUser, SessionDep
|
||||
from app.models.base import (
|
||||
ApiTags,
|
||||
Message,
|
||||
RowId,
|
||||
)
|
||||
from app.models.place import (
|
||||
Place,
|
||||
PlaceCreate,
|
||||
PlaceUpdate,
|
||||
PlacePublic,
|
||||
PlacesPublic,
|
||||
)
|
||||
from app.models.user import (
|
||||
PermissionModule,
|
||||
PermissionPart,
|
||||
PermissionRight,
|
||||
)
|
||||
|
||||
router = APIRouter(prefix="/places", tags=[ApiTags.PLACES])
|
||||
|
||||
|
||||
# region # Places ########################################################
|
||||
|
||||
|
||||
@router.get("/", response_model=PlacesPublic)
|
||||
def read_places(
|
||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
||||
) -> Any:
|
||||
"""
|
||||
Retrieve all places.
|
||||
"""
|
||||
|
||||
if current_user.has_permissions(
|
||||
module=PermissionModule.PLACE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
):
|
||||
count_statement = select(func.count()).select_from(Place)
|
||||
count = session.exec(count_statement).one()
|
||||
statement = select(Place).offset(skip).limit(limit)
|
||||
places = session.exec(statement).all()
|
||||
return PlacesPublic(data=places, count=count)
|
||||
|
||||
return PlacesPublic(data=[], count=0)
|
||||
|
||||
|
||||
@router.get("/{id}", response_model=PlacePublic)
|
||||
def read_place(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
||||
"""
|
||||
Get place by ID.
|
||||
"""
|
||||
place = session.get(Place, id)
|
||||
if not place:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
|
||||
)
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.PLACE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
||||
)
|
||||
|
||||
return place
|
||||
|
||||
|
||||
@router.post("/", response_model=PlacePublic)
|
||||
def create_place(
|
||||
*, session: SessionDep, current_user: CurrentUser, place_in: PlaceCreate
|
||||
) -> Any:
|
||||
"""
|
||||
Create new place.
|
||||
"""
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.PLACE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.CREATE,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
||||
)
|
||||
|
||||
place = Place.create(create_obj=place_in, session=session)
|
||||
return place
|
||||
|
||||
|
||||
@router.put("/{id}", response_model=PlacePublic)
|
||||
def update_place(
|
||||
*, session: SessionDep, current_user: CurrentUser, id: RowId, place_in: PlaceUpdate
|
||||
) -> Any:
|
||||
"""
|
||||
Update a place.
|
||||
"""
|
||||
place = session.get(Place, id)
|
||||
if not place:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
|
||||
)
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.PLACE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.UPDATE,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
||||
)
|
||||
|
||||
place = Place.update(db_obj=place, in_obj=place_in, session=session)
|
||||
return place
|
||||
|
||||
|
||||
@router.delete("/{id}")
|
||||
def delete_place(session: SessionDep, current_user: CurrentUser, id: RowId) -> Message:
|
||||
"""
|
||||
Delete a place.
|
||||
"""
|
||||
place = session.get(Place, id)
|
||||
if not place:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
|
||||
)
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.PLACE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.DELETE,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
||||
)
|
||||
|
||||
session.delete(place)
|
||||
session.commit()
|
||||
return Message(message="Place deleted successfully")
|
||||
|
||||
|
||||
# endregion
|
||||
148
backend/app/api/routes/routes.py
Normal file
148
backend/app/api/routes/routes.py
Normal file
@@ -0,0 +1,148 @@
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
from sqlmodel import func, select
|
||||
|
||||
from app.api.deps import CurrentUser, SessionDep
|
||||
from app.models.base import (
|
||||
ApiTags,
|
||||
Message,
|
||||
RowId,
|
||||
)
|
||||
from app.models.route import (
|
||||
Route,
|
||||
RouteCreate,
|
||||
RouteUpdate,
|
||||
RoutePublic,
|
||||
RoutesPublic,
|
||||
)
|
||||
from app.models.user import (
|
||||
PermissionModule,
|
||||
PermissionPart,
|
||||
PermissionRight,
|
||||
)
|
||||
|
||||
router = APIRouter(prefix="/routes", tags=[ApiTags.ROUTES])
|
||||
|
||||
|
||||
# region # Routes ########################################################
|
||||
|
||||
|
||||
@router.get("/", response_model=RoutesPublic)
|
||||
def read_routes(
|
||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
||||
) -> Any:
|
||||
"""
|
||||
Retrieve all routes.
|
||||
"""
|
||||
|
||||
if current_user.has_permissions(
|
||||
module=PermissionModule.ROUTE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
):
|
||||
count_statement = select(func.count()).select_from(Route)
|
||||
count = session.exec(count_statement).one()
|
||||
statement = select(Route).offset(skip).limit(limit)
|
||||
routes = session.exec(statement).all()
|
||||
return RoutesPublic(data=routes, count=count)
|
||||
|
||||
return RoutesPublic(data=[], count=0)
|
||||
|
||||
|
||||
@router.get("/{id}", response_model=RoutePublic)
|
||||
def read_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
||||
"""
|
||||
Get route by ID.
|
||||
"""
|
||||
route = session.get(Route, id)
|
||||
if not route:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
|
||||
)
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.ROUTE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
||||
)
|
||||
|
||||
return route
|
||||
|
||||
|
||||
@router.post("/", response_model=RoutePublic)
|
||||
def create_route(
|
||||
*, session: SessionDep, current_user: CurrentUser, route_in: RouteCreate
|
||||
) -> Any:
|
||||
"""
|
||||
Create new route.
|
||||
"""
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.ROUTE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.CREATE,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
||||
)
|
||||
|
||||
route = Route.create(create_obj=route_in, session=session)
|
||||
return route
|
||||
|
||||
|
||||
@router.put("/{id}", response_model=RoutePublic)
|
||||
def update_route(
|
||||
*, session: SessionDep, current_user: CurrentUser, id: RowId, route_in: RouteUpdate
|
||||
) -> Any:
|
||||
"""
|
||||
Update a route.
|
||||
"""
|
||||
route = session.get(Route, id)
|
||||
if not route:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
|
||||
)
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.ROUTE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.UPDATE,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
||||
)
|
||||
|
||||
route = Route.update(db_obj=route, in_obj=route_in, session=session)
|
||||
return route
|
||||
|
||||
|
||||
@router.delete("/{id}")
|
||||
def delete_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Message:
|
||||
"""
|
||||
Delete a route.
|
||||
"""
|
||||
route = session.get(Route, id)
|
||||
if not route:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
|
||||
)
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.ROUTE,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.DELETE,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
||||
)
|
||||
|
||||
session.delete(route)
|
||||
session.commit()
|
||||
return Message(message="Route deleted successfully")
|
||||
|
||||
|
||||
# endregion
|
||||
@@ -25,11 +25,23 @@ from app.models.user import (
|
||||
UserCreate,
|
||||
)
|
||||
from app.models.member import (
|
||||
Member, MemberCreate,
|
||||
Member,
|
||||
MemberCreate,
|
||||
)
|
||||
from app.models.apikey import (
|
||||
ApiKey,
|
||||
)
|
||||
from app.models.hike import (
|
||||
Hike,
|
||||
HikeTeamLink,
|
||||
)
|
||||
from app.models.route import (
|
||||
Route,
|
||||
RoutePart,
|
||||
)
|
||||
from app.models.place import (
|
||||
Place,
|
||||
)
|
||||
|
||||
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ from enum import Enum, IntFlag # Python 3.11 >= StrEnum
|
||||
from enum import auto as auto_enum
|
||||
from uuid import UUID as RowId
|
||||
|
||||
from sqlalchemy import TypeDecorator, Integer, VARCHAR
|
||||
from sqlmodel import SQLModel
|
||||
from sqlalchemy.orm import declared_attr
|
||||
|
||||
@@ -39,8 +40,85 @@ class DocumentedStrEnum(str, Enum):
|
||||
|
||||
|
||||
class DocumentedIntFlag(IntFlag):
|
||||
# TODO: Build DB sport to proper store flags and make it possible to store all mutations
|
||||
pass
|
||||
@property
|
||||
def names(self) -> str:
|
||||
"""
|
||||
Returns a comma-separated string of all active flag names.
|
||||
"""
|
||||
# Exclude 0-value flags
|
||||
return ",".join([flag.name for flag in type(self) if flag in self and flag.value != 0])
|
||||
|
||||
def __str__(self) -> str:
|
||||
# Default string conversion uses the names
|
||||
return self.names
|
||||
|
||||
# Optional: for Pydantic compatibility
|
||||
def __get_pydantic_json__(self) -> str:
|
||||
return self.names
|
||||
|
||||
|
||||
|
||||
class DocumentedIntFlagType(TypeDecorator):
|
||||
impl = Integer
|
||||
cache_ok = True
|
||||
|
||||
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.enum_class = enum_class
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
"""
|
||||
Convert IntFlag to integer before storing
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, self.enum_class):
|
||||
return int(value)
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
"""
|
||||
Convert integer from DB back to IntFlag
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
return self.enum_class(value)
|
||||
|
||||
|
||||
class DocumentedStrFlagType(TypeDecorator):
|
||||
impl = VARCHAR
|
||||
cache_ok = True
|
||||
|
||||
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.enum_class = enum_class
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
"""
|
||||
Convert IntFlag to comma-separated string of names for storing in DB.
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, self.enum_class):
|
||||
return str(value)
|
||||
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
"""
|
||||
Convert comma-separated string of names from DB back to IntFlag.
|
||||
"""
|
||||
if value is None or value == "":
|
||||
return self.enum_class(0)
|
||||
names = value.split(",")
|
||||
result = self.enum_class(0)
|
||||
for name in names:
|
||||
try:
|
||||
result |= self.enum_class[name]
|
||||
except KeyError:
|
||||
raise ValueError(f"Invalid flag name '{name}' for {self.enum_class.__name__}")
|
||||
return result
|
||||
|
||||
|
||||
# #############################################################################
|
||||
@@ -60,6 +138,10 @@ class ApiTags(DocumentedStrEnum):
|
||||
DIVISIONS = "Divisions"
|
||||
MEMBERS = "Members"
|
||||
|
||||
HIKES = "Hikes"
|
||||
ROUTES = "Routes"
|
||||
PLACES = "Places"
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
|
||||
276
backend/app/models/hike.py
Normal file
276
backend/app/models/hike.py
Normal file
@@ -0,0 +1,276 @@
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlmodel import (
|
||||
Session,
|
||||
Relationship,
|
||||
Field,
|
||||
)
|
||||
|
||||
from . import mixin
|
||||
from .base import (
|
||||
BaseSQLModel,
|
||||
DocumentedStrEnum,
|
||||
DocumentedIntFlag,
|
||||
DocumentedStrFlagType,
|
||||
auto_enum,
|
||||
RowId,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .route import Route
|
||||
from .team import Team
|
||||
|
||||
# region # Hike ################################################################
|
||||
|
||||
|
||||
class HikeTeamPage(DocumentedIntFlag):
|
||||
ROUTE = auto_enum() # Route steps info
|
||||
QUESTIONS = auto_enum() # Visible questions
|
||||
|
||||
VISITED_PLACES = auto_enum() # Places the team has been
|
||||
CURRENT_PLACE = auto_enum() # Place where the team currently is
|
||||
UNVISITED_PLACES = auto_enum() # Places the team still neat to visit
|
||||
|
||||
PLACES = VISITED_PLACES | CURRENT_PLACE | UNVISITED_PLACES # All the places
|
||||
|
||||
TRAVEL_TIME = auto_enum() # Total travel time
|
||||
TRAVEL_FREE_TIME = auto_enum() # Total travel time
|
||||
PLACE_TIME = auto_enum() # Total place time
|
||||
CALCULATE_TIME = auto_enum() # Total time calculated based on hike settings
|
||||
# TODO: Think about time between oter teams
|
||||
|
||||
PLACE_POINTS = auto_enum() # Points scored on a place
|
||||
TOTAL_PLACE_POINTS = auto_enum() # All points got on all places
|
||||
ASSIST_POINTS = auto_enum() # Minus points for assistants
|
||||
# TODO: Think about place in classement
|
||||
|
||||
ASSIST_LOG = auto_enum() # Assisted items
|
||||
ASSIST_LATTER = auto_enum() # ???
|
||||
|
||||
|
||||
# ##############################################################################
|
||||
|
||||
|
||||
class HikeTimeCalculation(DocumentedIntFlag):
|
||||
TRAVEL_TIME = auto_enum() # Time traveling
|
||||
|
||||
# TODO: Think about time groups (in this model we have 2, free and non free)
|
||||
TRAVEL_FREE_TIME = auto_enum() # Time that is excluded from traveling but not at a place
|
||||
PLACE_TIME = auto_enum() # Time checked in at a place
|
||||
|
||||
TOTAL_TIME = TRAVEL_TIME | TRAVEL_FREE_TIME | PLACE_TIME
|
||||
|
||||
|
||||
# ##############################################################################
|
||||
|
||||
|
||||
class HikeVisitLogType(DocumentedStrEnum):
|
||||
FIRST_VISIT = auto_enum()
|
||||
LAST_VISIT = auto_enum()
|
||||
|
||||
LONGEST_VISIT = auto_enum()
|
||||
SHORTEST_VISIT = auto_enum()
|
||||
|
||||
EACH_VISIT = auto_enum()
|
||||
|
||||
DISABLE_VISIT_LOGING = auto_enum()
|
||||
|
||||
|
||||
# ##############################################################################
|
||||
|
||||
|
||||
class HikeBase(
|
||||
mixin.Name,
|
||||
mixin.Contact,
|
||||
BaseSQLModel,
|
||||
):
|
||||
tracker_interval: int | None = Field(
|
||||
default=None,
|
||||
nullable=True,
|
||||
description="Is GPS button available, value will be the interval",
|
||||
)
|
||||
|
||||
is_multi_day: bool = Field(
|
||||
default=False,
|
||||
nullable=False,
|
||||
description="Show datetime in stead of time only",
|
||||
)
|
||||
|
||||
team_page: HikeTeamPage = Field(
|
||||
default=HikeTeamPage.PLACES | HikeTeamPage.ROUTE | HikeTeamPage.QUESTIONS,
|
||||
nullable=False,
|
||||
description="Show all the places of the route inside the teams page",
|
||||
sa_type=DocumentedStrFlagType(HikeTeamPage),
|
||||
)
|
||||
|
||||
time_calculation: HikeTimeCalculation = Field(
|
||||
default=HikeTimeCalculation.TRAVEL_TIME,
|
||||
nullable=False,
|
||||
description="Wath should we calculate inside the total time",
|
||||
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
|
||||
)
|
||||
|
||||
visit_log_type: HikeVisitLogType = Field(
|
||||
default=HikeVisitLogType.FIRST_VISIT,
|
||||
nullable=False,
|
||||
)
|
||||
|
||||
min_time_points: int | None = Field(
|
||||
default=0,
|
||||
ge=0,
|
||||
# TODO: le: max_time_points
|
||||
description="Min points for time",
|
||||
)
|
||||
|
||||
max_time_points: int | None = Field(
|
||||
default=100,
|
||||
ge=0, # TODO: ge > min_time_points
|
||||
description="Max points for time",
|
||||
)
|
||||
|
||||
max_question_points: int | None = Field(
|
||||
default=None,
|
||||
description="None: Calculate from answers (no max), Positive: Set as max for dynamic range",
|
||||
)
|
||||
|
||||
|
||||
# Properties to receive via API on creation
|
||||
class HikeCreate(HikeBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties to receive via API on update, all are optional
|
||||
class HikeUpdate(HikeBase):
|
||||
is_multi_day: bool | None = Field(default=None) # type: ignore
|
||||
team_page: HikeTeamPage | None = Field(default=None) # type: ignore
|
||||
time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore
|
||||
visit_log_type: HikeVisitLogType | None = Field(default=None) # type: ignore
|
||||
|
||||
|
||||
class Hike(mixin.RowId, HikeBase, table=True):
|
||||
# --- database only items --------------------------------------------------
|
||||
|
||||
# --- read only items ------------------------------------------------------
|
||||
|
||||
# --- back_populates links -------------------------------------------------
|
||||
routes: list["Route"] = Relationship(back_populates="hike", cascade_delete=True)
|
||||
teams: list["HikeTeamLink"] = Relationship(back_populates="hike", cascade_delete=True)
|
||||
|
||||
# --- CRUD actions ---------------------------------------------------------
|
||||
@classmethod
|
||||
def create(cls, *, session: Session, create_obj: HikeCreate) -> "Hike":
|
||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
||||
|
||||
db_obj = cls.model_validate(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
@classmethod
|
||||
def update(
|
||||
cls, *, session: Session, db_obj: "Hike", in_obj: HikeUpdate
|
||||
) -> "Hike":
|
||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
||||
db_obj.sqlmodel_update(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
|
||||
# Properties to return via API, id is always required
|
||||
class HikePublic(mixin.RowIdPublic, HikeBase):
|
||||
pass
|
||||
|
||||
|
||||
class HikesPublic(BaseSQLModel):
|
||||
data: list[HikePublic]
|
||||
count: int
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
# region # Hike / Team #########################################################
|
||||
|
||||
class HikeTeamLinkBase(
|
||||
BaseSQLModel,
|
||||
):
|
||||
hike_id: RowId = Field(
|
||||
nullable=False,
|
||||
foreign_key="hike.id",
|
||||
)
|
||||
|
||||
team_id: RowId = Field(
|
||||
nullable=False,
|
||||
foreign_key="team.id",
|
||||
)
|
||||
|
||||
route_id: RowId = Field(
|
||||
nullable=False,
|
||||
foreign_key="route.id",
|
||||
)
|
||||
|
||||
start_place_id: RowId | None = Field(
|
||||
default=None,
|
||||
nullable=True,
|
||||
foreign_key="place.id",
|
||||
)
|
||||
|
||||
|
||||
# Properties to receive via API on creation
|
||||
class HikeTeamLinkCreate(HikeTeamLinkBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties to receive via API on update, all are optional
|
||||
class HikeTeamLinkUpdate(HikeTeamLinkBase):
|
||||
hike_id: RowId | None = Field(default=None, nullable=True) # type: ignore
|
||||
team_id: RowId | None = Field(default=None, nullable=True) # type: ignore
|
||||
route_id: RowId | None = Field(default=None, nullable=True) # type: ignore
|
||||
|
||||
|
||||
class HikeTeamLink(mixin.RowId, HikeTeamLinkBase, table=True):
|
||||
# --- database only items --------------------------------------------------
|
||||
|
||||
# --- read only items ------------------------------------------------------
|
||||
|
||||
# --- back_populates links -------------------------------------------------
|
||||
team: "Team" = Relationship(back_populates="hike_links")
|
||||
hike: "Hike" = Relationship(back_populates="teams")
|
||||
route: "Route" = Relationship(back_populates="teams")
|
||||
# start_place: "Place" = Relationship(back_populates="teams")
|
||||
|
||||
# --- CRUD actions ---------------------------------------------------------
|
||||
@classmethod
|
||||
def create(cls, *, session: Session, create_obj: HikeTeamLinkCreate) -> "HikeTeamLink":
|
||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
||||
|
||||
db_obj = cls.model_validate(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
@classmethod
|
||||
def update(
|
||||
cls, *, session: Session, db_obj: "HikeTeamLink", in_obj: HikeTeamLinkUpdate
|
||||
) -> "HikeTeamLink":
|
||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
||||
db_obj.sqlmodel_update(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
|
||||
# Properties to return via API, id is always required
|
||||
class HikeTeamLinkPublic(mixin.RowIdPublic, HikeTeamLinkBase):
|
||||
pass
|
||||
|
||||
|
||||
class HikeTeamLinksPublic(BaseSQLModel):
|
||||
data: list[HikeTeamLinkPublic]
|
||||
count: int
|
||||
|
||||
# endregion
|
||||
@@ -1,5 +1,6 @@
|
||||
import uuid
|
||||
from datetime import datetime, date
|
||||
from decimal import Decimal
|
||||
|
||||
from pydantic import BaseModel, EmailStr
|
||||
from sqlmodel import (
|
||||
@@ -14,6 +15,10 @@ class Name(BaseModel):
|
||||
name: str | None = Field(default=None, nullable=False, unique=False, max_length=255)
|
||||
|
||||
|
||||
class NameOveride(BaseModel):
|
||||
name_overide: str | None = Field(default=None, nullable=True, unique=False, max_length=255)
|
||||
|
||||
|
||||
class FullName(BaseModel):
|
||||
full_name: str | None = Field(default=None, nullable=True, max_length=255)
|
||||
|
||||
@@ -31,7 +36,12 @@ class ShortName(BaseModel):
|
||||
|
||||
|
||||
class ShortNameUpdate(ShortName):
|
||||
short_name: str | None = Field(default=None, max_length=8)
|
||||
#TODO: Waarom is deze verplicht ???
|
||||
short_name: str | None = Field(default=None, nullable=True, max_length=8)
|
||||
|
||||
|
||||
class ShortNameOveride(ShortName):
|
||||
short_name_overide: str | None = Field(default=None, nullable=True, max_length=8)
|
||||
|
||||
|
||||
class Contact(BaseModel):
|
||||
@@ -120,3 +130,17 @@ class Birthday(BaseModel):
|
||||
class Created(BaseModel):
|
||||
created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info))
|
||||
created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL")
|
||||
|
||||
|
||||
class Location(BaseModel):
|
||||
latitude: Decimal | None = Field(default=None, nullable=True, max_digits=11, decimal_places=8, description="decimal degrees")
|
||||
longitude: Decimal | None = Field(default=None, nullable=True, max_digits=11, decimal_places=8, description="decimal degrees")
|
||||
|
||||
|
||||
class LocationWithRadius(Location):
|
||||
radius: int | None = Field(default=None, nullable=True, description="Radius in meters")
|
||||
|
||||
|
||||
class QuestionInfo:
|
||||
question_file: str | None = Field(default=None, nullable=True, max_length=255)
|
||||
answer_file: str | None = Field(default=None, nullable=True, max_length=255)
|
||||
|
||||
179
backend/app/models/place.py
Normal file
179
backend/app/models/place.py
Normal file
@@ -0,0 +1,179 @@
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import Interval
|
||||
from sqlmodel import (
|
||||
Session,
|
||||
Relationship,
|
||||
Field,
|
||||
)
|
||||
|
||||
from . import mixin
|
||||
from .base import (
|
||||
BaseSQLModel,
|
||||
DocumentedStrEnum,
|
||||
auto_enum,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .route import Route, RoutePart
|
||||
|
||||
# region # Place ###############################################################
|
||||
|
||||
|
||||
class PlaceType(DocumentedStrEnum):
|
||||
START = auto_enum() # Only check in (make GPS available) (TODO: determine based on route)
|
||||
PLACE = auto_enum() # Check in / Check out (subtract time between in&out)
|
||||
TAG = auto_enum() # Instant in&out at the same time
|
||||
FINISH = auto_enum() # Only check out (and disable GPS) (TODO: determine based on route)
|
||||
|
||||
|
||||
# ##############################################################################
|
||||
|
||||
|
||||
class VisitedCountType(DocumentedStrEnum):
|
||||
NONE = auto_enum()
|
||||
ONE_VISIT = auto_enum()
|
||||
EACH_VISIT = auto_enum()
|
||||
|
||||
|
||||
# ##############################################################################
|
||||
|
||||
|
||||
class PlaceBase(
|
||||
mixin.Name,
|
||||
mixin.ShortName,
|
||||
mixin.Contact,
|
||||
mixin.Description,
|
||||
mixin.LocationWithRadius,
|
||||
mixin.QuestionInfo,
|
||||
BaseSQLModel,
|
||||
):
|
||||
place_type: PlaceType = Field(
|
||||
default=PlaceType.PLACE,
|
||||
nullable=False,
|
||||
)
|
||||
|
||||
max_points: int | None = Field(
|
||||
default=None,
|
||||
ge=0,
|
||||
description="Max points for this place, None will disable scoring at this place",
|
||||
)
|
||||
|
||||
visited_points: int | None = Field(
|
||||
default=None,
|
||||
ge=0,
|
||||
description="Visited points for this place, None will disable this function",
|
||||
)
|
||||
visited_count_type: VisitedCountType = Field(
|
||||
default=VisitedCountType.NONE,
|
||||
)
|
||||
|
||||
skipped_penalty_points: int | None = Field(
|
||||
default=None,
|
||||
ge=0,
|
||||
description="Skipped penalty for this place, amount will be subtracted, None will disable this function",
|
||||
)
|
||||
|
||||
place_time: timedelta | None = Field(
|
||||
default=None,
|
||||
nullable=True,
|
||||
description="Time for question at this place during testing.",
|
||||
sa_type=Interval,
|
||||
)
|
||||
|
||||
|
||||
|
||||
# Properties to receive via API on creation
|
||||
class PlaceCreate(PlaceBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties to receive via API on update, all are optional
|
||||
class PlaceUpdate(
|
||||
PlaceBase,
|
||||
mixin.ShortNameUpdate,
|
||||
):
|
||||
place_type: PlaceType | None = Field(default=None) # type: ignore
|
||||
visited_count_type: VisitedCountType | None = Field(default=None) # type: ignore
|
||||
|
||||
|
||||
class Place(mixin.RowId, PlaceBase, table=True):
|
||||
# --- database only items --------------------------------------------------
|
||||
|
||||
# --- read only items ------------------------------------------------------
|
||||
|
||||
# --- back_populates links -------------------------------------------------
|
||||
routes: list["Route"] = Relationship(
|
||||
sa_relationship_kwargs={
|
||||
"primaryjoin": "or_("
|
||||
"Place.id==Route.start_id,"
|
||||
"Place.id==Route.finish_id,"
|
||||
"Place.id==RoutePart.place_id,"
|
||||
"Place.id==RoutePart.to_place_id,"
|
||||
")",
|
||||
"foreign_keys": "["
|
||||
"Route.start_id,"
|
||||
"Route.finish_id,"
|
||||
"RoutePart.place_id,"
|
||||
"RoutePart.to_place_id"
|
||||
"]",
|
||||
"viewonly": True,
|
||||
},
|
||||
)
|
||||
|
||||
next_places: list["Place"] = Relationship(
|
||||
back_populates="previous_place",
|
||||
sa_relationship_kwargs={
|
||||
"secondary": "route_part",
|
||||
"primaryjoin": "Place.id == RoutePart.place_id",
|
||||
"secondaryjoin": "Place.id == RoutePart.to_place_id",
|
||||
"viewonly": True,
|
||||
},
|
||||
)
|
||||
previous_place: list["Place"] = Relationship(
|
||||
back_populates="next_places",
|
||||
sa_relationship_kwargs={
|
||||
"secondary": "route_part",
|
||||
"primaryjoin": "Place.id == RoutePart.to_place_id",
|
||||
"secondaryjoin": "Place.id == RoutePart.place_id",
|
||||
"viewonly": True,
|
||||
},
|
||||
)
|
||||
|
||||
route_parts: list["RoutePart"] = Relationship(back_populates="place", sa_relationship_kwargs={"foreign_keys": "[RoutePart.place_id]"})
|
||||
|
||||
# --- CRUD actions ---------------------------------------------------------
|
||||
@classmethod
|
||||
def create(cls, *, session: Session, create_obj: PlaceCreate) -> "Place":
|
||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
||||
|
||||
db_obj = cls.model_validate(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
@classmethod
|
||||
def update(
|
||||
cls, *, session: Session, db_obj: "Place", in_obj: PlaceUpdate
|
||||
) -> "Place":
|
||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
||||
db_obj.sqlmodel_update(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
|
||||
# Properties to return via API, id is always required
|
||||
class PlacePublic(mixin.RowIdPublic, PlaceBase):
|
||||
pass
|
||||
|
||||
|
||||
class PlacesPublic(BaseSQLModel):
|
||||
data: list[PlacePublic]
|
||||
count: int
|
||||
|
||||
|
||||
# endregion
|
||||
283
backend/app/models/route.py
Normal file
283
backend/app/models/route.py
Normal file
@@ -0,0 +1,283 @@
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy import Interval
|
||||
from sqlmodel import (
|
||||
Session,
|
||||
Relationship,
|
||||
Field,
|
||||
)
|
||||
|
||||
from . import mixin
|
||||
from .base import (
|
||||
BaseSQLModel,
|
||||
DocumentedStrEnum,
|
||||
DocumentedStrFlagType,
|
||||
auto_enum,
|
||||
RowId,
|
||||
)
|
||||
|
||||
from .hike import (
|
||||
Hike,
|
||||
HikeTimeCalculation,
|
||||
HikeTeamLink,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .place import Place
|
||||
|
||||
# region # Route ###############################################################
|
||||
|
||||
|
||||
class RouteType(DocumentedStrEnum):
|
||||
START_FINISH = auto_enum() # Start at the start and end at the finish
|
||||
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
|
||||
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
|
||||
|
||||
|
||||
# ##############################################################################
|
||||
|
||||
|
||||
class RouteBase(
|
||||
mixin.Name,
|
||||
mixin.Contact,
|
||||
BaseSQLModel,
|
||||
):
|
||||
route_type: RouteType = Field(
|
||||
default=RouteType.START_FINISH,
|
||||
nullable=False,
|
||||
)
|
||||
|
||||
time_calculation_override: HikeTimeCalculation | None = Field(
|
||||
default=None,
|
||||
nullable=True,
|
||||
description="Should this route be calculated different then the main hike",
|
||||
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
|
||||
)
|
||||
|
||||
min_time: timedelta | None = Field(
|
||||
default=None,
|
||||
description="Min time correction, None = min of all, positive is used as 0:00, negative is subtracted from min of all time",
|
||||
sa_type=Interval,
|
||||
)
|
||||
|
||||
max_time: timedelta | None = Field(
|
||||
default=None,
|
||||
description="Max time correction, None = max of all, positive is used as max, negative is subtracted from max of all time",
|
||||
sa_type=Interval,
|
||||
)
|
||||
|
||||
hike_id: RowId = Field(
|
||||
nullable=False,
|
||||
foreign_key="hike.id",
|
||||
)
|
||||
|
||||
start_id: RowId | None = Field(
|
||||
default=None,
|
||||
nullable=True,
|
||||
foreign_key="place.id",
|
||||
ondelete="SET NULL",
|
||||
description="Start place.id of the route, None for random in CIRCULAR or CIRCULAR_BACK_TO_START",
|
||||
)
|
||||
|
||||
finish_id: RowId | None = Field(
|
||||
default=None,
|
||||
nullable=True,
|
||||
foreign_key="place.id",
|
||||
ondelete="SET NULL",
|
||||
description="Finish place.id of the route, None for random or decremented",
|
||||
)
|
||||
|
||||
|
||||
# Properties to receive via API on creation
|
||||
class RouteCreate(RouteBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties to receive via API on update, all are optional
|
||||
class RouteUpdate(RouteBase):
|
||||
route_type: RouteType | None = Field(default=None) # type: ignore
|
||||
hike_id: RowId | None = Field(default=None) # type: ignore
|
||||
|
||||
|
||||
class Route(mixin.RowId, RouteBase, table=True):
|
||||
# --- database only items --------------------------------------------------
|
||||
|
||||
# --- read only items ------------------------------------------------------
|
||||
|
||||
# --- back_populates links -------------------------------------------------
|
||||
hike: "Hike" = Relationship(back_populates="routes")
|
||||
teams: list["HikeTeamLink"] = Relationship(back_populates="route")
|
||||
|
||||
start: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[Route.start_id]"})
|
||||
finish: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[Route.finish_id]"})
|
||||
|
||||
places: list["Place"] = Relationship(
|
||||
sa_relationship_kwargs={
|
||||
"primaryjoin": "or_("
|
||||
"Place.id==Route.start_id,"
|
||||
"Place.id==Route.finish_id,"
|
||||
"Place.id==RoutePart.place_id,"
|
||||
"Place.id==RoutePart.to_place_id,"
|
||||
")",
|
||||
"foreign_keys": "["
|
||||
"Route.start_id,"
|
||||
"Route.finish_id,"
|
||||
"RoutePart.place_id,"
|
||||
"RoutePart.to_place_id"
|
||||
"]",
|
||||
"viewonly": True,
|
||||
},
|
||||
)
|
||||
|
||||
route_parts: list["RoutePart"] = Relationship(back_populates="route")
|
||||
|
||||
# --- CRUD actions ---------------------------------------------------------
|
||||
@classmethod
|
||||
def create(cls, *, session: Session, create_obj: RouteCreate) -> "Route":
|
||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
||||
|
||||
db_obj = cls.model_validate(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
@classmethod
|
||||
def update(
|
||||
cls, *, session: Session, db_obj: "Route", in_obj: RouteUpdate
|
||||
) -> "Route":
|
||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
||||
db_obj.sqlmodel_update(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
|
||||
# Properties to return via API, id is always required
|
||||
class RoutePublic(mixin.RowIdPublic, RouteBase):
|
||||
pass
|
||||
|
||||
|
||||
class RoutesPublic(BaseSQLModel):
|
||||
data: list[RoutePublic]
|
||||
count: int
|
||||
|
||||
|
||||
# endregion
|
||||
|
||||
# region # Route / Place link ##################################################
|
||||
|
||||
|
||||
class RoutePartBase(
|
||||
mixin.NameOveride,
|
||||
mixin.ShortNameOveride,
|
||||
BaseSQLModel,
|
||||
):
|
||||
route_id: RowId = Field(
|
||||
nullable=False,
|
||||
foreign_key="route.id",
|
||||
)
|
||||
|
||||
place_id: RowId = Field(
|
||||
nullable=False,
|
||||
foreign_key="place.id",
|
||||
)
|
||||
|
||||
to_place_id: RowId = Field(
|
||||
nullable=False,
|
||||
foreign_key="place.id",
|
||||
)
|
||||
|
||||
travel_time: timedelta | None = Field(
|
||||
default=None,
|
||||
nullable=True,
|
||||
description="Time to travel during test walk, only used for information",
|
||||
sa_type=Interval,
|
||||
)
|
||||
travel_distance: int | None = Field(
|
||||
default=None,
|
||||
nullable=True,
|
||||
ge=0,
|
||||
description="Distance in meters to travel, only used for information",
|
||||
)
|
||||
|
||||
# TODO: Convert to time groups
|
||||
free_walk_time: bool | None = Field(
|
||||
default=False,
|
||||
description="If true, travel time is not add to calculated for travel time",
|
||||
)
|
||||
|
||||
|
||||
# Properties to receive via API on creation
|
||||
class RoutePartBaseCreate(RoutePartBase):
|
||||
pass
|
||||
|
||||
|
||||
# Properties to receive via API on update, all are optional
|
||||
class RoutePartBaseUpdate(RoutePartBase):
|
||||
route_id: RowId | None = Field(default=None) # type: ignore
|
||||
place_id: RowId | None = Field(default=None) # type: ignore
|
||||
to_place_id: RowId | None = Field(default=None) # type: ignore
|
||||
free_walk_time: bool | None = Field(default=None) # type: ignore
|
||||
|
||||
|
||||
class RoutePart(mixin.RowId, RoutePartBase, table=True):
|
||||
# --- database only items --------------------------------------------------
|
||||
|
||||
# --- read only items ------------------------------------------------------
|
||||
|
||||
# --- back_populates links -------------------------------------------------
|
||||
route: "Route" = Relationship(back_populates="route_parts")
|
||||
place: "Place" = Relationship(back_populates="route_parts", sa_relationship_kwargs={"foreign_keys": "[RoutePart.place_id]"})
|
||||
place_to: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[RoutePart.to_place_id]"})
|
||||
|
||||
places: list["Place"] = Relationship(
|
||||
sa_relationship_kwargs={
|
||||
"primaryjoin": "or_("
|
||||
"Place.id==RoutePart.place_id,"
|
||||
"Place.id==RoutePart.to_place_id,"
|
||||
")",
|
||||
"foreign_keys": "["
|
||||
"RoutePart.place_id,"
|
||||
"RoutePart.to_place_id"
|
||||
"]",
|
||||
"viewonly": True,
|
||||
},
|
||||
)
|
||||
|
||||
# --- CRUD actions ---------------------------------------------------------
|
||||
@classmethod
|
||||
def create(cls, *, session: Session, create_obj: RouteCreate) -> "RoutePart":
|
||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
||||
|
||||
db_obj = cls.model_validate(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
@classmethod
|
||||
def update(
|
||||
cls, *, session: Session, db_obj: "RoutePart", in_obj: RouteUpdate
|
||||
) -> "RoutePart":
|
||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
||||
db_obj.sqlmodel_update(data_obj)
|
||||
session.add(db_obj)
|
||||
session.commit()
|
||||
session.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
|
||||
# Properties to return via API, id is always required
|
||||
class RoutePartPublic(mixin.RowIdPublic, RoutePartBase):
|
||||
pass
|
||||
|
||||
|
||||
class RoutePartsPublic(BaseSQLModel):
|
||||
data: list[RoutePartPublic]
|
||||
count: int
|
||||
|
||||
|
||||
# endregion
|
||||
@@ -16,6 +16,7 @@ if TYPE_CHECKING:
|
||||
from .event import Event
|
||||
from .division import DivisionTeamLink
|
||||
from .member import MemberTeamLink
|
||||
from .hike import HikeTeamLink
|
||||
|
||||
# region # Team ################################################################
|
||||
|
||||
@@ -51,6 +52,7 @@ class Team(mixin.RowId, TeamBase, table=True):
|
||||
event: "Event" = Relationship(back_populates="teams")
|
||||
division_link: "DivisionTeamLink" = Relationship(back_populates="team", cascade_delete=True)
|
||||
member_links: list["MemberTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
|
||||
hike_links: list["HikeTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
|
||||
|
||||
# --- CRUD actions ---------------------------------------------------------
|
||||
@classmethod
|
||||
|
||||
@@ -32,6 +32,10 @@ class PermissionModule(DocumentedStrEnum):
|
||||
DIVISION = auto_enum()
|
||||
MEMBER = auto_enum()
|
||||
|
||||
HIKE = auto_enum()
|
||||
ROUTE = auto_enum()
|
||||
PLACE = auto_enum()
|
||||
|
||||
|
||||
class PermissionPart(DocumentedStrEnum):
|
||||
ADMIN = auto_enum()
|
||||
@@ -49,6 +53,8 @@ class PermissionRight(DocumentedIntFlag):
|
||||
MANAGE_DIVISIONS = auto_enum()
|
||||
MANAGE_MEMBERS = auto_enum()
|
||||
|
||||
MANAGE_HIKES = auto_enum()
|
||||
|
||||
ADMIN = ( CREATE
|
||||
| READ
|
||||
| UPDATE
|
||||
@@ -57,6 +63,7 @@ class PermissionRight(DocumentedIntFlag):
|
||||
| MANAGE_TEAMS
|
||||
| MANAGE_DIVISIONS
|
||||
| MANAGE_MEMBERS
|
||||
| MANAGE_HIKES
|
||||
)
|
||||
|
||||
|
||||
|
||||
223
backend/app/tests/api/routes/test_hike.py
Normal file
223
backend/app/tests/api/routes/test_hike.py
Normal file
@@ -0,0 +1,223 @@
|
||||
import uuid
|
||||
|
||||
from fastapi import status
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel import Session
|
||||
|
||||
from app.core.config import settings
|
||||
from app.tests.utils.hike import create_random_hike
|
||||
from app.tests.utils.route import create_random_route
|
||||
|
||||
|
||||
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
data = {
|
||||
"name": "RSW Maasdelta 2026",
|
||||
"contact": "Sebas",
|
||||
}
|
||||
response = client.post(
|
||||
f"{settings.API_V1_STR}/hikes/",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["name"] == data["name"]
|
||||
assert content["contact"] == data["contact"]
|
||||
assert "id" in content
|
||||
|
||||
|
||||
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
data = {
|
||||
"name": "No permissions",
|
||||
"contact": "No permissions",
|
||||
}
|
||||
response = client.post(
|
||||
f"{settings.API_V1_STR}/hikes/",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["id"] == str(hike.id)
|
||||
assert content["name"] == hike.name
|
||||
assert content["contact"] == hike.contact
|
||||
|
||||
|
||||
def test_read_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Hike not found"
|
||||
|
||||
|
||||
def test_read_hike_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
create_random_hike(db)
|
||||
create_random_hike(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert "count" in content
|
||||
assert content["count"] >= 2
|
||||
assert "data" in content
|
||||
assert isinstance(content["data"], list)
|
||||
assert len(content["data"]) <= content["count"]
|
||||
|
||||
|
||||
def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
create_random_hike(db)
|
||||
create_random_hike(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert "count" in content
|
||||
assert content["count"] == 0
|
||||
assert "data" in content
|
||||
assert isinstance(content["data"], list)
|
||||
assert len(content["data"]) == 0
|
||||
|
||||
|
||||
def test_update_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
data = {
|
||||
"name": "Updated name",
|
||||
"contact": "Updated contact",
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["id"] == str(hike.id)
|
||||
assert content["name"] == data["name"]
|
||||
assert content["contact"] == data["contact"]
|
||||
|
||||
|
||||
def test_update_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||
data = {
|
||||
"name": "Not found",
|
||||
"contact": "Not found",
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Hike not found"
|
||||
|
||||
|
||||
def test_update_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
data = {
|
||||
"name": "No permissions",
|
||||
"contact": "No permissions",
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_delete_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.json()["message"] == "Hike deleted successfully"
|
||||
|
||||
|
||||
def test_delete_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Hike not found"
|
||||
|
||||
|
||||
def test_delete_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_hike_routes(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
hike = create_random_hike(db)
|
||||
create_random_route(db, hike=hike)
|
||||
create_random_route(db, hike=hike)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert "count" in content
|
||||
assert content["count"] == 2
|
||||
assert "data" in content
|
||||
assert isinstance(content["data"], list)
|
||||
assert len(content["data"]) <= content["count"]
|
||||
|
||||
|
||||
def test_read_hike_routes_not_found(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}/routes",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Hike not found"
|
||||
|
||||
|
||||
def test_read_hike_routes_no_permissions(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
hike = create_random_hike(db)
|
||||
create_random_route(db, hike=hike)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
217
backend/app/tests/api/routes/test_place.py
Normal file
217
backend/app/tests/api/routes/test_place.py
Normal file
@@ -0,0 +1,217 @@
|
||||
import uuid
|
||||
|
||||
from fastapi import status
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel import Session
|
||||
|
||||
from app.core.config import settings
|
||||
from app.tests.utils.place import create_random_place
|
||||
|
||||
from app.models.place import PlaceType, VisitedCountType
|
||||
|
||||
|
||||
def test_create_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
data = {
|
||||
"place_type": PlaceType.PLACE,
|
||||
|
||||
"name": "Post 1",
|
||||
"short_name": "1",
|
||||
|
||||
"contact": "Sebas",
|
||||
"description": "Post met renspel",
|
||||
|
||||
"question_file": None,
|
||||
"answer_file": None,
|
||||
"place_time": "PT10M",
|
||||
|
||||
"latitude": None,
|
||||
"longitude": None,
|
||||
"radius": None,
|
||||
|
||||
"max_points": None,
|
||||
"visited_points": 10,
|
||||
"visited_count_type": VisitedCountType.ONE_VISIT,
|
||||
"skipped_penalty_points": 50,
|
||||
}
|
||||
response = client.post(
|
||||
f"{settings.API_V1_STR}/places/",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["name"] == data["name"]
|
||||
assert content["place_type"] == data["place_type"]
|
||||
assert content["name"] == data["name"]
|
||||
assert content["short_name"] == data["short_name"]
|
||||
assert content["contact"] == data["contact"]
|
||||
assert content["description"] == data["description"]
|
||||
assert content["question_file"] == data["question_file"]
|
||||
assert content["answer_file"] == data["answer_file"]
|
||||
assert content["latitude"] == data["latitude"]
|
||||
assert content["longitude"] == data["longitude"]
|
||||
assert content["radius"] == data["radius"]
|
||||
assert content["max_points"] == data["max_points"]
|
||||
assert content["visited_points"] == data["visited_points"]
|
||||
assert content["visited_count_type"] == data["visited_count_type"]
|
||||
assert content["skipped_penalty_points"] == data["skipped_penalty_points"]
|
||||
assert content["place_time"] == data["place_time"]
|
||||
assert "id" in content
|
||||
|
||||
|
||||
def test_create_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
data = {
|
||||
"place_type": PlaceType.PLACE,
|
||||
"name": "No permissions",
|
||||
"short_name": "No perm",
|
||||
"visited_count_type": VisitedCountType.ONE_VISIT,
|
||||
}
|
||||
response = client.post(
|
||||
f"{settings.API_V1_STR}/places/",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_place(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
place = create_random_place(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/places/{place.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["id"] == str(place.id)
|
||||
assert content["name"] == place.name
|
||||
assert content["contact"] == place.contact
|
||||
|
||||
|
||||
def test_read_place_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Place not found"
|
||||
|
||||
|
||||
def test_read_place_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
place = create_random_place(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/places/{place.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_places(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
create_random_place(db)
|
||||
create_random_place(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/places/",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert "count" in content
|
||||
assert content["count"] >= 2
|
||||
assert "data" in content
|
||||
assert isinstance(content["data"], list)
|
||||
assert len(content["data"]) <= content["count"]
|
||||
|
||||
|
||||
def test_read_places_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
create_random_place(db)
|
||||
create_random_place(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/places/",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert "count" in content
|
||||
assert content["count"] == 0
|
||||
assert "data" in content
|
||||
assert isinstance(content["data"], list)
|
||||
assert len(content["data"]) == 0
|
||||
|
||||
|
||||
def test_update_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
place = create_random_place(db)
|
||||
data = {
|
||||
"name": "Updated name",
|
||||
"short_name": "4", # TODO: Fix Shortname
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/places/{place.id}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["id"] == str(place.id)
|
||||
assert content["name"] == data["name"]
|
||||
assert content["short_name"] == data["short_name"]
|
||||
|
||||
|
||||
def test_update_place_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||
data = {
|
||||
"name": "Not found",
|
||||
"short_name": "5", # TODO: Fix Shortname
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Place not found"
|
||||
|
||||
|
||||
def test_update_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
place = create_random_place(db)
|
||||
data = {
|
||||
"name": "No permissions",
|
||||
"short_name": "6", # TODO: Fix Shortname
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/places/{place.id}",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_delete_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
place = create_random_place(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/places/{place.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.json()["message"] == "Place deleted successfully"
|
||||
|
||||
|
||||
def test_delete_place_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Place not found"
|
||||
|
||||
|
||||
def test_delete_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
place = create_random_place(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/places/{place.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
187
backend/app/tests/api/routes/test_route.py
Normal file
187
backend/app/tests/api/routes/test_route.py
Normal file
@@ -0,0 +1,187 @@
|
||||
import uuid
|
||||
|
||||
from fastapi import status
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel import Session
|
||||
|
||||
from app.core.config import settings
|
||||
from app.tests.utils.hike import create_random_hike
|
||||
from app.tests.utils.route import create_random_route
|
||||
|
||||
|
||||
def test_create_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
hike = create_random_hike(db)
|
||||
|
||||
data = {
|
||||
"name": "Roete A",
|
||||
"contact": "Sebas",
|
||||
"hike_id": str(hike.id),
|
||||
}
|
||||
response = client.post(
|
||||
f"{settings.API_V1_STR}/routes/",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["name"] == data["name"]
|
||||
assert content["contact"] == data["contact"]
|
||||
assert content["hike_id"] == data["hike_id"]
|
||||
assert "id" in content
|
||||
|
||||
|
||||
def test_create_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
data = {
|
||||
"name": "No permissions",
|
||||
"contact": "No permissions",
|
||||
"hike_id": str(uuid.uuid4()),
|
||||
}
|
||||
response = client.post(
|
||||
f"{settings.API_V1_STR}/routes/",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_route(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
route = create_random_route(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["id"] == str(route.id)
|
||||
assert content["name"] == route.name
|
||||
assert content["contact"] == route.contact
|
||||
|
||||
|
||||
def test_read_route_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Route not found"
|
||||
|
||||
|
||||
def test_read_route_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
route = create_random_route(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_routes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
create_random_route(db)
|
||||
create_random_route(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/routes/",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert "count" in content
|
||||
assert content["count"] >= 2
|
||||
assert "data" in content
|
||||
assert isinstance(content["data"], list)
|
||||
assert len(content["data"]) <= content["count"]
|
||||
|
||||
|
||||
def test_read_routes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
create_random_route(db)
|
||||
create_random_route(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/routes/",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert "count" in content
|
||||
assert content["count"] == 0
|
||||
assert "data" in content
|
||||
assert isinstance(content["data"], list)
|
||||
assert len(content["data"]) == 0
|
||||
|
||||
|
||||
def test_update_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
route = create_random_route(db)
|
||||
data = {
|
||||
"name": "Updated name",
|
||||
"contact": "Updated contact",
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
content = response.json()
|
||||
assert content["id"] == str(route.id)
|
||||
assert content["name"] == data["name"]
|
||||
assert content["contact"] == data["contact"]
|
||||
|
||||
|
||||
def test_update_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||
data = {
|
||||
"name": "Not found",
|
||||
"contact": "Not found",
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Route not found"
|
||||
|
||||
|
||||
def test_update_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
route = create_random_route(db)
|
||||
data = {
|
||||
"name": "No permissions",
|
||||
"contact": "No permissions",
|
||||
}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_delete_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||
route = create_random_route(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
assert response.json()["message"] == "Route deleted successfully"
|
||||
|
||||
|
||||
def test_delete_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||
assert response.json()["detail"] == "Route not found"
|
||||
|
||||
|
||||
def test_delete_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||
route = create_random_route(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||
assert response.json()["detail"] == "Not enough permissions"
|
||||
12
backend/app/tests/utils/hike.py
Normal file
12
backend/app/tests/utils/hike.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from sqlmodel import Session
|
||||
|
||||
from app.models.hike import Hike, HikeCreate
|
||||
from app.tests.utils.utils import random_lower_string
|
||||
|
||||
|
||||
def create_random_hike(db: Session, name: str = None) -> Hike:
|
||||
if not name:
|
||||
name = random_lower_string()
|
||||
|
||||
hike_in = HikeCreate(name=name)
|
||||
return Hike.create(session=db, create_obj=hike_in)
|
||||
14
backend/app/tests/utils/place.py
Normal file
14
backend/app/tests/utils/place.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from sqlmodel import Session
|
||||
|
||||
from app.models.place import Place, PlaceCreate
|
||||
from app.tests.utils.utils import random_lower_string, random_lower_short_string
|
||||
|
||||
|
||||
def create_random_place(db: Session, name: str = None) -> Place:
|
||||
if not name:
|
||||
name = random_lower_string()
|
||||
|
||||
short_name = random_lower_short_string()
|
||||
|
||||
place_in = PlaceCreate(name=name, short_name=short_name)
|
||||
return Place.create(session=db, create_obj=place_in)
|
||||
17
backend/app/tests/utils/route.py
Normal file
17
backend/app/tests/utils/route.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from sqlmodel import Session
|
||||
|
||||
from app.models.hike import Hike
|
||||
from app.models.route import Route, RouteCreate
|
||||
from app.tests.utils.utils import random_lower_string
|
||||
from app.tests.utils.hike import create_random_hike
|
||||
|
||||
|
||||
def create_random_route(db: Session, name: str = None, hike: Hike = None) -> Route:
|
||||
if not name:
|
||||
name = random_lower_string()
|
||||
|
||||
if not hike:
|
||||
hike = create_random_hike(db=db)
|
||||
|
||||
route_in = RouteCreate(name=name, hike_id=hike.id)
|
||||
return Route.create(session=db, create_obj=route_in)
|
||||
@@ -6,15 +6,12 @@ from app.models.event import Event
|
||||
from app.models.team import Team, TeamCreate
|
||||
|
||||
from app.tests.utils.event import create_random_event
|
||||
from app.tests.utils.utils import random_lower_string
|
||||
from app.tests.utils.utils import random_lower_string, random_lower_short_string
|
||||
|
||||
|
||||
def random_short_name() -> str:
|
||||
return str(random.Random().randrange(1, 200))
|
||||
|
||||
def create_random_team(db: Session, event: Event | None = None) -> Team:
|
||||
name = random_lower_string()
|
||||
short_name = random_short_name()
|
||||
short_name = random_lower_short_string()
|
||||
|
||||
if not event:
|
||||
event = create_random_event(db)
|
||||
|
||||
@@ -10,6 +10,10 @@ def random_lower_string() -> str:
|
||||
return "".join(random.choices(string.ascii_lowercase, k=32))
|
||||
|
||||
|
||||
def random_lower_short_string() -> str:
|
||||
return str(random.Random().randrange(1, 8))
|
||||
|
||||
|
||||
def random_email() -> str:
|
||||
return f"{random_lower_string()}@{random_lower_string()}.com"
|
||||
|
||||
@@ -24,3 +28,4 @@ def get_superuser_token_headers(client: TestClient) -> dict[str, str]:
|
||||
a_token = tokens["access_token"]
|
||||
headers = {"Authorization": f"Bearer {a_token}"}
|
||||
return headers
|
||||
|
||||
|
||||
Reference in New Issue
Block a user