Compare commits
1 Commits
23d7d63103
...
feature/se
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
076765e5c5 |
@@ -10,7 +10,7 @@ from app.api.routes import (
|
|||||||
private,
|
private,
|
||||||
users,
|
users,
|
||||||
utils,
|
utils,
|
||||||
hikes,
|
series,
|
||||||
)
|
)
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
|
||||||
@@ -25,8 +25,7 @@ api_router.include_router(teams.router)
|
|||||||
api_router.include_router(associations.router)
|
api_router.include_router(associations.router)
|
||||||
api_router.include_router(divisions.router)
|
api_router.include_router(divisions.router)
|
||||||
api_router.include_router(members.router)
|
api_router.include_router(members.router)
|
||||||
|
api_router.include_router(series.router)
|
||||||
api_router.include_router(hikes.router)
|
|
||||||
|
|
||||||
|
|
||||||
if settings.ENVIRONMENT == "local":
|
if settings.ENVIRONMENT == "local":
|
||||||
|
|||||||
@@ -1,132 +0,0 @@
|
|||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastapi import APIRouter, HTTPException, status
|
|
||||||
from sqlmodel import func, select
|
|
||||||
|
|
||||||
from app.api.deps import CurrentUser, SessionDep
|
|
||||||
from app.models.base import (
|
|
||||||
ApiTags,
|
|
||||||
Message,
|
|
||||||
RowId,
|
|
||||||
)
|
|
||||||
from app.models.hike import (
|
|
||||||
Hike,
|
|
||||||
HikeCreate,
|
|
||||||
HikeUpdate,
|
|
||||||
HikePublic,
|
|
||||||
HikesPublic,
|
|
||||||
)
|
|
||||||
from app.models.user import (
|
|
||||||
PermissionModule,
|
|
||||||
PermissionPart,
|
|
||||||
PermissionRight,
|
|
||||||
)
|
|
||||||
|
|
||||||
router = APIRouter(prefix="/hikes", tags=[ApiTags.HIKES])
|
|
||||||
|
|
||||||
|
|
||||||
# region # Hikes ########################################################
|
|
||||||
|
|
||||||
@router.get("/", response_model=HikesPublic)
|
|
||||||
def read_hikes(
|
|
||||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Retrieve all hikes.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
count_statement = select(func.count()).select_from(Hike)
|
|
||||||
count = session.exec(count_statement).one()
|
|
||||||
statement = select(Hike).offset(skip).limit(limit)
|
|
||||||
hikes = session.exec(statement).all()
|
|
||||||
return HikesPublic(data=hikes, count=count)
|
|
||||||
|
|
||||||
return HikesPublic(data=[], count=0)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{id}", response_model=HikePublic)
|
|
||||||
def read_hike(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
|
||||||
"""
|
|
||||||
Get hike by ID.
|
|
||||||
"""
|
|
||||||
hike = session.get(Hike, id)
|
|
||||||
if not hike:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
return hike
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/", response_model=HikePublic)
|
|
||||||
def create_hike(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, hike_in: HikeCreate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Create new hike.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.CREATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
hike = Hike.create(create_obj=hike_in, session=session)
|
|
||||||
return hike
|
|
||||||
|
|
||||||
|
|
||||||
@router.put("/{id}", response_model=HikePublic)
|
|
||||||
def update_hike(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, id: RowId, hike_in: HikeUpdate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Update a hike.
|
|
||||||
"""
|
|
||||||
hike = session.get(Hike, id)
|
|
||||||
if not hike:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.UPDATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
hike = Hike.update(db_obj=hike, in_obj=hike_in, session=session)
|
|
||||||
return hike
|
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/{id}")
|
|
||||||
def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
|
|
||||||
"""
|
|
||||||
Delete a hike.
|
|
||||||
"""
|
|
||||||
hike = session.get(Hike, id)
|
|
||||||
if not hike:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.HIKE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.DELETE,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
session.delete(hike)
|
|
||||||
session.commit()
|
|
||||||
return Message(message="Hike deleted successfully")
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
176
backend/app/api/routes/series.py
Normal file
176
backend/app/api/routes/series.py
Normal file
@@ -0,0 +1,176 @@
|
|||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastapi import APIRouter, HTTPException, status
|
||||||
|
from sqlmodel import func, select
|
||||||
|
|
||||||
|
from app.api.deps import CurrentUser, SessionDep
|
||||||
|
from app.models.base import (
|
||||||
|
ApiTags,
|
||||||
|
Message,
|
||||||
|
RowId,
|
||||||
|
)
|
||||||
|
from app.models.serie import (
|
||||||
|
Serie,
|
||||||
|
SerieCreate,
|
||||||
|
SerieUpdate,
|
||||||
|
SeriePublic,
|
||||||
|
SeriesPublic,
|
||||||
|
)
|
||||||
|
from app.models.division import (
|
||||||
|
Division,
|
||||||
|
DivisionsPublic,
|
||||||
|
)
|
||||||
|
from app.models.user import (
|
||||||
|
PermissionModule,
|
||||||
|
PermissionPart,
|
||||||
|
PermissionRight,
|
||||||
|
)
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/series", tags=[ApiTags.SERIES])
|
||||||
|
|
||||||
|
|
||||||
|
# region # Series ########################################################
|
||||||
|
|
||||||
|
@router.get("/", response_model=SeriesPublic)
|
||||||
|
def read_series(
|
||||||
|
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Retrieve all series.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.READ,
|
||||||
|
):
|
||||||
|
count_statement = select(func.count()).select_from(Serie)
|
||||||
|
count = session.exec(count_statement).one()
|
||||||
|
statement = select(Serie).offset(skip).limit(limit)
|
||||||
|
series = session.exec(statement).all()
|
||||||
|
return SeriesPublic(data=series, count=count)
|
||||||
|
|
||||||
|
return SeriesPublic(data=[], count=0)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{id}", response_model=SeriePublic)
|
||||||
|
def read_serie(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
||||||
|
"""
|
||||||
|
Get serie by ID.
|
||||||
|
"""
|
||||||
|
serie = session.get(Serie, id)
|
||||||
|
if not serie:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.READ,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
return serie
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/", response_model=SeriePublic)
|
||||||
|
def create_serie(
|
||||||
|
*, session: SessionDep, current_user: CurrentUser, serie_in: SerieCreate
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Create new serie.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.CREATE,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
serie = Serie.create(create_obj=serie_in, session=session)
|
||||||
|
return serie
|
||||||
|
|
||||||
|
|
||||||
|
@router.put("/{id}", response_model=SeriePublic)
|
||||||
|
def update_serie(
|
||||||
|
*, session: SessionDep, current_user: CurrentUser, id: RowId, serie_in: SerieUpdate
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Update a serie.
|
||||||
|
"""
|
||||||
|
serie = session.get(Serie, id)
|
||||||
|
if not serie:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.UPDATE,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
serie = Serie.update(db_obj=serie, in_obj=serie_in, session=session)
|
||||||
|
return serie
|
||||||
|
|
||||||
|
|
||||||
|
@router.delete("/{id}")
|
||||||
|
def delete_serie(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
|
||||||
|
"""
|
||||||
|
Delete a serie.
|
||||||
|
"""
|
||||||
|
serie = session.get(Serie, id)
|
||||||
|
if not serie:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.DELETE,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
session.delete(serie)
|
||||||
|
session.commit()
|
||||||
|
return Message(message="Serie deleted successfully")
|
||||||
|
|
||||||
|
# endregion
|
||||||
|
|
||||||
|
|
||||||
|
# region # Series / Divisions ############################################
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{series_id}/divisions/", response_model=DivisionsPublic)
|
||||||
|
def read_serie_division(
|
||||||
|
session: SessionDep, current_user: CurrentUser, series_id: RowId, skip: int = 0, limit: int = 100
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Retrieve all serie divisions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
serie = session.get(Serie, series_id)
|
||||||
|
if not serie:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
||||||
|
|
||||||
|
if not current_user.has_permission(
|
||||||
|
module=PermissionModule.SERIE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=(PermissionRight.MANAGE_DIVISIONS | PermissionRight.READ),
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
count_statement = (select(func.count())
|
||||||
|
.select_from(Division)
|
||||||
|
.where(Division.serie_id == serie.id)
|
||||||
|
)
|
||||||
|
count = session.exec(count_statement).one()
|
||||||
|
statement = (select(Division)
|
||||||
|
.where(Division.serie_id == serie.id)
|
||||||
|
.offset(skip)
|
||||||
|
.limit(limit)
|
||||||
|
)
|
||||||
|
divisions = session.exec(statement).all()
|
||||||
|
|
||||||
|
return DivisionsPublic(data=divisions, count=count)
|
||||||
|
|
||||||
|
|
||||||
|
# endregion
|
||||||
@@ -30,8 +30,8 @@ from app.models.member import (
|
|||||||
from app.models.apikey import (
|
from app.models.apikey import (
|
||||||
ApiKey,
|
ApiKey,
|
||||||
)
|
)
|
||||||
from app.models.hike import (
|
from app.models.serie import (
|
||||||
Hike,
|
Serie,
|
||||||
)
|
)
|
||||||
|
|
||||||
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
|
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ from enum import Enum, IntFlag # Python 3.11 >= StrEnum
|
|||||||
from enum import auto as auto_enum
|
from enum import auto as auto_enum
|
||||||
from uuid import UUID as RowId
|
from uuid import UUID as RowId
|
||||||
|
|
||||||
from sqlalchemy import TypeDecorator, Integer, VARCHAR
|
|
||||||
from sqlmodel import SQLModel
|
from sqlmodel import SQLModel
|
||||||
from sqlalchemy.orm import declared_attr
|
from sqlalchemy.orm import declared_attr
|
||||||
|
|
||||||
@@ -40,85 +39,8 @@ class DocumentedStrEnum(str, Enum):
|
|||||||
|
|
||||||
|
|
||||||
class DocumentedIntFlag(IntFlag):
|
class DocumentedIntFlag(IntFlag):
|
||||||
@property
|
# TODO: Build DB sport to proper store flags and make it possible to store all mutations
|
||||||
def names(self) -> str:
|
pass
|
||||||
"""
|
|
||||||
Returns a comma-separated string of all active flag names.
|
|
||||||
"""
|
|
||||||
# Exclude 0-value flags
|
|
||||||
return ",".join([flag.name for flag in type(self) if flag in self and flag.value != 0])
|
|
||||||
|
|
||||||
def __str__(self) -> str:
|
|
||||||
# Default string conversion uses the names
|
|
||||||
return self.names
|
|
||||||
|
|
||||||
# Optional: for Pydantic compatibility
|
|
||||||
def __get_pydantic_json__(self) -> str:
|
|
||||||
return self.names
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class DocumentedIntFlagType(TypeDecorator):
|
|
||||||
impl = Integer
|
|
||||||
cache_ok = True
|
|
||||||
|
|
||||||
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.enum_class = enum_class
|
|
||||||
|
|
||||||
def process_bind_param(self, value, dialect):
|
|
||||||
"""
|
|
||||||
Convert IntFlag to integer before storing
|
|
||||||
"""
|
|
||||||
if value is None:
|
|
||||||
return None
|
|
||||||
if isinstance(value, self.enum_class):
|
|
||||||
return int(value)
|
|
||||||
if isinstance(value, int):
|
|
||||||
return value
|
|
||||||
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
|
|
||||||
|
|
||||||
def process_result_value(self, value, dialect):
|
|
||||||
"""
|
|
||||||
Convert integer from DB back to IntFlag
|
|
||||||
"""
|
|
||||||
if value is None:
|
|
||||||
return None
|
|
||||||
return self.enum_class(value)
|
|
||||||
|
|
||||||
|
|
||||||
class DocumentedStrFlagType(TypeDecorator):
|
|
||||||
impl = VARCHAR
|
|
||||||
cache_ok = True
|
|
||||||
|
|
||||||
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.enum_class = enum_class
|
|
||||||
|
|
||||||
def process_bind_param(self, value, dialect):
|
|
||||||
"""
|
|
||||||
Convert IntFlag to comma-separated string of names for storing in DB.
|
|
||||||
"""
|
|
||||||
if value is None:
|
|
||||||
return None
|
|
||||||
if isinstance(value, self.enum_class):
|
|
||||||
return str(value)
|
|
||||||
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
|
|
||||||
|
|
||||||
def process_result_value(self, value, dialect):
|
|
||||||
"""
|
|
||||||
Convert comma-separated string of names from DB back to IntFlag.
|
|
||||||
"""
|
|
||||||
if value is None or value == "":
|
|
||||||
return self.enum_class(0)
|
|
||||||
names = value.split(",")
|
|
||||||
result = self.enum_class(0)
|
|
||||||
for name in names:
|
|
||||||
try:
|
|
||||||
result |= self.enum_class[name]
|
|
||||||
except KeyError:
|
|
||||||
raise ValueError(f"Invalid flag name '{name}' for {self.enum_class.__name__}")
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
@@ -137,8 +59,7 @@ class ApiTags(DocumentedStrEnum):
|
|||||||
ASSOCIATIONS = "Associations"
|
ASSOCIATIONS = "Associations"
|
||||||
DIVISIONS = "Divisions"
|
DIVISIONS = "Divisions"
|
||||||
MEMBERS = "Members"
|
MEMBERS = "Members"
|
||||||
|
SERIES = "Series"
|
||||||
HIKES = "Hikes"
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
# endregion
|
||||||
|
|||||||
@@ -1,181 +0,0 @@
|
|||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from sqlmodel import (
|
|
||||||
Session,
|
|
||||||
Relationship,
|
|
||||||
Field,
|
|
||||||
)
|
|
||||||
|
|
||||||
from . import mixin
|
|
||||||
from .base import (
|
|
||||||
BaseSQLModel,
|
|
||||||
DocumentedStrEnum,
|
|
||||||
DocumentedIntFlag,
|
|
||||||
DocumentedStrFlagType,
|
|
||||||
auto_enum,
|
|
||||||
)
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .event import Event
|
|
||||||
|
|
||||||
# region # Hike ################################################################
|
|
||||||
|
|
||||||
class HikeTeamPage(DocumentedIntFlag):
|
|
||||||
ROUTE = auto_enum() # Route steps info
|
|
||||||
QUESTIONS = auto_enum() # Visible questions
|
|
||||||
|
|
||||||
VISITED_PLACES = auto_enum() # Places the team has been
|
|
||||||
CURRENT_PLACE = auto_enum() # Place where the team currently is
|
|
||||||
UNVISITED_PLACES = auto_enum() # Places the team still neat to visit
|
|
||||||
|
|
||||||
PLACES = VISITED_PLACES | CURRENT_PLACE | UNVISITED_PLACES # All the places
|
|
||||||
|
|
||||||
TRAVEL_TIME = auto_enum() # Total travel time
|
|
||||||
TRAVEL_FREE_TIME = auto_enum() # Total travel time
|
|
||||||
PLACE_TIME = auto_enum() # Total place time
|
|
||||||
CALCULATE_TIME = auto_enum() # Total time calculated based on hike settings
|
|
||||||
# TODO: Think about time between oter teams
|
|
||||||
|
|
||||||
PLACE_POINTS = auto_enum() # Points scored on a place
|
|
||||||
TOTAL_PLACE_POINTS = auto_enum() # All points got on all places
|
|
||||||
ASSIST_POINTS = auto_enum() # Minus points for assistants
|
|
||||||
# TODO: Think about place in classement
|
|
||||||
|
|
||||||
ASSIST_LOG = auto_enum() # Assisted items
|
|
||||||
ASSIST_LATTER = auto_enum() # ???
|
|
||||||
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
|
|
||||||
class HikeTimeCalculation(DocumentedIntFlag):
|
|
||||||
TRAVEL_TIME = auto_enum() # Time traveling
|
|
||||||
|
|
||||||
# TODO: Think about time groups (in this model we have 2, free and non free)
|
|
||||||
TRAVEL_FREE_TIME = auto_enum() # Time that is excluded from traveling but not at a place
|
|
||||||
PLACE_TIME = auto_enum() # Time checked in at a place
|
|
||||||
|
|
||||||
TOTAL_TIME = TRAVEL_TIME | TRAVEL_FREE_TIME | PLACE_TIME
|
|
||||||
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
|
|
||||||
class HikeBase(
|
|
||||||
mixin.Name,
|
|
||||||
mixin.Contact,
|
|
||||||
BaseSQLModel,
|
|
||||||
):
|
|
||||||
tracker_interval: int | None = Field(
|
|
||||||
default=None,
|
|
||||||
nullable=True,
|
|
||||||
description="Is GPS button available, value will be the interval",
|
|
||||||
)
|
|
||||||
|
|
||||||
is_multi_day: bool = Field(
|
|
||||||
default=False,
|
|
||||||
nullable=False,
|
|
||||||
description="Show datetime in stead of time only",
|
|
||||||
)
|
|
||||||
|
|
||||||
team_page: HikeTeamPage = Field(
|
|
||||||
default=HikeTeamPage.PLACES | HikeTeamPage.ROUTE | HikeTeamPage.QUESTIONS,
|
|
||||||
nullable=False,
|
|
||||||
description="Show all the places of the route inside the teams page",
|
|
||||||
sa_type=DocumentedStrFlagType(HikeTeamPage),
|
|
||||||
)
|
|
||||||
|
|
||||||
time_calculation: HikeTimeCalculation = Field(
|
|
||||||
default=HikeTimeCalculation.TRAVEL_TIME,
|
|
||||||
nullable=False,
|
|
||||||
description="Wath should we calculate inside the total time",
|
|
||||||
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
|
|
||||||
)
|
|
||||||
|
|
||||||
min_time_points: int | None = Field(
|
|
||||||
default=0,
|
|
||||||
ge=0,
|
|
||||||
# TODO: le: max_time_points
|
|
||||||
description="Min points for time",
|
|
||||||
)
|
|
||||||
|
|
||||||
max_time_points: int | None = Field(
|
|
||||||
default=100,
|
|
||||||
ge=0, # TODO: ge > min_time_points
|
|
||||||
description="Max points for time",
|
|
||||||
)
|
|
||||||
|
|
||||||
max_question_points: int | None = Field(
|
|
||||||
default=None,
|
|
||||||
description="None: Calculate from answers (no max), Positive: Set as max for dynamic range",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class HikeCreate(HikeBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class HikeUpdate(HikeBase):
|
|
||||||
is_multi_day: bool | None = Field(default=None) # type: ignore
|
|
||||||
team_page: HikeTeamPage | None = Field(default=None) # type: ignore
|
|
||||||
time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore
|
|
||||||
|
|
||||||
|
|
||||||
class Hike(mixin.RowId, HikeBase, table=True):
|
|
||||||
# --- database only items --------------------------------------------------
|
|
||||||
|
|
||||||
# --- read only items ------------------------------------------------------
|
|
||||||
|
|
||||||
# --- back_populates links -------------------------------------------------
|
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
|
||||||
@classmethod
|
|
||||||
def create(cls, *, session: Session, create_obj: HikeCreate) -> "Hike":
|
|
||||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
|
||||||
|
|
||||||
db_obj = cls.model_validate(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update(
|
|
||||||
cls, *, session: Session, db_obj: "Hike", in_obj: HikeUpdate
|
|
||||||
) -> "Hike":
|
|
||||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj.sqlmodel_update(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to return via API, id is always required
|
|
||||||
class HikePublic(mixin.RowIdPublic, HikeBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class HikesPublic(BaseSQLModel):
|
|
||||||
data: list[HikePublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
|
|
||||||
|
|
||||||
# region # Hike / Route ########################################################
|
|
||||||
|
|
||||||
|
|
||||||
class RouteType(DocumentedStrEnum):
|
|
||||||
START_FINISH = auto_enum() # Start at the start and end at the finish
|
|
||||||
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
|
|
||||||
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
|
|
||||||
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
@@ -18,6 +18,7 @@ if TYPE_CHECKING:
|
|||||||
from .apikey import ApiKey
|
from .apikey import ApiKey
|
||||||
from .event import EventUserLink
|
from .event import EventUserLink
|
||||||
from .member import Member
|
from .member import Member
|
||||||
|
from .serie import SerieUserLink
|
||||||
|
|
||||||
|
|
||||||
# region # User ################################################################
|
# region # User ################################################################
|
||||||
@@ -31,8 +32,7 @@ class PermissionModule(DocumentedStrEnum):
|
|||||||
ASSOCIATION = auto_enum()
|
ASSOCIATION = auto_enum()
|
||||||
DIVISION = auto_enum()
|
DIVISION = auto_enum()
|
||||||
MEMBER = auto_enum()
|
MEMBER = auto_enum()
|
||||||
|
SERIE = auto_enum()
|
||||||
HIKE = auto_enum()
|
|
||||||
|
|
||||||
|
|
||||||
class PermissionPart(DocumentedStrEnum):
|
class PermissionPart(DocumentedStrEnum):
|
||||||
@@ -138,6 +138,7 @@ class User(mixin.RowId, UserBase, table=True):
|
|||||||
# --- many-to-many links ---------------------------------------------------
|
# --- many-to-many links ---------------------------------------------------
|
||||||
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
|
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
|
||||||
event_links: list["EventUserLink"] = Relationship(back_populates="user")
|
event_links: list["EventUserLink"] = Relationship(back_populates="user")
|
||||||
|
serie_links: list["SerieUserLink"] = Relationship(back_populates="user")
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
# --- CRUD actions ---------------------------------------------------------
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -5,16 +5,16 @@ from fastapi.testclient import TestClient
|
|||||||
from sqlmodel import Session
|
from sqlmodel import Session
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
from app.tests.utils.hike import create_random_hike
|
from app.tests.utils.serie import create_random_serie
|
||||||
|
|
||||||
|
|
||||||
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_create_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
data = {
|
data = {
|
||||||
"name": "RSW Maasdelta 2026",
|
"name": "Zondag spel",
|
||||||
"contact": "Sebas",
|
"contact": "Rick",
|
||||||
}
|
}
|
||||||
response = client.post(
|
response = client.post(
|
||||||
f"{settings.API_V1_STR}/hikes/",
|
f"{settings.API_V1_STR}/series/",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
@@ -25,13 +25,13 @@ def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str]
|
|||||||
assert "id" in content
|
assert "id" in content
|
||||||
|
|
||||||
|
|
||||||
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_create_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
data = {
|
data = {
|
||||||
"name": "RSW Maasdelta 2026",
|
"name": "Zondag Spel",
|
||||||
"contact": "Sebas",
|
"contact": "Rick",
|
||||||
}
|
}
|
||||||
response = client.post(
|
response = client.post(
|
||||||
f"{settings.API_V1_STR}/hikes/",
|
f"{settings.API_V1_STR}/series/",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
@@ -39,43 +39,43 @@ def test_create_hike_no_permissions(client: TestClient, normal_user_token_header
|
|||||||
assert response.json()["detail"] == "Not enough permissions"
|
assert response.json()["detail"] == "Not enough permissions"
|
||||||
|
|
||||||
|
|
||||||
def test_read_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
hike = create_random_hike(db)
|
serie = create_random_serie(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
content = response.json()
|
content = response.json()
|
||||||
assert content["id"] == str(hike.id)
|
assert content["id"] == str(serie.id)
|
||||||
assert content["name"] == hike.name
|
assert content["name"] == serie.name
|
||||||
assert content["contact"] == hike.contact
|
assert content["contact"] == serie.contact
|
||||||
|
|
||||||
|
|
||||||
def test_read_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||||
assert response.json()["detail"] == "Hike not found"
|
assert response.json()["detail"] == "Serie not found"
|
||||||
|
|
||||||
|
|
||||||
def test_read_hike_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_serie_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
hike = create_random_hike(db)
|
serie = create_random_serie(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
assert response.json()["detail"] == "Not enough permissions"
|
||||||
|
|
||||||
|
|
||||||
def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_series(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
create_random_hike(db)
|
create_random_serie(db)
|
||||||
create_random_hike(db)
|
create_random_serie(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/hikes/",
|
f"{settings.API_V1_STR}/series/",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
@@ -87,11 +87,11 @@ def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str],
|
|||||||
assert len(content["data"]) <= content["count"]
|
assert len(content["data"]) <= content["count"]
|
||||||
|
|
||||||
|
|
||||||
def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_series_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
create_random_hike(db)
|
create_random_serie(db)
|
||||||
create_random_hike(db)
|
create_random_serie(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/hikes/",
|
f"{settings.API_V1_STR}/series/",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
@@ -103,46 +103,46 @@ def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers
|
|||||||
assert len(content["data"]) == 0
|
assert len(content["data"]) == 0
|
||||||
|
|
||||||
|
|
||||||
def test_update_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_update_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
hike = create_random_hike(db)
|
serie = create_random_serie(db)
|
||||||
data = {
|
data = {
|
||||||
"name": "Updated name",
|
"name": "Updated name",
|
||||||
"contact": "Updated contact",
|
"contact": "Updated contact",
|
||||||
}
|
}
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
content = response.json()
|
content = response.json()
|
||||||
assert content["id"] == str(hike.id)
|
assert content["id"] == str(serie.id)
|
||||||
assert content["name"] == data["name"]
|
assert content["name"] == data["name"]
|
||||||
assert content["contact"] == data["contact"]
|
assert content["contact"] == data["contact"]
|
||||||
|
|
||||||
|
|
||||||
def test_update_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
def test_update_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||||
data = {
|
data = {
|
||||||
"name": "Not found",
|
"name": "Not found",
|
||||||
"contact": "Not found",
|
"contact": "Not found",
|
||||||
}
|
}
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||||
assert response.json()["detail"] == "Hike not found"
|
assert response.json()["detail"] == "Serie not found"
|
||||||
|
|
||||||
|
|
||||||
def test_update_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_update_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
hike = create_random_hike(db)
|
serie = create_random_serie(db)
|
||||||
data = {
|
data = {
|
||||||
"name": "No permissions",
|
"name": "No permissions",
|
||||||
"contact": "No permissions",
|
"contact": "No permissions",
|
||||||
}
|
}
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
@@ -150,29 +150,29 @@ def test_update_hike_no_permissions(client: TestClient, normal_user_token_header
|
|||||||
assert response.json()["detail"] == "Not enough permissions"
|
assert response.json()["detail"] == "Not enough permissions"
|
||||||
|
|
||||||
|
|
||||||
def test_delete_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_delete_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
hike = create_random_hike(db)
|
serie = create_random_serie(db)
|
||||||
response = client.delete(
|
response = client.delete(
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
assert response.json()["message"] == "Hike deleted successfully"
|
assert response.json()["message"] == "Serie deleted successfully"
|
||||||
|
|
||||||
|
|
||||||
def test_delete_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
def test_delete_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||||
response = client.delete(
|
response = client.delete(
|
||||||
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||||
assert response.json()["detail"] == "Hike not found"
|
assert response.json()["detail"] == "Serie not found"
|
||||||
|
|
||||||
|
|
||||||
def test_delete_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_delete_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
hike = create_random_hike(db)
|
serie = create_random_serie(db)
|
||||||
response = client.delete(
|
response = client.delete(
|
||||||
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
f"{settings.API_V1_STR}/series/{serie.id}",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
from sqlmodel import Session
|
|
||||||
|
|
||||||
from app.models.hike import Hike, HikeCreate
|
|
||||||
from app.tests.utils.utils import random_lower_string
|
|
||||||
|
|
||||||
|
|
||||||
def create_random_hike(db: Session, name: str = None) -> Hike:
|
|
||||||
if not name:
|
|
||||||
name = random_lower_string()
|
|
||||||
|
|
||||||
hike_in = HikeCreate(name=name)
|
|
||||||
return Hike.create(session=db, create_obj=hike_in)
|
|
||||||
12
backend/app/tests/utils/serie.py
Normal file
12
backend/app/tests/utils/serie.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
from sqlmodel import Session
|
||||||
|
|
||||||
|
from app.models.serie import Serie, SerieCreate
|
||||||
|
from app.tests.utils.utils import random_lower_string
|
||||||
|
|
||||||
|
|
||||||
|
def create_random_serie(db: Session, name: str = None) -> Serie:
|
||||||
|
if not name:
|
||||||
|
name = random_lower_string()
|
||||||
|
|
||||||
|
serie_in = SerieCreate(name=name)
|
||||||
|
return Serie.create(session=db, create_obj=serie_in)
|
||||||
Reference in New Issue
Block a user