180 lines
4.8 KiB
Python
180 lines
4.8 KiB
Python
from typing import Any
|
|
|
|
from fastapi import APIRouter, HTTPException, status
|
|
from sqlmodel import func, select
|
|
|
|
from app.api.deps import CurrentUser, SessionDep
|
|
from app.models.base import (
|
|
ApiTags,
|
|
Message,
|
|
RowId,
|
|
)
|
|
from app.models.hike import (
|
|
Hike,
|
|
HikeCreate,
|
|
HikeUpdate,
|
|
HikePublic,
|
|
HikesPublic,
|
|
)
|
|
from app.models.route import (
|
|
Route,
|
|
RoutesPublic,
|
|
)
|
|
from app.models.user import (
|
|
PermissionModule,
|
|
PermissionPart,
|
|
PermissionRight,
|
|
)
|
|
|
|
router = APIRouter(prefix="/hikes", tags=[ApiTags.HIKES])
|
|
|
|
|
|
# region # Hikes ########################################################
|
|
|
|
@router.get("/", response_model=HikesPublic)
|
|
def read_hikes(
|
|
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
|
) -> Any:
|
|
"""
|
|
Retrieve all hikes.
|
|
"""
|
|
|
|
if current_user.has_permissions(
|
|
module=PermissionModule.HIKE,
|
|
part=PermissionPart.ADMIN,
|
|
rights=PermissionRight.READ,
|
|
):
|
|
count_statement = select(func.count()).select_from(Hike)
|
|
count = session.exec(count_statement).one()
|
|
statement = select(Hike).offset(skip).limit(limit)
|
|
hikes = session.exec(statement).all()
|
|
return HikesPublic(data=hikes, count=count)
|
|
|
|
return HikesPublic(data=[], count=0)
|
|
|
|
|
|
@router.get("/{id}", response_model=HikePublic)
|
|
def read_hike(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
|
"""
|
|
Get hike by ID.
|
|
"""
|
|
hike = session.get(Hike, id)
|
|
if not hike:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
|
|
|
if not current_user.has_permissions(
|
|
module=PermissionModule.HIKE,
|
|
part=PermissionPart.ADMIN,
|
|
rights=PermissionRight.READ,
|
|
):
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
|
|
return hike
|
|
|
|
|
|
@router.post("/", response_model=HikePublic)
|
|
def create_hike(
|
|
*, session: SessionDep, current_user: CurrentUser, hike_in: HikeCreate
|
|
) -> Any:
|
|
"""
|
|
Create new hike.
|
|
"""
|
|
|
|
if not current_user.has_permissions(
|
|
module=PermissionModule.HIKE,
|
|
part=PermissionPart.ADMIN,
|
|
rights=PermissionRight.CREATE,
|
|
):
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
|
|
hike = Hike.create(create_obj=hike_in, session=session)
|
|
return hike
|
|
|
|
|
|
@router.put("/{id}", response_model=HikePublic)
|
|
def update_hike(
|
|
*, session: SessionDep, current_user: CurrentUser, id: RowId, hike_in: HikeUpdate
|
|
) -> Any:
|
|
"""
|
|
Update a hike.
|
|
"""
|
|
hike = session.get(Hike, id)
|
|
if not hike:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
|
|
|
if not current_user.has_permissions(
|
|
module=PermissionModule.HIKE,
|
|
part=PermissionPart.ADMIN,
|
|
rights=PermissionRight.UPDATE,
|
|
):
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
|
|
hike = Hike.update(db_obj=hike, in_obj=hike_in, session=session)
|
|
return hike
|
|
|
|
|
|
@router.delete("/{id}")
|
|
def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
|
|
"""
|
|
Delete a hike.
|
|
"""
|
|
hike = session.get(Hike, id)
|
|
if not hike:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
|
|
|
if not current_user.has_permissions(
|
|
module=PermissionModule.HIKE,
|
|
part=PermissionPart.ADMIN,
|
|
rights=PermissionRight.DELETE,
|
|
):
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
|
|
session.delete(hike)
|
|
session.commit()
|
|
return Message(message="Hike deleted successfully")
|
|
|
|
|
|
# endregion
|
|
|
|
# region # Hike / Routes #######################################################
|
|
|
|
|
|
@router.get("/{hike_id}/routes/", response_model=RoutesPublic)
|
|
def read_hike_route(
|
|
session: SessionDep,
|
|
current_user: CurrentUser,
|
|
hike_id: RowId,
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
) -> Any:
|
|
"""
|
|
Retrieve all hike routes.
|
|
"""
|
|
|
|
hike = session.get(Hike, hike_id)
|
|
if not hike:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found"
|
|
)
|
|
|
|
if not current_user.has_permission(
|
|
module=PermissionModule.HIKE,
|
|
part=PermissionPart.ADMIN,
|
|
rights=(PermissionRight.MANAGE_HIKES | PermissionRight.READ),
|
|
):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
|
|
)
|
|
|
|
data_query = select(Route).where(
|
|
Route.hike_id == hike_id,
|
|
)
|
|
|
|
count = session.exec(select(func.count()).select_from(data_query.subquery())).one()
|
|
data = session.exec(data_query.offset(skip).limit(limit)).all()
|
|
|
|
return RoutesPublic(data=data, count=count)
|
|
|
|
|
|
# endregion
|