Compare commits
1 Commits
0d6ef073df
...
feature/se
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
076765e5c5 |
@@ -10,9 +10,7 @@ from app.api.routes import (
|
|||||||
private,
|
private,
|
||||||
users,
|
users,
|
||||||
utils,
|
utils,
|
||||||
hikes,
|
series,
|
||||||
routes,
|
|
||||||
places,
|
|
||||||
)
|
)
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
|
||||||
@@ -27,10 +25,7 @@ api_router.include_router(teams.router)
|
|||||||
api_router.include_router(associations.router)
|
api_router.include_router(associations.router)
|
||||||
api_router.include_router(divisions.router)
|
api_router.include_router(divisions.router)
|
||||||
api_router.include_router(members.router)
|
api_router.include_router(members.router)
|
||||||
|
api_router.include_router(series.router)
|
||||||
api_router.include_router(hikes.router)
|
|
||||||
api_router.include_router(routes.router)
|
|
||||||
api_router.include_router(places.router)
|
|
||||||
|
|
||||||
|
|
||||||
if settings.ENVIRONMENT == "local":
|
if settings.ENVIRONMENT == "local":
|
||||||
|
|||||||
@@ -1,179 +0,0 @@
|
|||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastapi import APIRouter, HTTPException, status
|
|
||||||
from sqlmodel import func, select
|
|
||||||
|
|
||||||
from app.api.deps import CurrentUser, SessionDep
|
|
||||||
from app.models.base import (
|
|
||||||
ApiTags,
|
|
||||||
Message,
|
|
||||||
RowId,
|
|
||||||
)
|
|
||||||
from app.models.hike import (
|
|
||||||
Hike,
|
|
||||||
HikeCreate,
|
|
||||||
HikeUpdate,
|
|
||||||
HikePublic,
|
|
||||||
HikesPublic,
|
|
||||||
)
|
|
||||||
from app.models.route import (
|
|
||||||
Route,
|
|
||||||
RoutesPublic,
|
|
||||||
)
|
|
||||||
from app.models.user import (
|
|
||||||
PermissionModule,
|
|
||||||
PermissionPart,
|
|
||||||
PermissionRight,
|
|
||||||
)
|
|
||||||
|
|
||||||
router = APIRouter(prefix="/hikes", tags=[ApiTags.HIKES])
|
|
||||||
|
|
||||||
|
|
||||||
# region # Hikes ########################################################
|
|
||||||
|
|
||||||
@router.get("/", response_model=HikesPublic)
|
|
||||||
def read_hikes(
|
|
||||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Retrieve all hikes.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
count_statement = select(func.count()).select_from(Hike)
|
|
||||||
count = session.exec(count_statement).one()
|
|
||||||
statement = select(Hike).offset(skip).limit(limit)
|
|
||||||
hikes = session.exec(statement).all()
|
|
||||||
return HikesPublic(data=hikes, count=count)
|
|
||||||
|
|
||||||
return HikesPublic(data=[], count=0)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{id}", response_model=HikePublic)
|
|
||||||
def read_hike(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
|
||||||
"""
|
|
||||||
Get hike by ID.
|
|
||||||
"""
|
|
||||||
hike = session.get(Hike, id)
|
|
||||||
if not hike:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
return hike
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/", response_model=HikePublic)
|
|
||||||
def create_hike(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, hike_in: HikeCreate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Create new hike.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.CREATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
hike = Hike.create(create_obj=hike_in, session=session)
|
|
||||||
return hike
|
|
||||||
|
|
||||||
|
|
||||||
@router.put("/{id}", response_model=HikePublic)
|
|
||||||
def update_hike(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, id: RowId, hike_in: HikeUpdate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Update a hike.
|
|
||||||
"""
|
|
||||||
hike = session.get(Hike, id)
|
|
||||||
if not hike:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.UPDATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
hike = Hike.update(db_obj=hike, in_obj=hike_in, session=session)
|
|
||||||
return hike
|
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/{id}")
|
|
||||||
def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
|
|
||||||
"""
|
|
||||||
Delete a hike.
|
|
||||||
"""
|
|
||||||
hike = session.get(Hike, id)
|
|
||||||
if not hike:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.DELETE,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
session.delete(hike)
|
|
||||||
session.commit()
|
|
||||||
return Message(message="Hike deleted successfully")
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
|
|
||||||
# region # Hike / Routes #######################################################
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{hike_id}/routes/", response_model=RoutesPublic)
|
|
||||||
def read_hike_route(
|
|
||||||
session: SessionDep,
|
|
||||||
current_user: CurrentUser,
|
|
||||||
hike_id: RowId,
|
|
||||||
skip: int = 0,
|
|
||||||
limit: int = 100,
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Retrieve all hike routes.
|
|
||||||
"""
|
|
||||||
|
|
||||||
hike = session.get(Hike, hike_id)
|
|
||||||
if not hike:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found"
|
|
||||||
)
|
|
||||||
|
|
||||||
if not current_user.has_permission(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=(PermissionRight.MANAGE_HIKES | PermissionRight.READ),
|
|
||||||
):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
||||||
)
|
|
||||||
|
|
||||||
data_query = select(Route).where(
|
|
||||||
Route.hike_id == hike_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
count = session.exec(select(func.count()).select_from(data_query.subquery())).one()
|
|
||||||
data = session.exec(data_query.offset(skip).limit(limit)).all()
|
|
||||||
|
|
||||||
return RoutesPublic(data=data, count=count)
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
@@ -1,148 +0,0 @@
|
|||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastapi import APIRouter, HTTPException, status
|
|
||||||
from sqlmodel import func, select
|
|
||||||
|
|
||||||
from app.api.deps import CurrentUser, SessionDep
|
|
||||||
from app.models.base import (
|
|
||||||
ApiTags,
|
|
||||||
Message,
|
|
||||||
RowId,
|
|
||||||
)
|
|
||||||
from app.models.place import (
|
|
||||||
Place,
|
|
||||||
PlaceCreate,
|
|
||||||
PlaceUpdate,
|
|
||||||
PlacePublic,
|
|
||||||
PlacesPublic,
|
|
||||||
)
|
|
||||||
from app.models.user import (
|
|
||||||
PermissionModule,
|
|
||||||
PermissionPart,
|
|
||||||
PermissionRight,
|
|
||||||
)
|
|
||||||
|
|
||||||
router = APIRouter(prefix="/places", tags=[ApiTags.PLACES])
|
|
||||||
|
|
||||||
|
|
||||||
# region # Places ########################################################
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/", response_model=PlacesPublic)
|
|
||||||
def read_places(
|
|
||||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Retrieve all places.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if current_user.has_permissions(
|
|
||||||
module=PermissionModule.PLACE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
count_statement = select(func.count()).select_from(Place)
|
|
||||||
count = session.exec(count_statement).one()
|
|
||||||
statement = select(Place).offset(skip).limit(limit)
|
|
||||||
places = session.exec(statement).all()
|
|
||||||
return PlacesPublic(data=places, count=count)
|
|
||||||
|
|
||||||
return PlacesPublic(data=[], count=0)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{id}", response_model=PlacePublic)
|
|
||||||
def read_place(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
|
||||||
"""
|
|
||||||
Get place by ID.
|
|
||||||
"""
|
|
||||||
place = session.get(Place, id)
|
|
||||||
if not place:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
|
|
||||||
)
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.PLACE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
||||||
)
|
|
||||||
|
|
||||||
return place
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/", response_model=PlacePublic)
|
|
||||||
def create_place(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, place_in: PlaceCreate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Create new place.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.PLACE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.CREATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
||||||
)
|
|
||||||
|
|
||||||
place = Place.create(create_obj=place_in, session=session)
|
|
||||||
return place
|
|
||||||
|
|
||||||
|
|
||||||
@router.put("/{id}", response_model=PlacePublic)
|
|
||||||
def update_place(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, id: RowId, place_in: PlaceUpdate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Update a place.
|
|
||||||
"""
|
|
||||||
place = session.get(Place, id)
|
|
||||||
if not place:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
|
|
||||||
)
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.PLACE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.UPDATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
||||||
)
|
|
||||||
|
|
||||||
place = Place.update(db_obj=place, in_obj=place_in, session=session)
|
|
||||||
return place
|
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/{id}")
|
|
||||||
def delete_place(session: SessionDep, current_user: CurrentUser, id: RowId) -> Message:
|
|
||||||
"""
|
|
||||||
Delete a place.
|
|
||||||
"""
|
|
||||||
place = session.get(Place, id)
|
|
||||||
if not place:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="Place not found"
|
|
||||||
)
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.PLACE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.DELETE,
|
|
||||||
):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
||||||
)
|
|
||||||
|
|
||||||
session.delete(place)
|
|
||||||
session.commit()
|
|
||||||
return Message(message="Place deleted successfully")
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
@@ -1,148 +0,0 @@
|
|||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastapi import APIRouter, HTTPException, status
|
|
||||||
from sqlmodel import func, select
|
|
||||||
|
|
||||||
from app.api.deps import CurrentUser, SessionDep
|
|
||||||
from app.models.base import (
|
|
||||||
ApiTags,
|
|
||||||
Message,
|
|
||||||
RowId,
|
|
||||||
)
|
|
||||||
from app.models.route import (
|
|
||||||
Route,
|
|
||||||
RouteCreate,
|
|
||||||
RouteUpdate,
|
|
||||||
RoutePublic,
|
|
||||||
RoutesPublic,
|
|
||||||
)
|
|
||||||
from app.models.user import (
|
|
||||||
PermissionModule,
|
|
||||||
PermissionPart,
|
|
||||||
PermissionRight,
|
|
||||||
)
|
|
||||||
|
|
||||||
router = APIRouter(prefix="/routes", tags=[ApiTags.ROUTES])
|
|
||||||
|
|
||||||
|
|
||||||
# region # Routes ########################################################
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/", response_model=RoutesPublic)
|
|
||||||
def read_routes(
|
|
||||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Retrieve all routes.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if current_user.has_permissions(
|
|
||||||
module=PermissionModule.ROUTE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
count_statement = select(func.count()).select_from(Route)
|
|
||||||
count = session.exec(count_statement).one()
|
|
||||||
statement = select(Route).offset(skip).limit(limit)
|
|
||||||
routes = session.exec(statement).all()
|
|
||||||
return RoutesPublic(data=routes, count=count)
|
|
||||||
|
|
||||||
return RoutesPublic(data=[], count=0)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{id}", response_model=RoutePublic)
|
|
||||||
def read_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
|
||||||
"""
|
|
||||||
Get route by ID.
|
|
||||||
"""
|
|
||||||
route = session.get(Route, id)
|
|
||||||
if not route:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
|
|
||||||
)
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.ROUTE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
||||||
)
|
|
||||||
|
|
||||||
return route
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/", response_model=RoutePublic)
|
|
||||||
def create_route(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, route_in: RouteCreate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Create new route.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.ROUTE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.CREATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
||||||
)
|
|
||||||
|
|
||||||
route = Route.create(create_obj=route_in, session=session)
|
|
||||||
return route
|
|
||||||
|
|
||||||
|
|
||||||
@router.put("/{id}", response_model=RoutePublic)
|
|
||||||
def update_route(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, id: RowId, route_in: RouteUpdate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Update a route.
|
|
||||||
"""
|
|
||||||
route = session.get(Route, id)
|
|
||||||
if not route:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
|
|
||||||
)
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.ROUTE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.UPDATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
||||||
)
|
|
||||||
|
|
||||||
route = Route.update(db_obj=route, in_obj=route_in, session=session)
|
|
||||||
return route
|
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/{id}")
|
|
||||||
def delete_route(session: SessionDep, current_user: CurrentUser, id: RowId) -> Message:
|
|
||||||
"""
|
|
||||||
Delete a route.
|
|
||||||
"""
|
|
||||||
route = session.get(Route, id)
|
|
||||||
if not route:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_404_NOT_FOUND, detail="Route not found"
|
|
||||||
)
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.ROUTE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.DELETE,
|
|
||||||
):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
||||||
)
|
|
||||||
|
|
||||||
session.delete(route)
|
|
||||||
session.commit()
|
|
||||||
return Message(message="Route deleted successfully")
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
176
backend/app/api/routes/series.py
Normal file
176
backend/app/api/routes/series.py
Normal file
@@ -0,0 +1,176 @@
|
|||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastapi import APIRouter, HTTPException, status
|
||||||
|
from sqlmodel import func, select
|
||||||
|
|
||||||
|
from app.api.deps import CurrentUser, SessionDep
|
||||||
|
from app.models.base import (
|
||||||
|
ApiTags,
|
||||||
|
Message,
|
||||||
|
RowId,
|
||||||
|
)
|
||||||
|
from app.models.serie import (
|
||||||
|
Serie,
|
||||||
|
SerieCreate,
|
||||||
|
SerieUpdate,
|
||||||
|
SeriePublic,
|
||||||
|
SeriesPublic,
|
||||||
|
)
|
||||||
|
from app.models.division import (
|
||||||
|
Division,
|
||||||
|
DivisionsPublic,
|
||||||
|
)
|
||||||
|
from app.models.user import (
|
||||||
|
PermissionModule,
|
||||||
|
PermissionPart,
|
||||||
|
PermissionRight,
|
||||||
|
)
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/series", tags=[ApiTags.SERIES])
|
||||||
|
|
||||||
|
|
||||||
|
# region # Series ########################################################
|
||||||
|
|
||||||
|
@router.get("/", response_model=SeriesPublic)
|
||||||
|
def read_series(
|
||||||
|
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Retrieve all series.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.READ,
|
||||||
|
):
|
||||||
|
count_statement = select(func.count()).select_from(Serie)
|
||||||
|
count = session.exec(count_statement).one()
|
||||||
|
statement = select(Serie).offset(skip).limit(limit)
|
||||||
|
series = session.exec(statement).all()
|
||||||
|
return SeriesPublic(data=series, count=count)
|
||||||
|
|
||||||
|
return SeriesPublic(data=[], count=0)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{id}", response_model=SeriePublic)
|
||||||
|
def read_serie(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
||||||
|
"""
|
||||||
|
Get serie by ID.
|
||||||
|
"""
|
||||||
|
serie = session.get(Serie, id)
|
||||||
|
if not serie:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.READ,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
return serie
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/", response_model=SeriePublic)
|
||||||
|
def create_serie(
|
||||||
|
*, session: SessionDep, current_user: CurrentUser, serie_in: SerieCreate
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Create new serie.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.CREATE,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
serie = Serie.create(create_obj=serie_in, session=session)
|
||||||
|
return serie
|
||||||
|
|
||||||
|
|
||||||
|
@router.put("/{id}", response_model=SeriePublic)
|
||||||
|
def update_serie(
|
||||||
|
*, session: SessionDep, current_user: CurrentUser, id: RowId, serie_in: SerieUpdate
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Update a serie.
|
||||||
|
"""
|
||||||
|
serie = session.get(Serie, id)
|
||||||
|
if not serie:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.UPDATE,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
serie = Serie.update(db_obj=serie, in_obj=serie_in, session=session)
|
||||||
|
return serie
|
||||||
|
|
||||||
|
|
||||||
|
@router.delete("/{id}")
|
||||||
|
def delete_serie(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
|
||||||
|
"""
|
||||||
|
Delete a serie.
|
||||||
|
"""
|
||||||
|
serie = session.get(Serie, id)
|
||||||
|
if not serie:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.DELETE,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
session.delete(serie)
|
||||||
|
session.commit()
|
||||||
|
return Message(message="Serie deleted successfully")
|
||||||
|
|
||||||
|
# endregion
|
||||||
|
|
||||||
|
|
||||||
|
# region # Series / Divisions ############################################
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{series_id}/divisions/", response_model=DivisionsPublic)
|
||||||
|
def read_serie_division(
|
||||||
|
session: SessionDep, current_user: CurrentUser, series_id: RowId, skip: int = 0, limit: int = 100
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Retrieve all serie divisions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
serie = session.get(Serie, series_id)
|
||||||
|
if not serie:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
||||||
|
|
||||||
|
if not current_user.has_permission(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=(PermissionRight.MANAGE_DIVISIONS | PermissionRight.READ),
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
count_statement = (select(func.count())
|
||||||
|
.select_from(Division)
|
||||||
|
.where(Division.serie_id == serie.id)
|
||||||
|
)
|
||||||
|
count = session.exec(count_statement).one()
|
||||||
|
statement = (select(Division)
|
||||||
|
.where(Division.serie_id == serie.id)
|
||||||
|
.offset(skip)
|
||||||
|
.limit(limit)
|
||||||
|
)
|
||||||
|
divisions = session.exec(statement).all()
|
||||||
|
|
||||||
|
return DivisionsPublic(data=divisions, count=count)
|
||||||
|
|
||||||
|
|
||||||
|
# endregion
|
||||||
@@ -25,22 +25,13 @@ from app.models.user import (
|
|||||||
UserCreate,
|
UserCreate,
|
||||||
)
|
)
|
||||||
from app.models.member import (
|
from app.models.member import (
|
||||||
Member,
|
Member, MemberCreate,
|
||||||
MemberCreate,
|
|
||||||
)
|
)
|
||||||
from app.models.apikey import (
|
from app.models.apikey import (
|
||||||
ApiKey,
|
ApiKey,
|
||||||
)
|
)
|
||||||
from app.models.hike import (
|
from app.models.serie import (
|
||||||
Hike,
|
Serie,
|
||||||
HikeTeamLink,
|
|
||||||
)
|
|
||||||
from app.models.route import (
|
|
||||||
Route,
|
|
||||||
RoutePart,
|
|
||||||
)
|
|
||||||
from app.models.place import (
|
|
||||||
Place,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
|
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ from enum import Enum, IntFlag # Python 3.11 >= StrEnum
|
|||||||
from enum import auto as auto_enum
|
from enum import auto as auto_enum
|
||||||
from uuid import UUID as RowId
|
from uuid import UUID as RowId
|
||||||
|
|
||||||
from sqlalchemy import TypeDecorator, Integer, VARCHAR
|
|
||||||
from sqlmodel import SQLModel
|
from sqlmodel import SQLModel
|
||||||
from sqlalchemy.orm import declared_attr
|
from sqlalchemy.orm import declared_attr
|
||||||
|
|
||||||
@@ -40,85 +39,8 @@ class DocumentedStrEnum(str, Enum):
|
|||||||
|
|
||||||
|
|
||||||
class DocumentedIntFlag(IntFlag):
|
class DocumentedIntFlag(IntFlag):
|
||||||
@property
|
# TODO: Build DB sport to proper store flags and make it possible to store all mutations
|
||||||
def names(self) -> str:
|
pass
|
||||||
"""
|
|
||||||
Returns a comma-separated string of all active flag names.
|
|
||||||
"""
|
|
||||||
# Exclude 0-value flags
|
|
||||||
return ",".join([flag.name for flag in type(self) if flag in self and flag.value != 0])
|
|
||||||
|
|
||||||
def __str__(self) -> str:
|
|
||||||
# Default string conversion uses the names
|
|
||||||
return self.names
|
|
||||||
|
|
||||||
# Optional: for Pydantic compatibility
|
|
||||||
def __get_pydantic_json__(self) -> str:
|
|
||||||
return self.names
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class DocumentedIntFlagType(TypeDecorator):
|
|
||||||
impl = Integer
|
|
||||||
cache_ok = True
|
|
||||||
|
|
||||||
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.enum_class = enum_class
|
|
||||||
|
|
||||||
def process_bind_param(self, value, dialect):
|
|
||||||
"""
|
|
||||||
Convert IntFlag to integer before storing
|
|
||||||
"""
|
|
||||||
if value is None:
|
|
||||||
return None
|
|
||||||
if isinstance(value, self.enum_class):
|
|
||||||
return int(value)
|
|
||||||
if isinstance(value, int):
|
|
||||||
return value
|
|
||||||
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
|
|
||||||
|
|
||||||
def process_result_value(self, value, dialect):
|
|
||||||
"""
|
|
||||||
Convert integer from DB back to IntFlag
|
|
||||||
"""
|
|
||||||
if value is None:
|
|
||||||
return None
|
|
||||||
return self.enum_class(value)
|
|
||||||
|
|
||||||
|
|
||||||
class DocumentedStrFlagType(TypeDecorator):
|
|
||||||
impl = VARCHAR
|
|
||||||
cache_ok = True
|
|
||||||
|
|
||||||
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.enum_class = enum_class
|
|
||||||
|
|
||||||
def process_bind_param(self, value, dialect):
|
|
||||||
"""
|
|
||||||
Convert IntFlag to comma-separated string of names for storing in DB.
|
|
||||||
"""
|
|
||||||
if value is None:
|
|
||||||
return None
|
|
||||||
if isinstance(value, self.enum_class):
|
|
||||||
return str(value)
|
|
||||||
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
|
|
||||||
|
|
||||||
def process_result_value(self, value, dialect):
|
|
||||||
"""
|
|
||||||
Convert comma-separated string of names from DB back to IntFlag.
|
|
||||||
"""
|
|
||||||
if value is None or value == "":
|
|
||||||
return self.enum_class(0)
|
|
||||||
names = value.split(",")
|
|
||||||
result = self.enum_class(0)
|
|
||||||
for name in names:
|
|
||||||
try:
|
|
||||||
result |= self.enum_class[name]
|
|
||||||
except KeyError:
|
|
||||||
raise ValueError(f"Invalid flag name '{name}' for {self.enum_class.__name__}")
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
@@ -137,10 +59,7 @@ class ApiTags(DocumentedStrEnum):
|
|||||||
ASSOCIATIONS = "Associations"
|
ASSOCIATIONS = "Associations"
|
||||||
DIVISIONS = "Divisions"
|
DIVISIONS = "Divisions"
|
||||||
MEMBERS = "Members"
|
MEMBERS = "Members"
|
||||||
|
SERIES = "Series"
|
||||||
HIKES = "Hikes"
|
|
||||||
ROUTES = "Routes"
|
|
||||||
PLACES = "Places"
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
# endregion
|
||||||
|
|||||||
@@ -1,276 +0,0 @@
|
|||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from sqlmodel import (
|
|
||||||
Session,
|
|
||||||
Relationship,
|
|
||||||
Field,
|
|
||||||
)
|
|
||||||
|
|
||||||
from . import mixin
|
|
||||||
from .base import (
|
|
||||||
BaseSQLModel,
|
|
||||||
DocumentedStrEnum,
|
|
||||||
DocumentedIntFlag,
|
|
||||||
DocumentedStrFlagType,
|
|
||||||
auto_enum,
|
|
||||||
RowId,
|
|
||||||
)
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .route import Route
|
|
||||||
from .team import Team
|
|
||||||
|
|
||||||
# region # Hike ################################################################
|
|
||||||
|
|
||||||
|
|
||||||
class HikeTeamPage(DocumentedIntFlag):
|
|
||||||
ROUTE = auto_enum() # Route steps info
|
|
||||||
QUESTIONS = auto_enum() # Visible questions
|
|
||||||
|
|
||||||
VISITED_PLACES = auto_enum() # Places the team has been
|
|
||||||
CURRENT_PLACE = auto_enum() # Place where the team currently is
|
|
||||||
UNVISITED_PLACES = auto_enum() # Places the team still neat to visit
|
|
||||||
|
|
||||||
PLACES = VISITED_PLACES | CURRENT_PLACE | UNVISITED_PLACES # All the places
|
|
||||||
|
|
||||||
TRAVEL_TIME = auto_enum() # Total travel time
|
|
||||||
TRAVEL_FREE_TIME = auto_enum() # Total travel time
|
|
||||||
PLACE_TIME = auto_enum() # Total place time
|
|
||||||
CALCULATE_TIME = auto_enum() # Total time calculated based on hike settings
|
|
||||||
# TODO: Think about time between oter teams
|
|
||||||
|
|
||||||
PLACE_POINTS = auto_enum() # Points scored on a place
|
|
||||||
TOTAL_PLACE_POINTS = auto_enum() # All points got on all places
|
|
||||||
ASSIST_POINTS = auto_enum() # Minus points for assistants
|
|
||||||
# TODO: Think about place in classement
|
|
||||||
|
|
||||||
ASSIST_LOG = auto_enum() # Assisted items
|
|
||||||
ASSIST_LATTER = auto_enum() # ???
|
|
||||||
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
|
|
||||||
class HikeTimeCalculation(DocumentedIntFlag):
|
|
||||||
TRAVEL_TIME = auto_enum() # Time traveling
|
|
||||||
|
|
||||||
# TODO: Think about time groups (in this model we have 2, free and non free)
|
|
||||||
TRAVEL_FREE_TIME = auto_enum() # Time that is excluded from traveling but not at a place
|
|
||||||
PLACE_TIME = auto_enum() # Time checked in at a place
|
|
||||||
|
|
||||||
TOTAL_TIME = TRAVEL_TIME | TRAVEL_FREE_TIME | PLACE_TIME
|
|
||||||
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
|
|
||||||
class HikeVisitLogType(DocumentedStrEnum):
|
|
||||||
FIRST_VISIT = auto_enum()
|
|
||||||
LAST_VISIT = auto_enum()
|
|
||||||
|
|
||||||
LONGEST_VISIT = auto_enum()
|
|
||||||
SHORTEST_VISIT = auto_enum()
|
|
||||||
|
|
||||||
EACH_VISIT = auto_enum()
|
|
||||||
|
|
||||||
DISABLE_VISIT_LOGING = auto_enum()
|
|
||||||
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
|
|
||||||
class HikeBase(
|
|
||||||
mixin.Name,
|
|
||||||
mixin.Contact,
|
|
||||||
BaseSQLModel,
|
|
||||||
):
|
|
||||||
tracker_interval: int | None = Field(
|
|
||||||
default=None,
|
|
||||||
nullable=True,
|
|
||||||
description="Is GPS button available, value will be the interval",
|
|
||||||
)
|
|
||||||
|
|
||||||
is_multi_day: bool = Field(
|
|
||||||
default=False,
|
|
||||||
nullable=False,
|
|
||||||
description="Show datetime in stead of time only",
|
|
||||||
)
|
|
||||||
|
|
||||||
team_page: HikeTeamPage = Field(
|
|
||||||
default=HikeTeamPage.PLACES | HikeTeamPage.ROUTE | HikeTeamPage.QUESTIONS,
|
|
||||||
nullable=False,
|
|
||||||
description="Show all the places of the route inside the teams page",
|
|
||||||
sa_type=DocumentedStrFlagType(HikeTeamPage),
|
|
||||||
)
|
|
||||||
|
|
||||||
time_calculation: HikeTimeCalculation = Field(
|
|
||||||
default=HikeTimeCalculation.TRAVEL_TIME,
|
|
||||||
nullable=False,
|
|
||||||
description="Wath should we calculate inside the total time",
|
|
||||||
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
|
|
||||||
)
|
|
||||||
|
|
||||||
visit_log_type: HikeVisitLogType = Field(
|
|
||||||
default=HikeVisitLogType.FIRST_VISIT,
|
|
||||||
nullable=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
min_time_points: int | None = Field(
|
|
||||||
default=0,
|
|
||||||
ge=0,
|
|
||||||
# TODO: le: max_time_points
|
|
||||||
description="Min points for time",
|
|
||||||
)
|
|
||||||
|
|
||||||
max_time_points: int | None = Field(
|
|
||||||
default=100,
|
|
||||||
ge=0, # TODO: ge > min_time_points
|
|
||||||
description="Max points for time",
|
|
||||||
)
|
|
||||||
|
|
||||||
max_question_points: int | None = Field(
|
|
||||||
default=None,
|
|
||||||
description="None: Calculate from answers (no max), Positive: Set as max for dynamic range",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class HikeCreate(HikeBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class HikeUpdate(HikeBase):
|
|
||||||
is_multi_day: bool | None = Field(default=None) # type: ignore
|
|
||||||
team_page: HikeTeamPage | None = Field(default=None) # type: ignore
|
|
||||||
time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore
|
|
||||||
visit_log_type: HikeVisitLogType | None = Field(default=None) # type: ignore
|
|
||||||
|
|
||||||
|
|
||||||
class Hike(mixin.RowId, HikeBase, table=True):
|
|
||||||
# --- database only items --------------------------------------------------
|
|
||||||
|
|
||||||
# --- read only items ------------------------------------------------------
|
|
||||||
|
|
||||||
# --- back_populates links -------------------------------------------------
|
|
||||||
routes: list["Route"] = Relationship(back_populates="hike", cascade_delete=True)
|
|
||||||
teams: list["HikeTeamLink"] = Relationship(back_populates="hike", cascade_delete=True)
|
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
|
||||||
@classmethod
|
|
||||||
def create(cls, *, session: Session, create_obj: HikeCreate) -> "Hike":
|
|
||||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
|
||||||
|
|
||||||
db_obj = cls.model_validate(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update(
|
|
||||||
cls, *, session: Session, db_obj: "Hike", in_obj: HikeUpdate
|
|
||||||
) -> "Hike":
|
|
||||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj.sqlmodel_update(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to return via API, id is always required
|
|
||||||
class HikePublic(mixin.RowIdPublic, HikeBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class HikesPublic(BaseSQLModel):
|
|
||||||
data: list[HikePublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
|
|
||||||
# region # Hike / Team #########################################################
|
|
||||||
|
|
||||||
class HikeTeamLinkBase(
|
|
||||||
BaseSQLModel,
|
|
||||||
):
|
|
||||||
hike_id: RowId = Field(
|
|
||||||
nullable=False,
|
|
||||||
foreign_key="hike.id",
|
|
||||||
)
|
|
||||||
|
|
||||||
team_id: RowId = Field(
|
|
||||||
nullable=False,
|
|
||||||
foreign_key="team.id",
|
|
||||||
)
|
|
||||||
|
|
||||||
route_id: RowId = Field(
|
|
||||||
nullable=False,
|
|
||||||
foreign_key="route.id",
|
|
||||||
)
|
|
||||||
|
|
||||||
start_place_id: RowId | None = Field(
|
|
||||||
default=None,
|
|
||||||
nullable=True,
|
|
||||||
foreign_key="place.id",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class HikeTeamLinkCreate(HikeTeamLinkBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class HikeTeamLinkUpdate(HikeTeamLinkBase):
|
|
||||||
hike_id: RowId | None = Field(default=None, nullable=True) # type: ignore
|
|
||||||
team_id: RowId | None = Field(default=None, nullable=True) # type: ignore
|
|
||||||
route_id: RowId | None = Field(default=None, nullable=True) # type: ignore
|
|
||||||
|
|
||||||
|
|
||||||
class HikeTeamLink(mixin.RowId, HikeTeamLinkBase, table=True):
|
|
||||||
# --- database only items --------------------------------------------------
|
|
||||||
|
|
||||||
# --- read only items ------------------------------------------------------
|
|
||||||
|
|
||||||
# --- back_populates links -------------------------------------------------
|
|
||||||
team: "Team" = Relationship(back_populates="hike_links")
|
|
||||||
hike: "Hike" = Relationship(back_populates="teams")
|
|
||||||
route: "Route" = Relationship(back_populates="teams")
|
|
||||||
# start_place: "Place" = Relationship(back_populates="teams")
|
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
|
||||||
@classmethod
|
|
||||||
def create(cls, *, session: Session, create_obj: HikeTeamLinkCreate) -> "HikeTeamLink":
|
|
||||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
|
||||||
|
|
||||||
db_obj = cls.model_validate(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update(
|
|
||||||
cls, *, session: Session, db_obj: "HikeTeamLink", in_obj: HikeTeamLinkUpdate
|
|
||||||
) -> "HikeTeamLink":
|
|
||||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj.sqlmodel_update(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to return via API, id is always required
|
|
||||||
class HikeTeamLinkPublic(mixin.RowIdPublic, HikeTeamLinkBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class HikeTeamLinksPublic(BaseSQLModel):
|
|
||||||
data: list[HikeTeamLinkPublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
@@ -1,6 +1,5 @@
|
|||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime, date
|
from datetime import datetime, date
|
||||||
from decimal import Decimal
|
|
||||||
|
|
||||||
from pydantic import BaseModel, EmailStr
|
from pydantic import BaseModel, EmailStr
|
||||||
from sqlmodel import (
|
from sqlmodel import (
|
||||||
@@ -15,10 +14,6 @@ class Name(BaseModel):
|
|||||||
name: str | None = Field(default=None, nullable=False, unique=False, max_length=255)
|
name: str | None = Field(default=None, nullable=False, unique=False, max_length=255)
|
||||||
|
|
||||||
|
|
||||||
class NameOveride(BaseModel):
|
|
||||||
name_overide: str | None = Field(default=None, nullable=True, unique=False, max_length=255)
|
|
||||||
|
|
||||||
|
|
||||||
class FullName(BaseModel):
|
class FullName(BaseModel):
|
||||||
full_name: str | None = Field(default=None, nullable=True, max_length=255)
|
full_name: str | None = Field(default=None, nullable=True, max_length=255)
|
||||||
|
|
||||||
@@ -36,12 +31,7 @@ class ShortName(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class ShortNameUpdate(ShortName):
|
class ShortNameUpdate(ShortName):
|
||||||
#TODO: Waarom is deze verplicht ???
|
short_name: str | None = Field(default=None, max_length=8)
|
||||||
short_name: str | None = Field(default=None, nullable=True, max_length=8)
|
|
||||||
|
|
||||||
|
|
||||||
class ShortNameOveride(ShortName):
|
|
||||||
short_name_overide: str | None = Field(default=None, nullable=True, max_length=8)
|
|
||||||
|
|
||||||
|
|
||||||
class Contact(BaseModel):
|
class Contact(BaseModel):
|
||||||
@@ -130,17 +120,3 @@ class Birthday(BaseModel):
|
|||||||
class Created(BaseModel):
|
class Created(BaseModel):
|
||||||
created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info))
|
created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info))
|
||||||
created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL")
|
created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL")
|
||||||
|
|
||||||
|
|
||||||
class Location(BaseModel):
|
|
||||||
latitude: Decimal | None = Field(default=None, nullable=True, max_digits=11, decimal_places=8, description="decimal degrees")
|
|
||||||
longitude: Decimal | None = Field(default=None, nullable=True, max_digits=11, decimal_places=8, description="decimal degrees")
|
|
||||||
|
|
||||||
|
|
||||||
class LocationWithRadius(Location):
|
|
||||||
radius: int | None = Field(default=None, nullable=True, description="Radius in meters")
|
|
||||||
|
|
||||||
|
|
||||||
class QuestionInfo:
|
|
||||||
question_file: str | None = Field(default=None, nullable=True, max_length=255)
|
|
||||||
answer_file: str | None = Field(default=None, nullable=True, max_length=255)
|
|
||||||
|
|||||||
@@ -1,179 +0,0 @@
|
|||||||
from datetime import timedelta
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from sqlalchemy import Interval
|
|
||||||
from sqlmodel import (
|
|
||||||
Session,
|
|
||||||
Relationship,
|
|
||||||
Field,
|
|
||||||
)
|
|
||||||
|
|
||||||
from . import mixin
|
|
||||||
from .base import (
|
|
||||||
BaseSQLModel,
|
|
||||||
DocumentedStrEnum,
|
|
||||||
auto_enum,
|
|
||||||
)
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .route import Route, RoutePart
|
|
||||||
|
|
||||||
# region # Place ###############################################################
|
|
||||||
|
|
||||||
|
|
||||||
class PlaceType(DocumentedStrEnum):
|
|
||||||
START = auto_enum() # Only check in (make GPS available) (TODO: determine based on route)
|
|
||||||
PLACE = auto_enum() # Check in / Check out (subtract time between in&out)
|
|
||||||
TAG = auto_enum() # Instant in&out at the same time
|
|
||||||
FINISH = auto_enum() # Only check out (and disable GPS) (TODO: determine based on route)
|
|
||||||
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
|
|
||||||
class VisitedCountType(DocumentedStrEnum):
|
|
||||||
NONE = auto_enum()
|
|
||||||
ONE_VISIT = auto_enum()
|
|
||||||
EACH_VISIT = auto_enum()
|
|
||||||
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
|
|
||||||
class PlaceBase(
|
|
||||||
mixin.Name,
|
|
||||||
mixin.ShortName,
|
|
||||||
mixin.Contact,
|
|
||||||
mixin.Description,
|
|
||||||
mixin.LocationWithRadius,
|
|
||||||
mixin.QuestionInfo,
|
|
||||||
BaseSQLModel,
|
|
||||||
):
|
|
||||||
place_type: PlaceType = Field(
|
|
||||||
default=PlaceType.PLACE,
|
|
||||||
nullable=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
max_points: int | None = Field(
|
|
||||||
default=None,
|
|
||||||
ge=0,
|
|
||||||
description="Max points for this place, None will disable scoring at this place",
|
|
||||||
)
|
|
||||||
|
|
||||||
visited_points: int | None = Field(
|
|
||||||
default=None,
|
|
||||||
ge=0,
|
|
||||||
description="Visited points for this place, None will disable this function",
|
|
||||||
)
|
|
||||||
visited_count_type: VisitedCountType = Field(
|
|
||||||
default=VisitedCountType.NONE,
|
|
||||||
)
|
|
||||||
|
|
||||||
skipped_penalty_points: int | None = Field(
|
|
||||||
default=None,
|
|
||||||
ge=0,
|
|
||||||
description="Skipped penalty for this place, amount will be subtracted, None will disable this function",
|
|
||||||
)
|
|
||||||
|
|
||||||
place_time: timedelta | None = Field(
|
|
||||||
default=None,
|
|
||||||
nullable=True,
|
|
||||||
description="Time for question at this place during testing.",
|
|
||||||
sa_type=Interval,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class PlaceCreate(PlaceBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class PlaceUpdate(
|
|
||||||
PlaceBase,
|
|
||||||
mixin.ShortNameUpdate,
|
|
||||||
):
|
|
||||||
place_type: PlaceType | None = Field(default=None) # type: ignore
|
|
||||||
visited_count_type: VisitedCountType | None = Field(default=None) # type: ignore
|
|
||||||
|
|
||||||
|
|
||||||
class Place(mixin.RowId, PlaceBase, table=True):
|
|
||||||
# --- database only items --------------------------------------------------
|
|
||||||
|
|
||||||
# --- read only items ------------------------------------------------------
|
|
||||||
|
|
||||||
# --- back_populates links -------------------------------------------------
|
|
||||||
routes: list["Route"] = Relationship(
|
|
||||||
sa_relationship_kwargs={
|
|
||||||
"primaryjoin": "or_("
|
|
||||||
"Place.id==Route.start_id,"
|
|
||||||
"Place.id==Route.finish_id,"
|
|
||||||
"Place.id==RoutePart.place_id,"
|
|
||||||
"Place.id==RoutePart.to_place_id,"
|
|
||||||
")",
|
|
||||||
"foreign_keys": "["
|
|
||||||
"Route.start_id,"
|
|
||||||
"Route.finish_id,"
|
|
||||||
"RoutePart.place_id,"
|
|
||||||
"RoutePart.to_place_id"
|
|
||||||
"]",
|
|
||||||
"viewonly": True,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
next_places: list["Place"] = Relationship(
|
|
||||||
back_populates="previous_place",
|
|
||||||
sa_relationship_kwargs={
|
|
||||||
"secondary": "route_part",
|
|
||||||
"primaryjoin": "Place.id == RoutePart.place_id",
|
|
||||||
"secondaryjoin": "Place.id == RoutePart.to_place_id",
|
|
||||||
"viewonly": True,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
previous_place: list["Place"] = Relationship(
|
|
||||||
back_populates="next_places",
|
|
||||||
sa_relationship_kwargs={
|
|
||||||
"secondary": "route_part",
|
|
||||||
"primaryjoin": "Place.id == RoutePart.to_place_id",
|
|
||||||
"secondaryjoin": "Place.id == RoutePart.place_id",
|
|
||||||
"viewonly": True,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
route_parts: list["RoutePart"] = Relationship(back_populates="place", sa_relationship_kwargs={"foreign_keys": "[RoutePart.place_id]"})
|
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
|
||||||
@classmethod
|
|
||||||
def create(cls, *, session: Session, create_obj: PlaceCreate) -> "Place":
|
|
||||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
|
||||||
|
|
||||||
db_obj = cls.model_validate(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update(
|
|
||||||
cls, *, session: Session, db_obj: "Place", in_obj: PlaceUpdate
|
|
||||||
) -> "Place":
|
|
||||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj.sqlmodel_update(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to return via API, id is always required
|
|
||||||
class PlacePublic(mixin.RowIdPublic, PlaceBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class PlacesPublic(BaseSQLModel):
|
|
||||||
data: list[PlacePublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
@@ -1,283 +0,0 @@
|
|||||||
from datetime import timedelta
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from sqlalchemy import Interval
|
|
||||||
from sqlmodel import (
|
|
||||||
Session,
|
|
||||||
Relationship,
|
|
||||||
Field,
|
|
||||||
)
|
|
||||||
|
|
||||||
from . import mixin
|
|
||||||
from .base import (
|
|
||||||
BaseSQLModel,
|
|
||||||
DocumentedStrEnum,
|
|
||||||
DocumentedStrFlagType,
|
|
||||||
auto_enum,
|
|
||||||
RowId,
|
|
||||||
)
|
|
||||||
|
|
||||||
from .hike import (
|
|
||||||
Hike,
|
|
||||||
HikeTimeCalculation,
|
|
||||||
HikeTeamLink,
|
|
||||||
)
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .place import Place
|
|
||||||
|
|
||||||
# region # Route ###############################################################
|
|
||||||
|
|
||||||
|
|
||||||
class RouteType(DocumentedStrEnum):
|
|
||||||
START_FINISH = auto_enum() # Start at the start and end at the finish
|
|
||||||
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
|
|
||||||
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
|
|
||||||
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
|
|
||||||
class RouteBase(
|
|
||||||
mixin.Name,
|
|
||||||
mixin.Contact,
|
|
||||||
BaseSQLModel,
|
|
||||||
):
|
|
||||||
route_type: RouteType = Field(
|
|
||||||
default=RouteType.START_FINISH,
|
|
||||||
nullable=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
time_calculation_override: HikeTimeCalculation | None = Field(
|
|
||||||
default=None,
|
|
||||||
nullable=True,
|
|
||||||
description="Should this route be calculated different then the main hike",
|
|
||||||
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
|
|
||||||
)
|
|
||||||
|
|
||||||
min_time: timedelta | None = Field(
|
|
||||||
default=None,
|
|
||||||
description="Min time correction, None = min of all, positive is used as 0:00, negative is subtracted from min of all time",
|
|
||||||
sa_type=Interval,
|
|
||||||
)
|
|
||||||
|
|
||||||
max_time: timedelta | None = Field(
|
|
||||||
default=None,
|
|
||||||
description="Max time correction, None = max of all, positive is used as max, negative is subtracted from max of all time",
|
|
||||||
sa_type=Interval,
|
|
||||||
)
|
|
||||||
|
|
||||||
hike_id: RowId = Field(
|
|
||||||
nullable=False,
|
|
||||||
foreign_key="hike.id",
|
|
||||||
)
|
|
||||||
|
|
||||||
start_id: RowId | None = Field(
|
|
||||||
default=None,
|
|
||||||
nullable=True,
|
|
||||||
foreign_key="place.id",
|
|
||||||
ondelete="SET NULL",
|
|
||||||
description="Start place.id of the route, None for random in CIRCULAR or CIRCULAR_BACK_TO_START",
|
|
||||||
)
|
|
||||||
|
|
||||||
finish_id: RowId | None = Field(
|
|
||||||
default=None,
|
|
||||||
nullable=True,
|
|
||||||
foreign_key="place.id",
|
|
||||||
ondelete="SET NULL",
|
|
||||||
description="Finish place.id of the route, None for random or decremented",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class RouteCreate(RouteBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class RouteUpdate(RouteBase):
|
|
||||||
route_type: RouteType | None = Field(default=None) # type: ignore
|
|
||||||
hike_id: RowId | None = Field(default=None) # type: ignore
|
|
||||||
|
|
||||||
|
|
||||||
class Route(mixin.RowId, RouteBase, table=True):
|
|
||||||
# --- database only items --------------------------------------------------
|
|
||||||
|
|
||||||
# --- read only items ------------------------------------------------------
|
|
||||||
|
|
||||||
# --- back_populates links -------------------------------------------------
|
|
||||||
hike: "Hike" = Relationship(back_populates="routes")
|
|
||||||
teams: list["HikeTeamLink"] = Relationship(back_populates="route")
|
|
||||||
|
|
||||||
start: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[Route.start_id]"})
|
|
||||||
finish: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[Route.finish_id]"})
|
|
||||||
|
|
||||||
places: list["Place"] = Relationship(
|
|
||||||
sa_relationship_kwargs={
|
|
||||||
"primaryjoin": "or_("
|
|
||||||
"Place.id==Route.start_id,"
|
|
||||||
"Place.id==Route.finish_id,"
|
|
||||||
"Place.id==RoutePart.place_id,"
|
|
||||||
"Place.id==RoutePart.to_place_id,"
|
|
||||||
")",
|
|
||||||
"foreign_keys": "["
|
|
||||||
"Route.start_id,"
|
|
||||||
"Route.finish_id,"
|
|
||||||
"RoutePart.place_id,"
|
|
||||||
"RoutePart.to_place_id"
|
|
||||||
"]",
|
|
||||||
"viewonly": True,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
route_parts: list["RoutePart"] = Relationship(back_populates="route")
|
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
|
||||||
@classmethod
|
|
||||||
def create(cls, *, session: Session, create_obj: RouteCreate) -> "Route":
|
|
||||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
|
||||||
|
|
||||||
db_obj = cls.model_validate(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update(
|
|
||||||
cls, *, session: Session, db_obj: "Route", in_obj: RouteUpdate
|
|
||||||
) -> "Route":
|
|
||||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj.sqlmodel_update(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to return via API, id is always required
|
|
||||||
class RoutePublic(mixin.RowIdPublic, RouteBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class RoutesPublic(BaseSQLModel):
|
|
||||||
data: list[RoutePublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
|
|
||||||
# region # Route / Place link ##################################################
|
|
||||||
|
|
||||||
|
|
||||||
class RoutePartBase(
|
|
||||||
mixin.NameOveride,
|
|
||||||
mixin.ShortNameOveride,
|
|
||||||
BaseSQLModel,
|
|
||||||
):
|
|
||||||
route_id: RowId = Field(
|
|
||||||
nullable=False,
|
|
||||||
foreign_key="route.id",
|
|
||||||
)
|
|
||||||
|
|
||||||
place_id: RowId = Field(
|
|
||||||
nullable=False,
|
|
||||||
foreign_key="place.id",
|
|
||||||
)
|
|
||||||
|
|
||||||
to_place_id: RowId = Field(
|
|
||||||
nullable=False,
|
|
||||||
foreign_key="place.id",
|
|
||||||
)
|
|
||||||
|
|
||||||
travel_time: timedelta | None = Field(
|
|
||||||
default=None,
|
|
||||||
nullable=True,
|
|
||||||
description="Time to travel during test walk, only used for information",
|
|
||||||
sa_type=Interval,
|
|
||||||
)
|
|
||||||
travel_distance: int | None = Field(
|
|
||||||
default=None,
|
|
||||||
nullable=True,
|
|
||||||
ge=0,
|
|
||||||
description="Distance in meters to travel, only used for information",
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: Convert to time groups
|
|
||||||
free_walk_time: bool | None = Field(
|
|
||||||
default=False,
|
|
||||||
description="If true, travel time is not add to calculated for travel time",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class RoutePartBaseCreate(RoutePartBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class RoutePartBaseUpdate(RoutePartBase):
|
|
||||||
route_id: RowId | None = Field(default=None) # type: ignore
|
|
||||||
place_id: RowId | None = Field(default=None) # type: ignore
|
|
||||||
to_place_id: RowId | None = Field(default=None) # type: ignore
|
|
||||||
free_walk_time: bool | None = Field(default=None) # type: ignore
|
|
||||||
|
|
||||||
|
|
||||||
class RoutePart(mixin.RowId, RoutePartBase, table=True):
|
|
||||||
# --- database only items --------------------------------------------------
|
|
||||||
|
|
||||||
# --- read only items ------------------------------------------------------
|
|
||||||
|
|
||||||
# --- back_populates links -------------------------------------------------
|
|
||||||
route: "Route" = Relationship(back_populates="route_parts")
|
|
||||||
place: "Place" = Relationship(back_populates="route_parts", sa_relationship_kwargs={"foreign_keys": "[RoutePart.place_id]"})
|
|
||||||
place_to: "Place" = Relationship(sa_relationship_kwargs={"foreign_keys": "[RoutePart.to_place_id]"})
|
|
||||||
|
|
||||||
places: list["Place"] = Relationship(
|
|
||||||
sa_relationship_kwargs={
|
|
||||||
"primaryjoin": "or_("
|
|
||||||
"Place.id==RoutePart.place_id,"
|
|
||||||
"Place.id==RoutePart.to_place_id,"
|
|
||||||
")",
|
|
||||||
"foreign_keys": "["
|
|
||||||
"RoutePart.place_id,"
|
|
||||||
"RoutePart.to_place_id"
|
|
||||||
"]",
|
|
||||||
"viewonly": True,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
|
||||||
@classmethod
|
|
||||||
def create(cls, *, session: Session, create_obj: RouteCreate) -> "RoutePart":
|
|
||||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
|
||||||
|
|
||||||
db_obj = cls.model_validate(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update(
|
|
||||||
cls, *, session: Session, db_obj: "RoutePart", in_obj: RouteUpdate
|
|
||||||
) -> "RoutePart":
|
|
||||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj.sqlmodel_update(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to return via API, id is always required
|
|
||||||
class RoutePartPublic(mixin.RowIdPublic, RoutePartBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class RoutePartsPublic(BaseSQLModel):
|
|
||||||
data: list[RoutePartPublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
@@ -16,7 +16,6 @@ if TYPE_CHECKING:
|
|||||||
from .event import Event
|
from .event import Event
|
||||||
from .division import DivisionTeamLink
|
from .division import DivisionTeamLink
|
||||||
from .member import MemberTeamLink
|
from .member import MemberTeamLink
|
||||||
from .hike import HikeTeamLink
|
|
||||||
|
|
||||||
# region # Team ################################################################
|
# region # Team ################################################################
|
||||||
|
|
||||||
@@ -52,7 +51,6 @@ class Team(mixin.RowId, TeamBase, table=True):
|
|||||||
event: "Event" = Relationship(back_populates="teams")
|
event: "Event" = Relationship(back_populates="teams")
|
||||||
division_link: "DivisionTeamLink" = Relationship(back_populates="team", cascade_delete=True)
|
division_link: "DivisionTeamLink" = Relationship(back_populates="team", cascade_delete=True)
|
||||||
member_links: list["MemberTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
|
member_links: list["MemberTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
|
||||||
hike_links: list["HikeTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
|
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
# --- CRUD actions ---------------------------------------------------------
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ if TYPE_CHECKING:
|
|||||||
from .apikey import ApiKey
|
from .apikey import ApiKey
|
||||||
from .event import EventUserLink
|
from .event import EventUserLink
|
||||||
from .member import Member
|
from .member import Member
|
||||||
|
from .serie import SerieUserLink
|
||||||
|
|
||||||
|
|
||||||
# region # User ################################################################
|
# region # User ################################################################
|
||||||
@@ -31,10 +32,7 @@ class PermissionModule(DocumentedStrEnum):
|
|||||||
ASSOCIATION = auto_enum()
|
ASSOCIATION = auto_enum()
|
||||||
DIVISION = auto_enum()
|
DIVISION = auto_enum()
|
||||||
MEMBER = auto_enum()
|
MEMBER = auto_enum()
|
||||||
|
SERIE = auto_enum()
|
||||||
HIKE = auto_enum()
|
|
||||||
ROUTE = auto_enum()
|
|
||||||
PLACE = auto_enum()
|
|
||||||
|
|
||||||
|
|
||||||
class PermissionPart(DocumentedStrEnum):
|
class PermissionPart(DocumentedStrEnum):
|
||||||
@@ -53,8 +51,6 @@ class PermissionRight(DocumentedIntFlag):
|
|||||||
MANAGE_DIVISIONS = auto_enum()
|
MANAGE_DIVISIONS = auto_enum()
|
||||||
MANAGE_MEMBERS = auto_enum()
|
MANAGE_MEMBERS = auto_enum()
|
||||||
|
|
||||||
MANAGE_HIKES = auto_enum()
|
|
||||||
|
|
||||||
ADMIN = ( CREATE
|
ADMIN = ( CREATE
|
||||||
| READ
|
| READ
|
||||||
| UPDATE
|
| UPDATE
|
||||||
@@ -63,7 +59,6 @@ class PermissionRight(DocumentedIntFlag):
|
|||||||
| MANAGE_TEAMS
|
| MANAGE_TEAMS
|
||||||
| MANAGE_DIVISIONS
|
| MANAGE_DIVISIONS
|
||||||
| MANAGE_MEMBERS
|
| MANAGE_MEMBERS
|
||||||
| MANAGE_HIKES
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -143,6 +138,7 @@ class User(mixin.RowId, UserBase, table=True):
|
|||||||
# --- many-to-many links ---------------------------------------------------
|
# --- many-to-many links ---------------------------------------------------
|
||||||
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
|
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
|
||||||
event_links: list["EventUserLink"] = Relationship(back_populates="user")
|
event_links: list["EventUserLink"] = Relationship(back_populates="user")
|
||||||
|
serie_links: list["SerieUserLink"] = Relationship(back_populates="user")
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
# --- CRUD actions ---------------------------------------------------------
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -1,223 +0,0 @@
|
|||||||
import uuid
|
|
||||||
|
|
||||||
from fastapi import status
|
|
||||||
from fastapi.testclient import TestClient
|
|
||||||
from sqlmodel import Session
|
|
||||||
|
|
||||||
from app.core.config import settings
|
|
||||||
from app.tests.utils.hike import create_random_hike
|
|
||||||
from app.tests.utils.route import create_random_route
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
data = {
|
|
||||||
"name": "RSW Maasdelta 2026",
|
|
||||||
"contact": "Sebas",
|
|
||||||
}
|
|
||||||
response = client.post(
|
|
||||||
f"{settings.API_V1_STR}/hikes/",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert content["name"] == data["name"]
|
|
||||||
assert content["contact"] == data["contact"]
|
|
||||||
assert "id" in content
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
data = {
|
|
||||||
"name": "No permissions",
|
|
||||||
"contact": "No permissions",
|
|
||||||
}
|
|
||||||
response = client.post(
|
|
||||||
f"{settings.API_V1_STR}/hikes/",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
hike = create_random_hike(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert content["id"] == str(hike.id)
|
|
||||||
assert content["name"] == hike.name
|
|
||||||
assert content["contact"] == hike.contact
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Hike not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_hike_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
hike = create_random_hike(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
create_random_hike(db)
|
|
||||||
create_random_hike(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/hikes/",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert "count" in content
|
|
||||||
assert content["count"] >= 2
|
|
||||||
assert "data" in content
|
|
||||||
assert isinstance(content["data"], list)
|
|
||||||
assert len(content["data"]) <= content["count"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
create_random_hike(db)
|
|
||||||
create_random_hike(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/hikes/",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert "count" in content
|
|
||||||
assert content["count"] == 0
|
|
||||||
assert "data" in content
|
|
||||||
assert isinstance(content["data"], list)
|
|
||||||
assert len(content["data"]) == 0
|
|
||||||
|
|
||||||
|
|
||||||
def test_update_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
hike = create_random_hike(db)
|
|
||||||
data = {
|
|
||||||
"name": "Updated name",
|
|
||||||
"contact": "Updated contact",
|
|
||||||
}
|
|
||||||
response = client.put(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert content["id"] == str(hike.id)
|
|
||||||
assert content["name"] == data["name"]
|
|
||||||
assert content["contact"] == data["contact"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_update_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
|
||||||
data = {
|
|
||||||
"name": "Not found",
|
|
||||||
"contact": "Not found",
|
|
||||||
}
|
|
||||||
response = client.put(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Hike not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_update_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
hike = create_random_hike(db)
|
|
||||||
data = {
|
|
||||||
"name": "No permissions",
|
|
||||||
"contact": "No permissions",
|
|
||||||
}
|
|
||||||
response = client.put(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
hike = create_random_hike(db)
|
|
||||||
response = client.delete(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
assert response.json()["message"] == "Hike deleted successfully"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
|
||||||
response = client.delete(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Hike not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
hike = create_random_hike(db)
|
|
||||||
response = client.delete(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_hike_routes(
|
|
||||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
|
||||||
) -> None:
|
|
||||||
hike = create_random_hike(db)
|
|
||||||
create_random_route(db, hike=hike)
|
|
||||||
create_random_route(db, hike=hike)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert "count" in content
|
|
||||||
assert content["count"] == 2
|
|
||||||
assert "data" in content
|
|
||||||
assert isinstance(content["data"], list)
|
|
||||||
assert len(content["data"]) <= content["count"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_hike_routes_not_found(
|
|
||||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
|
||||||
) -> None:
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}/routes",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Hike not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_hike_routes_no_permissions(
|
|
||||||
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
|
|
||||||
) -> None:
|
|
||||||
hike = create_random_hike(db)
|
|
||||||
create_random_route(db, hike=hike)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}/routes",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
@@ -1,217 +0,0 @@
|
|||||||
import uuid
|
|
||||||
|
|
||||||
from fastapi import status
|
|
||||||
from fastapi.testclient import TestClient
|
|
||||||
from sqlmodel import Session
|
|
||||||
|
|
||||||
from app.core.config import settings
|
|
||||||
from app.tests.utils.place import create_random_place
|
|
||||||
|
|
||||||
from app.models.place import PlaceType, VisitedCountType
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
data = {
|
|
||||||
"place_type": PlaceType.PLACE,
|
|
||||||
|
|
||||||
"name": "Post 1",
|
|
||||||
"short_name": "1",
|
|
||||||
|
|
||||||
"contact": "Sebas",
|
|
||||||
"description": "Post met renspel",
|
|
||||||
|
|
||||||
"question_file": None,
|
|
||||||
"answer_file": None,
|
|
||||||
"place_time": "PT10M",
|
|
||||||
|
|
||||||
"latitude": None,
|
|
||||||
"longitude": None,
|
|
||||||
"radius": None,
|
|
||||||
|
|
||||||
"max_points": None,
|
|
||||||
"visited_points": 10,
|
|
||||||
"visited_count_type": VisitedCountType.ONE_VISIT,
|
|
||||||
"skipped_penalty_points": 50,
|
|
||||||
}
|
|
||||||
response = client.post(
|
|
||||||
f"{settings.API_V1_STR}/places/",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert content["name"] == data["name"]
|
|
||||||
assert content["place_type"] == data["place_type"]
|
|
||||||
assert content["name"] == data["name"]
|
|
||||||
assert content["short_name"] == data["short_name"]
|
|
||||||
assert content["contact"] == data["contact"]
|
|
||||||
assert content["description"] == data["description"]
|
|
||||||
assert content["question_file"] == data["question_file"]
|
|
||||||
assert content["answer_file"] == data["answer_file"]
|
|
||||||
assert content["latitude"] == data["latitude"]
|
|
||||||
assert content["longitude"] == data["longitude"]
|
|
||||||
assert content["radius"] == data["radius"]
|
|
||||||
assert content["max_points"] == data["max_points"]
|
|
||||||
assert content["visited_points"] == data["visited_points"]
|
|
||||||
assert content["visited_count_type"] == data["visited_count_type"]
|
|
||||||
assert content["skipped_penalty_points"] == data["skipped_penalty_points"]
|
|
||||||
assert content["place_time"] == data["place_time"]
|
|
||||||
assert "id" in content
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
data = {
|
|
||||||
"place_type": PlaceType.PLACE,
|
|
||||||
"name": "No permissions",
|
|
||||||
"short_name": "No perm",
|
|
||||||
"visited_count_type": VisitedCountType.ONE_VISIT,
|
|
||||||
}
|
|
||||||
response = client.post(
|
|
||||||
f"{settings.API_V1_STR}/places/",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_place(
|
|
||||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
|
||||||
) -> None:
|
|
||||||
place = create_random_place(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/places/{place.id}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert content["id"] == str(place.id)
|
|
||||||
assert content["name"] == place.name
|
|
||||||
assert content["contact"] == place.contact
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_place_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Place not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_place_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
place = create_random_place(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/places/{place.id}",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_places(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
create_random_place(db)
|
|
||||||
create_random_place(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/places/",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert "count" in content
|
|
||||||
assert content["count"] >= 2
|
|
||||||
assert "data" in content
|
|
||||||
assert isinstance(content["data"], list)
|
|
||||||
assert len(content["data"]) <= content["count"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_places_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
create_random_place(db)
|
|
||||||
create_random_place(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/places/",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert "count" in content
|
|
||||||
assert content["count"] == 0
|
|
||||||
assert "data" in content
|
|
||||||
assert isinstance(content["data"], list)
|
|
||||||
assert len(content["data"]) == 0
|
|
||||||
|
|
||||||
|
|
||||||
def test_update_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
place = create_random_place(db)
|
|
||||||
data = {
|
|
||||||
"name": "Updated name",
|
|
||||||
"short_name": "4", # TODO: Fix Shortname
|
|
||||||
}
|
|
||||||
response = client.put(
|
|
||||||
f"{settings.API_V1_STR}/places/{place.id}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert content["id"] == str(place.id)
|
|
||||||
assert content["name"] == data["name"]
|
|
||||||
assert content["short_name"] == data["short_name"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_update_place_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
|
||||||
data = {
|
|
||||||
"name": "Not found",
|
|
||||||
"short_name": "5", # TODO: Fix Shortname
|
|
||||||
}
|
|
||||||
response = client.put(
|
|
||||||
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Place not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_update_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
place = create_random_place(db)
|
|
||||||
data = {
|
|
||||||
"name": "No permissions",
|
|
||||||
"short_name": "6", # TODO: Fix Shortname
|
|
||||||
}
|
|
||||||
response = client.put(
|
|
||||||
f"{settings.API_V1_STR}/places/{place.id}",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_place(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
place = create_random_place(db)
|
|
||||||
response = client.delete(
|
|
||||||
f"{settings.API_V1_STR}/places/{place.id}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
assert response.json()["message"] == "Place deleted successfully"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_place_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
|
||||||
response = client.delete(
|
|
||||||
f"{settings.API_V1_STR}/places/{uuid.uuid4()}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Place not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_place_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
place = create_random_place(db)
|
|
||||||
response = client.delete(
|
|
||||||
f"{settings.API_V1_STR}/places/{place.id}",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
@@ -5,20 +5,16 @@ from fastapi.testclient import TestClient
|
|||||||
from sqlmodel import Session
|
from sqlmodel import Session
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
from app.tests.utils.hike import create_random_hike
|
from app.tests.utils.serie import create_random_serie
|
||||||
from app.tests.utils.route import create_random_route
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_create_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
hike = create_random_hike(db)
|
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"name": "Roete A",
|
"name": "Zondag spel",
|
||||||
"contact": "Sebas",
|
"contact": "Rick",
|
||||||
"hike_id": str(hike.id),
|
|
||||||
}
|
}
|
||||||
response = client.post(
|
response = client.post(
|
||||||
f"{settings.API_V1_STR}/routes/",
|
f"{settings.API_V1_STR}/series/",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
@@ -26,18 +22,16 @@ def test_create_route(client: TestClient, superuser_token_headers: dict[str, str
|
|||||||
content = response.json()
|
content = response.json()
|
||||||
assert content["name"] == data["name"]
|
assert content["name"] == data["name"]
|
||||||
assert content["contact"] == data["contact"]
|
assert content["contact"] == data["contact"]
|
||||||
assert content["hike_id"] == data["hike_id"]
|
|
||||||
assert "id" in content
|
assert "id" in content
|
||||||
|
|
||||||
|
|
||||||
def test_create_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_create_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
data = {
|
data = {
|
||||||
"name": "No permissions",
|
"name": "Zondag Spel",
|
||||||
"contact": "No permissions",
|
"contact": "Rick",
|
||||||
"hike_id": str(uuid.uuid4()),
|
|
||||||
}
|
}
|
||||||
response = client.post(
|
response = client.post(
|
||||||
f"{settings.API_V1_STR}/routes/",
|
f"{settings.API_V1_STR}/series/",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
@@ -45,45 +39,43 @@ def test_create_route_no_permissions(client: TestClient, normal_user_token_heade
|
|||||||
assert response.json()["detail"] == "Not enough permissions"
|
assert response.json()["detail"] == "Not enough permissions"
|
||||||
|
|
||||||
|
|
||||||
def test_read_route(
|
def test_read_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
serie = create_random_serie(db)
|
||||||
) -> None:
|
|
||||||
route = create_random_route(db)
|
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
content = response.json()
|
content = response.json()
|
||||||
assert content["id"] == str(route.id)
|
assert content["id"] == str(serie.id)
|
||||||
assert content["name"] == route.name
|
assert content["name"] == serie.name
|
||||||
assert content["contact"] == route.contact
|
assert content["contact"] == serie.contact
|
||||||
|
|
||||||
|
|
||||||
def test_read_route_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
|
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||||
assert response.json()["detail"] == "Route not found"
|
assert response.json()["detail"] == "Serie not found"
|
||||||
|
|
||||||
|
|
||||||
def test_read_route_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_serie_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
route = create_random_route(db)
|
serie = create_random_serie(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
assert response.json()["detail"] == "Not enough permissions"
|
||||||
|
|
||||||
|
|
||||||
def test_read_routes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_series(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
create_random_route(db)
|
create_random_serie(db)
|
||||||
create_random_route(db)
|
create_random_serie(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/routes/",
|
f"{settings.API_V1_STR}/series/",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
@@ -95,11 +87,11 @@ def test_read_routes(client: TestClient, superuser_token_headers: dict[str, str]
|
|||||||
assert len(content["data"]) <= content["count"]
|
assert len(content["data"]) <= content["count"]
|
||||||
|
|
||||||
|
|
||||||
def test_read_routes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_series_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
create_random_route(db)
|
create_random_serie(db)
|
||||||
create_random_route(db)
|
create_random_serie(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/routes/",
|
f"{settings.API_V1_STR}/series/",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
@@ -111,46 +103,46 @@ def test_read_routes_no_permissions(client: TestClient, normal_user_token_header
|
|||||||
assert len(content["data"]) == 0
|
assert len(content["data"]) == 0
|
||||||
|
|
||||||
|
|
||||||
def test_update_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_update_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
route = create_random_route(db)
|
serie = create_random_serie(db)
|
||||||
data = {
|
data = {
|
||||||
"name": "Updated name",
|
"name": "Updated name",
|
||||||
"contact": "Updated contact",
|
"contact": "Updated contact",
|
||||||
}
|
}
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
content = response.json()
|
content = response.json()
|
||||||
assert content["id"] == str(route.id)
|
assert content["id"] == str(serie.id)
|
||||||
assert content["name"] == data["name"]
|
assert content["name"] == data["name"]
|
||||||
assert content["contact"] == data["contact"]
|
assert content["contact"] == data["contact"]
|
||||||
|
|
||||||
|
|
||||||
def test_update_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
def test_update_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||||
data = {
|
data = {
|
||||||
"name": "Not found",
|
"name": "Not found",
|
||||||
"contact": "Not found",
|
"contact": "Not found",
|
||||||
}
|
}
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
|
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||||
assert response.json()["detail"] == "Route not found"
|
assert response.json()["detail"] == "Serie not found"
|
||||||
|
|
||||||
|
|
||||||
def test_update_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_update_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
route = create_random_route(db)
|
serie = create_random_serie(db)
|
||||||
data = {
|
data = {
|
||||||
"name": "No permissions",
|
"name": "No permissions",
|
||||||
"contact": "No permissions",
|
"contact": "No permissions",
|
||||||
}
|
}
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
@@ -158,29 +150,29 @@ def test_update_route_no_permissions(client: TestClient, normal_user_token_heade
|
|||||||
assert response.json()["detail"] == "Not enough permissions"
|
assert response.json()["detail"] == "Not enough permissions"
|
||||||
|
|
||||||
|
|
||||||
def test_delete_route(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_delete_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
route = create_random_route(db)
|
serie = create_random_serie(db)
|
||||||
response = client.delete(
|
response = client.delete(
|
||||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
assert response.json()["message"] == "Route deleted successfully"
|
assert response.json()["message"] == "Serie deleted successfully"
|
||||||
|
|
||||||
|
|
||||||
def test_delete_route_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
def test_delete_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||||
response = client.delete(
|
response = client.delete(
|
||||||
f"{settings.API_V1_STR}/routes/{uuid.uuid4()}",
|
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||||
assert response.json()["detail"] == "Route not found"
|
assert response.json()["detail"] == "Serie not found"
|
||||||
|
|
||||||
|
|
||||||
def test_delete_route_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_delete_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
route = create_random_route(db)
|
serie = create_random_serie(db)
|
||||||
response = client.delete(
|
response = client.delete(
|
||||||
f"{settings.API_V1_STR}/routes/{route.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
from sqlmodel import Session
|
|
||||||
|
|
||||||
from app.models.hike import Hike, HikeCreate
|
|
||||||
from app.tests.utils.utils import random_lower_string
|
|
||||||
|
|
||||||
|
|
||||||
def create_random_hike(db: Session, name: str = None) -> Hike:
|
|
||||||
if not name:
|
|
||||||
name = random_lower_string()
|
|
||||||
|
|
||||||
hike_in = HikeCreate(name=name)
|
|
||||||
return Hike.create(session=db, create_obj=hike_in)
|
|
||||||
@@ -1,14 +0,0 @@
|
|||||||
from sqlmodel import Session
|
|
||||||
|
|
||||||
from app.models.place import Place, PlaceCreate
|
|
||||||
from app.tests.utils.utils import random_lower_string, random_lower_short_string
|
|
||||||
|
|
||||||
|
|
||||||
def create_random_place(db: Session, name: str = None) -> Place:
|
|
||||||
if not name:
|
|
||||||
name = random_lower_string()
|
|
||||||
|
|
||||||
short_name = random_lower_short_string()
|
|
||||||
|
|
||||||
place_in = PlaceCreate(name=name, short_name=short_name)
|
|
||||||
return Place.create(session=db, create_obj=place_in)
|
|
||||||
@@ -1,17 +0,0 @@
|
|||||||
from sqlmodel import Session
|
|
||||||
|
|
||||||
from app.models.hike import Hike
|
|
||||||
from app.models.route import Route, RouteCreate
|
|
||||||
from app.tests.utils.utils import random_lower_string
|
|
||||||
from app.tests.utils.hike import create_random_hike
|
|
||||||
|
|
||||||
|
|
||||||
def create_random_route(db: Session, name: str = None, hike: Hike = None) -> Route:
|
|
||||||
if not name:
|
|
||||||
name = random_lower_string()
|
|
||||||
|
|
||||||
if not hike:
|
|
||||||
hike = create_random_hike(db=db)
|
|
||||||
|
|
||||||
route_in = RouteCreate(name=name, hike_id=hike.id)
|
|
||||||
return Route.create(session=db, create_obj=route_in)
|
|
||||||
12
backend/app/tests/utils/serie.py
Normal file
12
backend/app/tests/utils/serie.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
from sqlmodel import Session
|
||||||
|
|
||||||
|
from app.models.serie import Serie, SerieCreate
|
||||||
|
from app.tests.utils.utils import random_lower_string
|
||||||
|
|
||||||
|
|
||||||
|
def create_random_serie(db: Session, name: str = None) -> Serie:
|
||||||
|
if not name:
|
||||||
|
name = random_lower_string()
|
||||||
|
|
||||||
|
serie_in = SerieCreate(name=name)
|
||||||
|
return Serie.create(session=db, create_obj=serie_in)
|
||||||
@@ -6,12 +6,15 @@ from app.models.event import Event
|
|||||||
from app.models.team import Team, TeamCreate
|
from app.models.team import Team, TeamCreate
|
||||||
|
|
||||||
from app.tests.utils.event import create_random_event
|
from app.tests.utils.event import create_random_event
|
||||||
from app.tests.utils.utils import random_lower_string, random_lower_short_string
|
from app.tests.utils.utils import random_lower_string
|
||||||
|
|
||||||
|
|
||||||
|
def random_short_name() -> str:
|
||||||
|
return str(random.Random().randrange(1, 200))
|
||||||
|
|
||||||
def create_random_team(db: Session, event: Event | None = None) -> Team:
|
def create_random_team(db: Session, event: Event | None = None) -> Team:
|
||||||
name = random_lower_string()
|
name = random_lower_string()
|
||||||
short_name = random_lower_short_string()
|
short_name = random_short_name()
|
||||||
|
|
||||||
if not event:
|
if not event:
|
||||||
event = create_random_event(db)
|
event = create_random_event(db)
|
||||||
|
|||||||
@@ -10,10 +10,6 @@ def random_lower_string() -> str:
|
|||||||
return "".join(random.choices(string.ascii_lowercase, k=32))
|
return "".join(random.choices(string.ascii_lowercase, k=32))
|
||||||
|
|
||||||
|
|
||||||
def random_lower_short_string() -> str:
|
|
||||||
return str(random.Random().randrange(1, 8))
|
|
||||||
|
|
||||||
|
|
||||||
def random_email() -> str:
|
def random_email() -> str:
|
||||||
return f"{random_lower_string()}@{random_lower_string()}.com"
|
return f"{random_lower_string()}@{random_lower_string()}.com"
|
||||||
|
|
||||||
@@ -28,4 +24,3 @@ def get_superuser_token_headers(client: TestClient) -> dict[str, str]:
|
|||||||
a_token = tokens["access_token"]
|
a_token = tokens["access_token"]
|
||||||
headers = {"Authorization": f"Bearer {a_token}"}
|
headers = {"Authorization": f"Bearer {a_token}"}
|
||||||
return headers
|
return headers
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user