1 Commits

Author SHA1 Message Date
Rick
076765e5c5 Added base file for serie 2025-11-08 12:19:34 +01:00
10 changed files with 247 additions and 463 deletions

View File

@@ -10,7 +10,7 @@ from app.api.routes import (
private,
users,
utils,
hikes,
series,
)
from app.core.config import settings
@@ -25,8 +25,7 @@ api_router.include_router(teams.router)
api_router.include_router(associations.router)
api_router.include_router(divisions.router)
api_router.include_router(members.router)
api_router.include_router(hikes.router)
api_router.include_router(series.router)
if settings.ENVIRONMENT == "local":

View File

@@ -1,132 +0,0 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.hike import (
Hike,
HikeCreate,
HikeUpdate,
HikePublic,
HikesPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/hikes", tags=[ApiTags.HIKES])
# region # Hikes ########################################################
@router.get("/", response_model=HikesPublic)
def read_hikes(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all hikes.
"""
if current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Hike)
count = session.exec(count_statement).one()
statement = select(Hike).offset(skip).limit(limit)
hikes = session.exec(statement).all()
return HikesPublic(data=hikes, count=count)
return HikesPublic(data=[], count=0)
@router.get("/{id}", response_model=HikePublic)
def read_hike(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get hike by ID.
"""
hike = session.get(Hike, id)
if not hike:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return hike
@router.post("/", response_model=HikePublic)
def create_hike(
*, session: SessionDep, current_user: CurrentUser, hike_in: HikeCreate
) -> Any:
"""
Create new hike.
"""
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
hike = Hike.create(create_obj=hike_in, session=session)
return hike
@router.put("/{id}", response_model=HikePublic)
def update_hike(
*, session: SessionDep, current_user: CurrentUser, id: RowId, hike_in: HikeUpdate
) -> Any:
"""
Update a hike.
"""
hike = session.get(Hike, id)
if not hike:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
hike = Hike.update(db_obj=hike, in_obj=hike_in, session=session)
return hike
@router.delete("/{id}")
def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a hike.
"""
hike = session.get(Hike, id)
if not hike:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
if not current_user.has_permissions(
module=PermissionModule.HIKE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
session.delete(hike)
session.commit()
return Message(message="Hike deleted successfully")
# endregion

View File

@@ -0,0 +1,176 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.serie import (
Serie,
SerieCreate,
SerieUpdate,
SeriePublic,
SeriesPublic,
)
from app.models.division import (
Division,
DivisionsPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/series", tags=[ApiTags.SERIES])
# region # Series ########################################################
@router.get("/", response_model=SeriesPublic)
def read_series(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all series.
"""
if current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Serie)
count = session.exec(count_statement).one()
statement = select(Serie).offset(skip).limit(limit)
series = session.exec(statement).all()
return SeriesPublic(data=series, count=count)
return SeriesPublic(data=[], count=0)
@router.get("/{id}", response_model=SeriePublic)
def read_serie(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get serie by ID.
"""
serie = session.get(Serie, id)
if not serie:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
if not current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return serie
@router.post("/", response_model=SeriePublic)
def create_serie(
*, session: SessionDep, current_user: CurrentUser, serie_in: SerieCreate
) -> Any:
"""
Create new serie.
"""
if not current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
serie = Serie.create(create_obj=serie_in, session=session)
return serie
@router.put("/{id}", response_model=SeriePublic)
def update_serie(
*, session: SessionDep, current_user: CurrentUser, id: RowId, serie_in: SerieUpdate
) -> Any:
"""
Update a serie.
"""
serie = session.get(Serie, id)
if not serie:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
if not current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
serie = Serie.update(db_obj=serie, in_obj=serie_in, session=session)
return serie
@router.delete("/{id}")
def delete_serie(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a serie.
"""
serie = session.get(Serie, id)
if not serie:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
if not current_user.has_permissions(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
session.delete(serie)
session.commit()
return Message(message="Serie deleted successfully")
# endregion
# region # Series / Divisions ############################################
@router.get("/{series_id}/divisions/", response_model=DivisionsPublic)
def read_serie_division(
session: SessionDep, current_user: CurrentUser, series_id: RowId, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all serie divisions.
"""
serie = session.get(Serie, series_id)
if not serie:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
if not current_user.has_permission(
module=PermissionModule.SERIE,
part=PermissionPart.ADMIN,
rights=(PermissionRight.MANAGE_DIVISIONS | PermissionRight.READ),
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
count_statement = (select(func.count())
.select_from(Division)
.where(Division.serie_id == serie.id)
)
count = session.exec(count_statement).one()
statement = (select(Division)
.where(Division.serie_id == serie.id)
.offset(skip)
.limit(limit)
)
divisions = session.exec(statement).all()
return DivisionsPublic(data=divisions, count=count)
# endregion

View File

@@ -30,8 +30,8 @@ from app.models.member import (
from app.models.apikey import (
ApiKey,
)
from app.models.hike import (
Hike,
from app.models.serie import (
Serie,
)
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))

View File

@@ -4,7 +4,6 @@ from enum import Enum, IntFlag # Python 3.11 >= StrEnum
from enum import auto as auto_enum
from uuid import UUID as RowId
from sqlalchemy import TypeDecorator, Integer, VARCHAR
from sqlmodel import SQLModel
from sqlalchemy.orm import declared_attr
@@ -40,85 +39,8 @@ class DocumentedStrEnum(str, Enum):
class DocumentedIntFlag(IntFlag):
@property
def names(self) -> str:
"""
Returns a comma-separated string of all active flag names.
"""
# Exclude 0-value flags
return ",".join([flag.name for flag in type(self) if flag in self and flag.value != 0])
def __str__(self) -> str:
# Default string conversion uses the names
return self.names
# Optional: for Pydantic compatibility
def __get_pydantic_json__(self) -> str:
return self.names
class DocumentedIntFlagType(TypeDecorator):
impl = Integer
cache_ok = True
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
super().__init__(*args, **kwargs)
self.enum_class = enum_class
def process_bind_param(self, value, dialect):
"""
Convert IntFlag to integer before storing
"""
if value is None:
return None
if isinstance(value, self.enum_class):
return int(value)
if isinstance(value, int):
return value
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
def process_result_value(self, value, dialect):
"""
Convert integer from DB back to IntFlag
"""
if value is None:
return None
return self.enum_class(value)
class DocumentedStrFlagType(TypeDecorator):
impl = VARCHAR
cache_ok = True
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
super().__init__(*args, **kwargs)
self.enum_class = enum_class
def process_bind_param(self, value, dialect):
"""
Convert IntFlag to comma-separated string of names for storing in DB.
"""
if value is None:
return None
if isinstance(value, self.enum_class):
return str(value)
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
def process_result_value(self, value, dialect):
"""
Convert comma-separated string of names from DB back to IntFlag.
"""
if value is None or value == "":
return self.enum_class(0)
names = value.split(",")
result = self.enum_class(0)
for name in names:
try:
result |= self.enum_class[name]
except KeyError:
raise ValueError(f"Invalid flag name '{name}' for {self.enum_class.__name__}")
return result
# TODO: Build DB sport to proper store flags and make it possible to store all mutations
pass
# #############################################################################
@@ -137,8 +59,7 @@ class ApiTags(DocumentedStrEnum):
ASSOCIATIONS = "Associations"
DIVISIONS = "Divisions"
MEMBERS = "Members"
HIKES = "Hikes"
SERIES = "Series"
# endregion

View File

@@ -1,181 +0,0 @@
from typing import TYPE_CHECKING
from sqlmodel import (
Session,
Relationship,
Field,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedStrEnum,
DocumentedIntFlag,
DocumentedStrFlagType,
auto_enum,
)
if TYPE_CHECKING:
from .event import Event
# region # Hike ################################################################
class HikeTeamPage(DocumentedIntFlag):
ROUTE = auto_enum() # Route steps info
QUESTIONS = auto_enum() # Visible questions
VISITED_PLACES = auto_enum() # Places the team has been
CURRENT_PLACE = auto_enum() # Place where the team currently is
UNVISITED_PLACES = auto_enum() # Places the team still neat to visit
PLACES = VISITED_PLACES | CURRENT_PLACE | UNVISITED_PLACES # All the places
TRAVEL_TIME = auto_enum() # Total travel time
TRAVEL_FREE_TIME = auto_enum() # Total travel time
PLACE_TIME = auto_enum() # Total place time
CALCULATE_TIME = auto_enum() # Total time calculated based on hike settings
# TODO: Think about time between oter teams
PLACE_POINTS = auto_enum() # Points scored on a place
TOTAL_PLACE_POINTS = auto_enum() # All points got on all places
ASSIST_POINTS = auto_enum() # Minus points for assistants
# TODO: Think about place in classement
ASSIST_LOG = auto_enum() # Assisted items
ASSIST_LATTER = auto_enum() # ???
# ##############################################################################
class HikeTimeCalculation(DocumentedIntFlag):
TRAVEL_TIME = auto_enum() # Time traveling
# TODO: Think about time groups (in this model we have 2, free and non free)
TRAVEL_FREE_TIME = auto_enum() # Time that is excluded from traveling but not at a place
PLACE_TIME = auto_enum() # Time checked in at a place
TOTAL_TIME = TRAVEL_TIME | TRAVEL_FREE_TIME | PLACE_TIME
# ##############################################################################
class HikeBase(
mixin.Name,
mixin.Contact,
BaseSQLModel,
):
tracker_interval: int | None = Field(
default=None,
nullable=True,
description="Is GPS button available, value will be the interval",
)
is_multi_day: bool = Field(
default=False,
nullable=False,
description="Show datetime in stead of time only",
)
team_page: HikeTeamPage = Field(
default=HikeTeamPage.PLACES | HikeTeamPage.ROUTE | HikeTeamPage.QUESTIONS,
nullable=False,
description="Show all the places of the route inside the teams page",
sa_type=DocumentedStrFlagType(HikeTeamPage),
)
time_calculation: HikeTimeCalculation = Field(
default=HikeTimeCalculation.TRAVEL_TIME,
nullable=False,
description="Wath should we calculate inside the total time",
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
)
min_time_points: int | None = Field(
default=0,
ge=0,
# TODO: le: max_time_points
description="Min points for time",
)
max_time_points: int | None = Field(
default=100,
ge=0, # TODO: ge > min_time_points
description="Max points for time",
)
max_question_points: int | None = Field(
default=None,
description="None: Calculate from answers (no max), Positive: Set as max for dynamic range",
)
# Properties to receive via API on creation
class HikeCreate(HikeBase):
pass
# Properties to receive via API on update, all are optional
class HikeUpdate(HikeBase):
is_multi_day: bool | None = Field(default=None) # type: ignore
team_page: HikeTeamPage | None = Field(default=None) # type: ignore
time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore
class Hike(mixin.RowId, HikeBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: HikeCreate) -> "Hike":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Hike", in_obj: HikeUpdate
) -> "Hike":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class HikePublic(mixin.RowIdPublic, HikeBase):
pass
class HikesPublic(BaseSQLModel):
data: list[HikePublic]
count: int
# endregion
# region # Hike / Route ########################################################
class RouteType(DocumentedStrEnum):
START_FINISH = auto_enum() # Start at the start and end at the finish
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
# ##############################################################################
# endregion

View File

@@ -18,6 +18,7 @@ if TYPE_CHECKING:
from .apikey import ApiKey
from .event import EventUserLink
from .member import Member
from .serie import SerieUserLink
# region # User ################################################################
@@ -31,8 +32,7 @@ class PermissionModule(DocumentedStrEnum):
ASSOCIATION = auto_enum()
DIVISION = auto_enum()
MEMBER = auto_enum()
HIKE = auto_enum()
SERIE = auto_enum()
class PermissionPart(DocumentedStrEnum):
@@ -138,6 +138,7 @@ class User(mixin.RowId, UserBase, table=True):
# --- many-to-many links ---------------------------------------------------
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
event_links: list["EventUserLink"] = Relationship(back_populates="user")
serie_links: list["SerieUserLink"] = Relationship(back_populates="user")
# --- CRUD actions ---------------------------------------------------------
@classmethod

View File

@@ -5,16 +5,16 @@ from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.hike import create_random_hike
from app.tests.utils.serie import create_random_serie
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
def test_create_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "RSW Maasdelta 2026",
"contact": "Sebas",
"name": "Zondag spel",
"contact": "Rick",
}
response = client.post(
f"{settings.API_V1_STR}/hikes/",
f"{settings.API_V1_STR}/series/",
headers=superuser_token_headers,
json=data,
)
@@ -25,13 +25,13 @@ def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str]
assert "id" in content
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
def test_create_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "RSW Maasdelta 2026",
"contact": "Sebas",
"name": "Zondag Spel",
"contact": "Rick",
}
response = client.post(
f"{settings.API_V1_STR}/hikes/",
f"{settings.API_V1_STR}/series/",
headers=normal_user_token_headers,
json=data,
)
@@ -39,43 +39,43 @@ def test_create_hike_no_permissions(client: TestClient, normal_user_token_header
assert response.json()["detail"] == "Not enough permissions"
def test_read_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
def test_read_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
serie = create_random_serie(db)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}",
f"{settings.API_V1_STR}/series/{serie.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(hike.id)
assert content["name"] == hike.name
assert content["contact"] == hike.contact
assert content["id"] == str(serie.id)
assert content["name"] == serie.name
assert content["contact"] == serie.contact
def test_read_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
def test_read_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found"
assert response.json()["detail"] == "Serie not found"
def test_read_hike_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
def test_read_serie_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
serie = create_random_serie(db)
response = client.get(
f"{settings.API_V1_STR}/hikes/{hike.id}",
f"{settings.API_V1_STR}/series/{serie.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_hike(db)
create_random_hike(db)
def test_read_series(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_serie(db)
create_random_serie(db)
response = client.get(
f"{settings.API_V1_STR}/hikes/",
f"{settings.API_V1_STR}/series/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
@@ -87,11 +87,11 @@ def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str],
assert len(content["data"]) <= content["count"]
def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_hike(db)
create_random_hike(db)
def test_read_series_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_serie(db)
create_random_serie(db)
response = client.get(
f"{settings.API_V1_STR}/hikes/",
f"{settings.API_V1_STR}/series/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
@@ -103,46 +103,46 @@ def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers
assert len(content["data"]) == 0
def test_update_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
def test_update_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
serie = create_random_serie(db)
data = {
"name": "Updated name",
"contact": "Updated contact",
}
response = client.put(
f"{settings.API_V1_STR}/hikes/{hike.id}",
f"{settings.API_V1_STR}/series/{serie.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(hike.id)
assert content["id"] == str(serie.id)
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
def test_update_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
def test_update_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"name": "Not found",
"contact": "Not found",
}
response = client.put(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found"
assert response.json()["detail"] == "Serie not found"
def test_update_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
def test_update_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
serie = create_random_serie(db)
data = {
"name": "No permissions",
"contact": "No permissions",
}
response = client.put(
f"{settings.API_V1_STR}/hikes/{hike.id}",
f"{settings.API_V1_STR}/series/{serie.id}",
headers=normal_user_token_headers,
json=data,
)
@@ -150,29 +150,29 @@ def test_update_hike_no_permissions(client: TestClient, normal_user_token_header
assert response.json()["detail"] == "Not enough permissions"
def test_delete_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
def test_delete_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
serie = create_random_serie(db)
response = client.delete(
f"{settings.API_V1_STR}/hikes/{hike.id}",
f"{settings.API_V1_STR}/series/{serie.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Hike deleted successfully"
assert response.json()["message"] == "Serie deleted successfully"
def test_delete_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
def test_delete_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Hike not found"
assert response.json()["detail"] == "Serie not found"
def test_delete_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
hike = create_random_hike(db)
def test_delete_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
serie = create_random_serie(db)
response = client.delete(
f"{settings.API_V1_STR}/hikes/{hike.id}",
f"{settings.API_V1_STR}/series/{serie.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN

View File

@@ -1,12 +0,0 @@
from sqlmodel import Session
from app.models.hike import Hike, HikeCreate
from app.tests.utils.utils import random_lower_string
def create_random_hike(db: Session, name: str = None) -> Hike:
if not name:
name = random_lower_string()
hike_in = HikeCreate(name=name)
return Hike.create(session=db, create_obj=hike_in)

View File

@@ -0,0 +1,12 @@
from sqlmodel import Session
from app.models.serie import Serie, SerieCreate
from app.tests.utils.utils import random_lower_string
def create_random_serie(db: Session, name: str = None) -> Serie:
if not name:
name = random_lower_string()
serie_in = SerieCreate(name=name)
return Serie.create(session=db, create_obj=serie_in)