Compare commits
2 Commits
feature/se
...
23d7d63103
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
23d7d63103 | ||
|
|
f958856e95 |
3
.idea/league-manager.iml
generated
3
.idea/league-manager.iml
generated
@@ -3,9 +3,8 @@
|
|||||||
<component name="NewModuleRootManager">
|
<component name="NewModuleRootManager">
|
||||||
<content url="file://$MODULE_DIR$">
|
<content url="file://$MODULE_DIR$">
|
||||||
<sourceFolder url="file://$MODULE_DIR$/backend" isTestSource="false" />
|
<sourceFolder url="file://$MODULE_DIR$/backend" isTestSource="false" />
|
||||||
<excludeFolder url="file://$MODULE_DIR$/.venv" />
|
|
||||||
</content>
|
</content>
|
||||||
<orderEntry type="jdk" jdkName="Python 3.14 (RSW_puntensysteem)" jdkType="Python SDK" />
|
<orderEntry type="jdk" jdkName="Python 3.13" jdkType="Python SDK" />
|
||||||
<orderEntry type="sourceFolder" forTests="false" />
|
<orderEntry type="sourceFolder" forTests="false" />
|
||||||
</component>
|
</component>
|
||||||
<component name="PyDocumentationSettings">
|
<component name="PyDocumentationSettings">
|
||||||
|
|||||||
2
.idea/misc.xml
generated
2
.idea/misc.xml
generated
@@ -3,5 +3,5 @@
|
|||||||
<component name="Black">
|
<component name="Black">
|
||||||
<option name="sdkName" value="Python 3.13" />
|
<option name="sdkName" value="Python 3.13" />
|
||||||
</component>
|
</component>
|
||||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.14 (RSW_puntensysteem)" project-jdk-type="Python SDK" />
|
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13" project-jdk-type="Python SDK" />
|
||||||
</project>
|
</project>
|
||||||
@@ -10,7 +10,7 @@ from app.api.routes import (
|
|||||||
private,
|
private,
|
||||||
users,
|
users,
|
||||||
utils,
|
utils,
|
||||||
series,
|
hikes,
|
||||||
)
|
)
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
|
||||||
@@ -25,7 +25,8 @@ api_router.include_router(teams.router)
|
|||||||
api_router.include_router(associations.router)
|
api_router.include_router(associations.router)
|
||||||
api_router.include_router(divisions.router)
|
api_router.include_router(divisions.router)
|
||||||
api_router.include_router(members.router)
|
api_router.include_router(members.router)
|
||||||
api_router.include_router(series.router)
|
|
||||||
|
api_router.include_router(hikes.router)
|
||||||
|
|
||||||
|
|
||||||
if settings.ENVIRONMENT == "local":
|
if settings.ENVIRONMENT == "local":
|
||||||
|
|||||||
132
backend/app/api/routes/hikes.py
Normal file
132
backend/app/api/routes/hikes.py
Normal file
@@ -0,0 +1,132 @@
|
|||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastapi import APIRouter, HTTPException, status
|
||||||
|
from sqlmodel import func, select
|
||||||
|
|
||||||
|
from app.api.deps import CurrentUser, SessionDep
|
||||||
|
from app.models.base import (
|
||||||
|
ApiTags,
|
||||||
|
Message,
|
||||||
|
RowId,
|
||||||
|
)
|
||||||
|
from app.models.hike import (
|
||||||
|
Hike,
|
||||||
|
HikeCreate,
|
||||||
|
HikeUpdate,
|
||||||
|
HikePublic,
|
||||||
|
HikesPublic,
|
||||||
|
)
|
||||||
|
from app.models.user import (
|
||||||
|
PermissionModule,
|
||||||
|
PermissionPart,
|
||||||
|
PermissionRight,
|
||||||
|
)
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/hikes", tags=[ApiTags.HIKES])
|
||||||
|
|
||||||
|
|
||||||
|
# region # Hikes ########################################################
|
||||||
|
|
||||||
|
@router.get("/", response_model=HikesPublic)
|
||||||
|
def read_hikes(
|
||||||
|
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Retrieve all hikes.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if current_user.has_permissions(
|
||||||
|
module=PermissionModule.HIKE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.READ,
|
||||||
|
):
|
||||||
|
count_statement = select(func.count()).select_from(Hike)
|
||||||
|
count = session.exec(count_statement).one()
|
||||||
|
statement = select(Hike).offset(skip).limit(limit)
|
||||||
|
hikes = session.exec(statement).all()
|
||||||
|
return HikesPublic(data=hikes, count=count)
|
||||||
|
|
||||||
|
return HikesPublic(data=[], count=0)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{id}", response_model=HikePublic)
|
||||||
|
def read_hike(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
||||||
|
"""
|
||||||
|
Get hike by ID.
|
||||||
|
"""
|
||||||
|
hike = session.get(Hike, id)
|
||||||
|
if not hike:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.HIKE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.READ,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
return hike
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/", response_model=HikePublic)
|
||||||
|
def create_hike(
|
||||||
|
*, session: SessionDep, current_user: CurrentUser, hike_in: HikeCreate
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Create new hike.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.HIKE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.CREATE,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
hike = Hike.create(create_obj=hike_in, session=session)
|
||||||
|
return hike
|
||||||
|
|
||||||
|
|
||||||
|
@router.put("/{id}", response_model=HikePublic)
|
||||||
|
def update_hike(
|
||||||
|
*, session: SessionDep, current_user: CurrentUser, id: RowId, hike_in: HikeUpdate
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
Update a hike.
|
||||||
|
"""
|
||||||
|
hike = session.get(Hike, id)
|
||||||
|
if not hike:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.HIKE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.UPDATE,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
hike = Hike.update(db_obj=hike, in_obj=hike_in, session=session)
|
||||||
|
return hike
|
||||||
|
|
||||||
|
|
||||||
|
@router.delete("/{id}")
|
||||||
|
def delete_hike(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
|
||||||
|
"""
|
||||||
|
Delete a hike.
|
||||||
|
"""
|
||||||
|
hike = session.get(Hike, id)
|
||||||
|
if not hike:
|
||||||
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hike not found")
|
||||||
|
|
||||||
|
if not current_user.has_permissions(
|
||||||
|
module=PermissionModule.HIKE,
|
||||||
|
part=PermissionPart.ADMIN,
|
||||||
|
rights=PermissionRight.DELETE,
|
||||||
|
):
|
||||||
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||||
|
|
||||||
|
session.delete(hike)
|
||||||
|
session.commit()
|
||||||
|
return Message(message="Hike deleted successfully")
|
||||||
|
|
||||||
|
# endregion
|
||||||
@@ -1,176 +0,0 @@
|
|||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastapi import APIRouter, HTTPException, status
|
|
||||||
from sqlmodel import func, select
|
|
||||||
|
|
||||||
from app.api.deps import CurrentUser, SessionDep
|
|
||||||
from app.models.base import (
|
|
||||||
ApiTags,
|
|
||||||
Message,
|
|
||||||
RowId,
|
|
||||||
)
|
|
||||||
from app.models.serie import (
|
|
||||||
Serie,
|
|
||||||
SerieCreate,
|
|
||||||
SerieUpdate,
|
|
||||||
SeriePublic,
|
|
||||||
SeriesPublic,
|
|
||||||
)
|
|
||||||
from app.models.division import (
|
|
||||||
Division,
|
|
||||||
DivisionsPublic,
|
|
||||||
)
|
|
||||||
from app.models.user import (
|
|
||||||
PermissionModule,
|
|
||||||
PermissionPart,
|
|
||||||
PermissionRight,
|
|
||||||
)
|
|
||||||
|
|
||||||
router = APIRouter(prefix="/series", tags=[ApiTags.SERIES])
|
|
||||||
|
|
||||||
|
|
||||||
# region # Series ########################################################
|
|
||||||
|
|
||||||
@router.get("/", response_model=SeriesPublic)
|
|
||||||
def read_series(
|
|
||||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Retrieve all series.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if current_user.has_permissions(
|
|
||||||
module=PermissionModule.SERIE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
count_statement = select(func.count()).select_from(Serie)
|
|
||||||
count = session.exec(count_statement).one()
|
|
||||||
statement = select(Serie).offset(skip).limit(limit)
|
|
||||||
series = session.exec(statement).all()
|
|
||||||
return SeriesPublic(data=series, count=count)
|
|
||||||
|
|
||||||
return SeriesPublic(data=[], count=0)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{id}", response_model=SeriePublic)
|
|
||||||
def read_serie(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
|
||||||
"""
|
|
||||||
Get serie by ID.
|
|
||||||
"""
|
|
||||||
serie = session.get(Serie, id)
|
|
||||||
if not serie:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.SERIE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.READ,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
return serie
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/", response_model=SeriePublic)
|
|
||||||
def create_serie(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, serie_in: SerieCreate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Create new serie.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.SERIE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.CREATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
serie = Serie.create(create_obj=serie_in, session=session)
|
|
||||||
return serie
|
|
||||||
|
|
||||||
|
|
||||||
@router.put("/{id}", response_model=SeriePublic)
|
|
||||||
def update_serie(
|
|
||||||
*, session: SessionDep, current_user: CurrentUser, id: RowId, serie_in: SerieUpdate
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Update a serie.
|
|
||||||
"""
|
|
||||||
serie = session.get(Serie, id)
|
|
||||||
if not serie:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.SERIE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.UPDATE,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
serie = Serie.update(db_obj=serie, in_obj=serie_in, session=session)
|
|
||||||
return serie
|
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/{id}")
|
|
||||||
def delete_serie(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
|
|
||||||
"""
|
|
||||||
Delete a serie.
|
|
||||||
"""
|
|
||||||
serie = session.get(Serie, id)
|
|
||||||
if not serie:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
|
||||||
|
|
||||||
if not current_user.has_permissions(
|
|
||||||
module=PermissionModule.SERIE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=PermissionRight.DELETE,
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
session.delete(serie)
|
|
||||||
session.commit()
|
|
||||||
return Message(message="Serie deleted successfully")
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
|
|
||||||
|
|
||||||
# region # Series / Divisions ############################################
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{series_id}/divisions/", response_model=DivisionsPublic)
|
|
||||||
def read_serie_division(
|
|
||||||
session: SessionDep, current_user: CurrentUser, series_id: RowId, skip: int = 0, limit: int = 100
|
|
||||||
) -> Any:
|
|
||||||
"""
|
|
||||||
Retrieve all serie divisions.
|
|
||||||
"""
|
|
||||||
|
|
||||||
serie = session.get(Serie, series_id)
|
|
||||||
if not serie:
|
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Serie not found")
|
|
||||||
|
|
||||||
if not current_user.has_permission(
|
|
||||||
module=PermissionModule.SERIE,
|
|
||||||
part=PermissionPart.ADMIN,
|
|
||||||
rights=(PermissionRight.MANAGE_DIVISIONS | PermissionRight.READ),
|
|
||||||
):
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
|
||||||
|
|
||||||
count_statement = (select(func.count())
|
|
||||||
.select_from(Division)
|
|
||||||
.where(Division.serie_id == serie.id)
|
|
||||||
)
|
|
||||||
count = session.exec(count_statement).one()
|
|
||||||
statement = (select(Division)
|
|
||||||
.where(Division.serie_id == serie.id)
|
|
||||||
.offset(skip)
|
|
||||||
.limit(limit)
|
|
||||||
)
|
|
||||||
divisions = session.exec(statement).all()
|
|
||||||
|
|
||||||
return DivisionsPublic(data=divisions, count=count)
|
|
||||||
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
@@ -30,8 +30,8 @@ from app.models.member import (
|
|||||||
from app.models.apikey import (
|
from app.models.apikey import (
|
||||||
ApiKey,
|
ApiKey,
|
||||||
)
|
)
|
||||||
from app.models.serie import (
|
from app.models.hike import (
|
||||||
Serie,
|
Hike,
|
||||||
)
|
)
|
||||||
|
|
||||||
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
|
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ from enum import Enum, IntFlag # Python 3.11 >= StrEnum
|
|||||||
from enum import auto as auto_enum
|
from enum import auto as auto_enum
|
||||||
from uuid import UUID as RowId
|
from uuid import UUID as RowId
|
||||||
|
|
||||||
|
from sqlalchemy import TypeDecorator, Integer, VARCHAR
|
||||||
from sqlmodel import SQLModel
|
from sqlmodel import SQLModel
|
||||||
from sqlalchemy.orm import declared_attr
|
from sqlalchemy.orm import declared_attr
|
||||||
|
|
||||||
@@ -39,8 +40,85 @@ class DocumentedStrEnum(str, Enum):
|
|||||||
|
|
||||||
|
|
||||||
class DocumentedIntFlag(IntFlag):
|
class DocumentedIntFlag(IntFlag):
|
||||||
# TODO: Build DB sport to proper store flags and make it possible to store all mutations
|
@property
|
||||||
pass
|
def names(self) -> str:
|
||||||
|
"""
|
||||||
|
Returns a comma-separated string of all active flag names.
|
||||||
|
"""
|
||||||
|
# Exclude 0-value flags
|
||||||
|
return ",".join([flag.name for flag in type(self) if flag in self and flag.value != 0])
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
# Default string conversion uses the names
|
||||||
|
return self.names
|
||||||
|
|
||||||
|
# Optional: for Pydantic compatibility
|
||||||
|
def __get_pydantic_json__(self) -> str:
|
||||||
|
return self.names
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class DocumentedIntFlagType(TypeDecorator):
|
||||||
|
impl = Integer
|
||||||
|
cache_ok = True
|
||||||
|
|
||||||
|
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.enum_class = enum_class
|
||||||
|
|
||||||
|
def process_bind_param(self, value, dialect):
|
||||||
|
"""
|
||||||
|
Convert IntFlag to integer before storing
|
||||||
|
"""
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
if isinstance(value, self.enum_class):
|
||||||
|
return int(value)
|
||||||
|
if isinstance(value, int):
|
||||||
|
return value
|
||||||
|
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
|
||||||
|
|
||||||
|
def process_result_value(self, value, dialect):
|
||||||
|
"""
|
||||||
|
Convert integer from DB back to IntFlag
|
||||||
|
"""
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
return self.enum_class(value)
|
||||||
|
|
||||||
|
|
||||||
|
class DocumentedStrFlagType(TypeDecorator):
|
||||||
|
impl = VARCHAR
|
||||||
|
cache_ok = True
|
||||||
|
|
||||||
|
def __init__(self, enum_class: type[IntFlag], *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.enum_class = enum_class
|
||||||
|
|
||||||
|
def process_bind_param(self, value, dialect):
|
||||||
|
"""
|
||||||
|
Convert IntFlag to comma-separated string of names for storing in DB.
|
||||||
|
"""
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
if isinstance(value, self.enum_class):
|
||||||
|
return str(value)
|
||||||
|
raise ValueError(f"Invalid value for {self.enum_class.__name__}: {value!r}")
|
||||||
|
|
||||||
|
def process_result_value(self, value, dialect):
|
||||||
|
"""
|
||||||
|
Convert comma-separated string of names from DB back to IntFlag.
|
||||||
|
"""
|
||||||
|
if value is None or value == "":
|
||||||
|
return self.enum_class(0)
|
||||||
|
names = value.split(",")
|
||||||
|
result = self.enum_class(0)
|
||||||
|
for name in names:
|
||||||
|
try:
|
||||||
|
result |= self.enum_class[name]
|
||||||
|
except KeyError:
|
||||||
|
raise ValueError(f"Invalid flag name '{name}' for {self.enum_class.__name__}")
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
# #############################################################################
|
# #############################################################################
|
||||||
@@ -59,13 +137,9 @@ class ApiTags(DocumentedStrEnum):
|
|||||||
ASSOCIATIONS = "Associations"
|
ASSOCIATIONS = "Associations"
|
||||||
DIVISIONS = "Divisions"
|
DIVISIONS = "Divisions"
|
||||||
MEMBERS = "Members"
|
MEMBERS = "Members"
|
||||||
SERIES = "Series"
|
|
||||||
|
|
||||||
|
HIKES = "Hikes"
|
||||||
|
|
||||||
class VisitedCountType(DocumentedStrEnum):
|
|
||||||
NONE = auto_enum()
|
|
||||||
ONE_VISIT = auto_enum()
|
|
||||||
EACH_VISIT = auto_enum()
|
|
||||||
|
|
||||||
# endregion
|
# endregion
|
||||||
|
|
||||||
|
|||||||
181
backend/app/models/hike.py
Normal file
181
backend/app/models/hike.py
Normal file
@@ -0,0 +1,181 @@
|
|||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from sqlmodel import (
|
||||||
|
Session,
|
||||||
|
Relationship,
|
||||||
|
Field,
|
||||||
|
)
|
||||||
|
|
||||||
|
from . import mixin
|
||||||
|
from .base import (
|
||||||
|
BaseSQLModel,
|
||||||
|
DocumentedStrEnum,
|
||||||
|
DocumentedIntFlag,
|
||||||
|
DocumentedStrFlagType,
|
||||||
|
auto_enum,
|
||||||
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .event import Event
|
||||||
|
|
||||||
|
# region # Hike ################################################################
|
||||||
|
|
||||||
|
class HikeTeamPage(DocumentedIntFlag):
|
||||||
|
ROUTE = auto_enum() # Route steps info
|
||||||
|
QUESTIONS = auto_enum() # Visible questions
|
||||||
|
|
||||||
|
VISITED_PLACES = auto_enum() # Places the team has been
|
||||||
|
CURRENT_PLACE = auto_enum() # Place where the team currently is
|
||||||
|
UNVISITED_PLACES = auto_enum() # Places the team still neat to visit
|
||||||
|
|
||||||
|
PLACES = VISITED_PLACES | CURRENT_PLACE | UNVISITED_PLACES # All the places
|
||||||
|
|
||||||
|
TRAVEL_TIME = auto_enum() # Total travel time
|
||||||
|
TRAVEL_FREE_TIME = auto_enum() # Total travel time
|
||||||
|
PLACE_TIME = auto_enum() # Total place time
|
||||||
|
CALCULATE_TIME = auto_enum() # Total time calculated based on hike settings
|
||||||
|
# TODO: Think about time between oter teams
|
||||||
|
|
||||||
|
PLACE_POINTS = auto_enum() # Points scored on a place
|
||||||
|
TOTAL_PLACE_POINTS = auto_enum() # All points got on all places
|
||||||
|
ASSIST_POINTS = auto_enum() # Minus points for assistants
|
||||||
|
# TODO: Think about place in classement
|
||||||
|
|
||||||
|
ASSIST_LOG = auto_enum() # Assisted items
|
||||||
|
ASSIST_LATTER = auto_enum() # ???
|
||||||
|
|
||||||
|
|
||||||
|
# ##############################################################################
|
||||||
|
|
||||||
|
|
||||||
|
class HikeTimeCalculation(DocumentedIntFlag):
|
||||||
|
TRAVEL_TIME = auto_enum() # Time traveling
|
||||||
|
|
||||||
|
# TODO: Think about time groups (in this model we have 2, free and non free)
|
||||||
|
TRAVEL_FREE_TIME = auto_enum() # Time that is excluded from traveling but not at a place
|
||||||
|
PLACE_TIME = auto_enum() # Time checked in at a place
|
||||||
|
|
||||||
|
TOTAL_TIME = TRAVEL_TIME | TRAVEL_FREE_TIME | PLACE_TIME
|
||||||
|
|
||||||
|
|
||||||
|
# ##############################################################################
|
||||||
|
|
||||||
|
|
||||||
|
class HikeBase(
|
||||||
|
mixin.Name,
|
||||||
|
mixin.Contact,
|
||||||
|
BaseSQLModel,
|
||||||
|
):
|
||||||
|
tracker_interval: int | None = Field(
|
||||||
|
default=None,
|
||||||
|
nullable=True,
|
||||||
|
description="Is GPS button available, value will be the interval",
|
||||||
|
)
|
||||||
|
|
||||||
|
is_multi_day: bool = Field(
|
||||||
|
default=False,
|
||||||
|
nullable=False,
|
||||||
|
description="Show datetime in stead of time only",
|
||||||
|
)
|
||||||
|
|
||||||
|
team_page: HikeTeamPage = Field(
|
||||||
|
default=HikeTeamPage.PLACES | HikeTeamPage.ROUTE | HikeTeamPage.QUESTIONS,
|
||||||
|
nullable=False,
|
||||||
|
description="Show all the places of the route inside the teams page",
|
||||||
|
sa_type=DocumentedStrFlagType(HikeTeamPage),
|
||||||
|
)
|
||||||
|
|
||||||
|
time_calculation: HikeTimeCalculation = Field(
|
||||||
|
default=HikeTimeCalculation.TRAVEL_TIME,
|
||||||
|
nullable=False,
|
||||||
|
description="Wath should we calculate inside the total time",
|
||||||
|
sa_type=DocumentedStrFlagType(HikeTimeCalculation),
|
||||||
|
)
|
||||||
|
|
||||||
|
min_time_points: int | None = Field(
|
||||||
|
default=0,
|
||||||
|
ge=0,
|
||||||
|
# TODO: le: max_time_points
|
||||||
|
description="Min points for time",
|
||||||
|
)
|
||||||
|
|
||||||
|
max_time_points: int | None = Field(
|
||||||
|
default=100,
|
||||||
|
ge=0, # TODO: ge > min_time_points
|
||||||
|
description="Max points for time",
|
||||||
|
)
|
||||||
|
|
||||||
|
max_question_points: int | None = Field(
|
||||||
|
default=None,
|
||||||
|
description="None: Calculate from answers (no max), Positive: Set as max for dynamic range",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Properties to receive via API on creation
|
||||||
|
class HikeCreate(HikeBase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Properties to receive via API on update, all are optional
|
||||||
|
class HikeUpdate(HikeBase):
|
||||||
|
is_multi_day: bool | None = Field(default=None) # type: ignore
|
||||||
|
team_page: HikeTeamPage | None = Field(default=None) # type: ignore
|
||||||
|
time_calculation: HikeTimeCalculation | None = Field(default=None) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
class Hike(mixin.RowId, HikeBase, table=True):
|
||||||
|
# --- database only items --------------------------------------------------
|
||||||
|
|
||||||
|
# --- read only items ------------------------------------------------------
|
||||||
|
|
||||||
|
# --- back_populates links -------------------------------------------------
|
||||||
|
|
||||||
|
# --- CRUD actions ---------------------------------------------------------
|
||||||
|
@classmethod
|
||||||
|
def create(cls, *, session: Session, create_obj: HikeCreate) -> "Hike":
|
||||||
|
data_obj = create_obj.model_dump(exclude_unset=True)
|
||||||
|
|
||||||
|
db_obj = cls.model_validate(data_obj)
|
||||||
|
session.add(db_obj)
|
||||||
|
session.commit()
|
||||||
|
session.refresh(db_obj)
|
||||||
|
return db_obj
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def update(
|
||||||
|
cls, *, session: Session, db_obj: "Hike", in_obj: HikeUpdate
|
||||||
|
) -> "Hike":
|
||||||
|
data_obj = in_obj.model_dump(exclude_unset=True)
|
||||||
|
db_obj.sqlmodel_update(data_obj)
|
||||||
|
session.add(db_obj)
|
||||||
|
session.commit()
|
||||||
|
session.refresh(db_obj)
|
||||||
|
return db_obj
|
||||||
|
|
||||||
|
|
||||||
|
# Properties to return via API, id is always required
|
||||||
|
class HikePublic(mixin.RowIdPublic, HikeBase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class HikesPublic(BaseSQLModel):
|
||||||
|
data: list[HikePublic]
|
||||||
|
count: int
|
||||||
|
|
||||||
|
|
||||||
|
# endregion
|
||||||
|
|
||||||
|
|
||||||
|
# region # Hike / Route ########################################################
|
||||||
|
|
||||||
|
|
||||||
|
class RouteType(DocumentedStrEnum):
|
||||||
|
START_FINISH = auto_enum() # Start at the start and end at the finish
|
||||||
|
CIRCULAR = auto_enum() # Start some ware, finish at the last new place
|
||||||
|
CIRCULAR_BACK_TO_START = auto_enum() # Start and finish on the same random place (CIRCULAR + next to start)
|
||||||
|
|
||||||
|
|
||||||
|
# ##############################################################################
|
||||||
|
|
||||||
|
|
||||||
|
# endregion
|
||||||
@@ -6,7 +6,7 @@ from sqlmodel import (
|
|||||||
Field,
|
Field,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .base import RowId as RowIdType, VisitedCountType
|
from .base import RowId as RowIdType
|
||||||
from ..core.config import settings
|
from ..core.config import settings
|
||||||
|
|
||||||
|
|
||||||
@@ -120,28 +120,3 @@ class Birthday(BaseModel):
|
|||||||
class Created(BaseModel):
|
class Created(BaseModel):
|
||||||
created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info))
|
created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info))
|
||||||
created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL")
|
created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL")
|
||||||
|
|
||||||
|
|
||||||
class QuestionInfo(BaseModel):
|
|
||||||
question_file: str | None = Field(default=None, nullable=True, max_length=255)
|
|
||||||
answer_file: str | None = Field(default=None, nullable=True, max_length=255)
|
|
||||||
|
|
||||||
|
|
||||||
class TeamAmmounts(BaseModel):
|
|
||||||
min_teams: int = Field(default=None, nullable=True, ge=1)
|
|
||||||
max_teams: int = Field(default=None, nullable=True, ge=1)
|
|
||||||
|
|
||||||
|
|
||||||
class MaxPoints(BaseModel):
|
|
||||||
max_points: int = Field(default=None, nullable=True, ge=0)
|
|
||||||
|
|
||||||
|
|
||||||
class VisitedPoints(BaseModel):
|
|
||||||
visited_points: int | None = Field(
|
|
||||||
default=None,
|
|
||||||
ge=0,
|
|
||||||
description="Visited points for this place, None will disable this function",
|
|
||||||
)
|
|
||||||
visited_count_type: VisitedCountType = Field(
|
|
||||||
default=VisitedCountType.NONE,
|
|
||||||
)
|
|
||||||
|
|||||||
@@ -1,206 +0,0 @@
|
|||||||
# app/models/post.py
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from sqlmodel import Field, Relationship, Session, select
|
|
||||||
|
|
||||||
from . import mixin
|
|
||||||
from .base import BaseSQLModel, RowId, DocumentedStrEnum, DocumentedIntFlag, auto_enum
|
|
||||||
from .user import PermissionRight, User
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .team import Team
|
|
||||||
|
|
||||||
# region # Post ##############################################################
|
|
||||||
|
|
||||||
class PostType(DocumentedIntFlag):
|
|
||||||
POINTS = auto_enum() #allows for the ability to get points
|
|
||||||
OPTION = auto_enum() #allows groups to enter waiting list
|
|
||||||
POINTS_OPTIONS = POINTS | POINTS
|
|
||||||
|
|
||||||
VISITED = auto_enum() #only mark if a team has visited this post
|
|
||||||
CORRECT = auto_enum() #possible to fill in if correct (get max points)
|
|
||||||
|
|
||||||
|
|
||||||
class ReplayType(DocumentedStrEnum):
|
|
||||||
NOT_POSSIBLE = auto_enum()
|
|
||||||
ONLY_WHEN_NO_POINTS = auto_enum()
|
|
||||||
YES_BUT_FIRST_POINTS_COUNT = auto_enum()
|
|
||||||
YES_BUT_LAST_POINTS_COUNT = auto_enum()
|
|
||||||
YES_BUT_HIGHEST_POINTS_COUNT = auto_enum()
|
|
||||||
ALWAYS_AS_NEW = auto_enum() # add points together
|
|
||||||
|
|
||||||
# ##############################################################################
|
|
||||||
|
|
||||||
# Shared properties
|
|
||||||
class PostUserLinkBase(BaseSQLModel):
|
|
||||||
rights: PermissionRight = Field(default=PermissionRight.READ, nullable=False)
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class PostUserLinkCreate(PostUserLinkBase):
|
|
||||||
user_id: RowId = Field(nullable=False)
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class PostUserLinkUpdate(PostUserLinkBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Database model (link tussen post en user)
|
|
||||||
class PostUserLink(PostUserLinkBase, table=True):
|
|
||||||
post_id: RowId = Field(
|
|
||||||
foreign_key="post.id",
|
|
||||||
primary_key=True,
|
|
||||||
nullable=False,
|
|
||||||
ondelete="CASCADE",
|
|
||||||
)
|
|
||||||
user_id: RowId = Field(
|
|
||||||
foreign_key="user.id",
|
|
||||||
primary_key=True,
|
|
||||||
nullable=False,
|
|
||||||
ondelete="CASCADE",
|
|
||||||
)
|
|
||||||
|
|
||||||
post: "Post" = Relationship(back_populates="user_links")
|
|
||||||
user: "User" = Relationship(back_populates="post_links")
|
|
||||||
|
|
||||||
# Properties to return via API
|
|
||||||
class PostUserLinkPublic(PostUserLinkBase):
|
|
||||||
user_id: RowId
|
|
||||||
post_id: RowId
|
|
||||||
|
|
||||||
class PostUserLinksPublic(BaseSQLModel):
|
|
||||||
data: list[PostUserLinkPublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
# Shared properties
|
|
||||||
class PostBase(
|
|
||||||
#common
|
|
||||||
mixin.Name, #post name
|
|
||||||
mixin.Contact, #name contact person
|
|
||||||
mixin.IsActive, #scouting active (unused)
|
|
||||||
|
|
||||||
#post specific
|
|
||||||
mixin.ShortName, #shortname for post
|
|
||||||
|
|
||||||
mixin.MaxPoints, #max obtainable points
|
|
||||||
mixin.VisitedPoints, #points obtained for visiting post
|
|
||||||
|
|
||||||
mixin.TeamAmmounts, #teams, min&max teams required
|
|
||||||
|
|
||||||
mixin.QuestionInfo, #field for questions
|
|
||||||
|
|
||||||
BaseSQLModel,
|
|
||||||
):
|
|
||||||
post_type: PostType = Field(default=PostType.POINTS, nullable=False)
|
|
||||||
replay: ReplayType = Field(default=ReplayType.NOT_POSSIBLE, nullable=False)
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class PostCreate(PostBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class PostUpdate(mixin.ShortNameUpdate, PostBase):
|
|
||||||
post_type: PostType | None = Field(default=None, nullable=True) # type: ignore
|
|
||||||
replay: ReplayType | None = Field(default=None, nullable=True) # type: ignore
|
|
||||||
|
|
||||||
# Database model
|
|
||||||
class Post(mixin.RowId, PostBase, table=True):
|
|
||||||
|
|
||||||
user_links: list["PostUserLink"] = Relationship(back_populates="post", cascade_delete=True)
|
|
||||||
#teams: list["Team"] = Relationship(back_populates="post", cascade_delete=True)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def create(cls, *, session: Session, create_obj: PostCreate) -> "Post":
|
|
||||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj = cls.model_validate(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update(cls, *, session: Session, db_obj: "Post", in_obj: PostUpdate) -> "Post":
|
|
||||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj.sqlmodel_update(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
def get_user_link(self, user: User) -> "PostUserLink | None":
|
|
||||||
return next((link for link in self.user_links if link.user == user), None)
|
|
||||||
|
|
||||||
def add_user(
|
|
||||||
self,
|
|
||||||
user: User,
|
|
||||||
rights: PermissionRight = PermissionRight.READ,
|
|
||||||
*,
|
|
||||||
session: Session,
|
|
||||||
) -> "PostUserLink | None":
|
|
||||||
existing = self.get_user_link(user=user)
|
|
||||||
if existing is None:
|
|
||||||
new_link = PostUserLink(post=self, user=user, rights=rights)
|
|
||||||
self.user_links.append(new_link)
|
|
||||||
session.add(new_link)
|
|
||||||
session.commit()
|
|
||||||
return new_link
|
|
||||||
return None
|
|
||||||
|
|
||||||
def update_user(
|
|
||||||
self,
|
|
||||||
user: User,
|
|
||||||
rights: PermissionRight = PermissionRight.READ,
|
|
||||||
*,
|
|
||||||
session: Session,
|
|
||||||
) -> "PostUserLink | None":
|
|
||||||
link = self.get_user_link(user=user)
|
|
||||||
if link:
|
|
||||||
link.rights = rights
|
|
||||||
session.add(link)
|
|
||||||
session.commit()
|
|
||||||
return link
|
|
||||||
return None
|
|
||||||
|
|
||||||
def remove_user(self, user: User, *, session: Session) -> None:
|
|
||||||
link = self.get_user_link(user=user)
|
|
||||||
if link:
|
|
||||||
session.delete(link)
|
|
||||||
session.commit()
|
|
||||||
|
|
||||||
def user_has_rights(
|
|
||||||
self,
|
|
||||||
user: User,
|
|
||||||
rights: PermissionRight | None = None,
|
|
||||||
) -> bool:
|
|
||||||
return any(
|
|
||||||
(
|
|
||||||
link.user == user
|
|
||||||
and link.rights
|
|
||||||
and (not rights or (link.rights & rights) == rights)
|
|
||||||
)
|
|
||||||
for link in self.user_links
|
|
||||||
)
|
|
||||||
|
|
||||||
def user_has_right(
|
|
||||||
self,
|
|
||||||
user: User,
|
|
||||||
rights: PermissionRight | None = None,
|
|
||||||
) -> bool:
|
|
||||||
return any(
|
|
||||||
(
|
|
||||||
link.user == user
|
|
||||||
and link.rights
|
|
||||||
and (not rights or (link.rights & rights))
|
|
||||||
)
|
|
||||||
for link in self.user_links
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# API output models
|
|
||||||
class PostPublic(mixin.RowIdPublic, PostBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class PostsPublic(BaseSQLModel):
|
|
||||||
data: list[PostPublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
@@ -1,173 +0,0 @@
|
|||||||
# app/models/serie.py
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from sqlmodel import Field, Relationship, Session, select
|
|
||||||
|
|
||||||
from . import mixin
|
|
||||||
from .base import BaseSQLModel, RowId
|
|
||||||
from .user import PermissionRight, User
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .team import Team
|
|
||||||
|
|
||||||
# region # Serie ##############################################################
|
|
||||||
|
|
||||||
# Shared properties
|
|
||||||
class SerieUserLinkBase(BaseSQLModel):
|
|
||||||
rights: PermissionRight = Field(default=PermissionRight.READ, nullable=False)
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class SerieUserLinkCreate(SerieUserLinkBase):
|
|
||||||
user_id: RowId = Field(nullable=False)
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class SerieUserLinkUpdate(SerieUserLinkBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Database model (link tussen serie en user)
|
|
||||||
class SerieUserLink(SerieUserLinkBase, table=True):
|
|
||||||
serie_id: RowId = Field(
|
|
||||||
foreign_key="serie.id",
|
|
||||||
primary_key=True,
|
|
||||||
nullable=False,
|
|
||||||
ondelete="CASCADE",
|
|
||||||
)
|
|
||||||
user_id: RowId = Field(
|
|
||||||
foreign_key="user.id",
|
|
||||||
primary_key=True,
|
|
||||||
nullable=False,
|
|
||||||
ondelete="CASCADE",
|
|
||||||
)
|
|
||||||
|
|
||||||
serie: "Serie" = Relationship(back_populates="user_links")
|
|
||||||
user: "User" = Relationship(back_populates="serie_links")
|
|
||||||
|
|
||||||
# Properties to return via API
|
|
||||||
class SerieUserLinkPublic(SerieUserLinkBase):
|
|
||||||
user_id: RowId
|
|
||||||
serie_id: RowId
|
|
||||||
|
|
||||||
class SerieUserLinksPublic(BaseSQLModel):
|
|
||||||
data: list[SerieUserLinkPublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
# Shared properties
|
|
||||||
class SerieBase(
|
|
||||||
mixin.Name,
|
|
||||||
mixin.Contact,
|
|
||||||
mixin.IsActive,
|
|
||||||
BaseSQLModel,
|
|
||||||
):
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
|
||||||
class SerieCreate(SerieBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
|
||||||
class SerieUpdate(SerieBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Database model
|
|
||||||
class Serie(mixin.RowId, SerieBase, table=True):
|
|
||||||
|
|
||||||
user_links: list["SerieUserLink"] = Relationship(back_populates="serie", cascade_delete=True)
|
|
||||||
#teams: list["Team"] = Relationship(back_populates="serie", cascade_delete=True)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def create(cls, *, session: Session, create_obj: SerieCreate) -> "Serie":
|
|
||||||
data_obj = create_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj = cls.model_validate(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update(cls, *, session: Session, db_obj: "Serie", in_obj: SerieUpdate) -> "Serie":
|
|
||||||
data_obj = in_obj.model_dump(exclude_unset=True)
|
|
||||||
db_obj.sqlmodel_update(data_obj)
|
|
||||||
session.add(db_obj)
|
|
||||||
session.commit()
|
|
||||||
session.refresh(db_obj)
|
|
||||||
return db_obj
|
|
||||||
|
|
||||||
def get_user_link(self, user: User) -> "SerieUserLink | None":
|
|
||||||
return next((link for link in self.user_links if link.user == user), None)
|
|
||||||
|
|
||||||
def add_user(
|
|
||||||
self,
|
|
||||||
user: User,
|
|
||||||
rights: PermissionRight = PermissionRight.READ,
|
|
||||||
*,
|
|
||||||
session: Session,
|
|
||||||
) -> "SerieUserLink | None":
|
|
||||||
existing = self.get_user_link(user=user)
|
|
||||||
if existing is None:
|
|
||||||
new_link = SerieUserLink(serie=self, user=user, rights=rights)
|
|
||||||
self.user_links.append(new_link)
|
|
||||||
session.add(new_link)
|
|
||||||
session.commit()
|
|
||||||
return new_link
|
|
||||||
return None #TODO aanpassen??
|
|
||||||
|
|
||||||
def update_user(
|
|
||||||
self,
|
|
||||||
user: User,
|
|
||||||
rights: PermissionRight = PermissionRight.READ,
|
|
||||||
*,
|
|
||||||
session: Session,
|
|
||||||
) -> "SerieUserLink | None":
|
|
||||||
link = self.get_user_link(user=user)
|
|
||||||
if link:
|
|
||||||
link.rights = rights
|
|
||||||
session.add(link)
|
|
||||||
session.commit()
|
|
||||||
return link
|
|
||||||
return None
|
|
||||||
|
|
||||||
def remove_user(self, user: User, *, session: Session) -> None:
|
|
||||||
link = self.get_user_link(user=user)
|
|
||||||
if link:
|
|
||||||
session.delete(link)
|
|
||||||
session.commit()
|
|
||||||
|
|
||||||
def user_has_rights(
|
|
||||||
self,
|
|
||||||
user: User,
|
|
||||||
rights: PermissionRight | None = None,
|
|
||||||
) -> bool:
|
|
||||||
return any(
|
|
||||||
(
|
|
||||||
link.user == user
|
|
||||||
and link.rights
|
|
||||||
and (not rights or (link.rights & rights) == rights)
|
|
||||||
)
|
|
||||||
for link in self.user_links
|
|
||||||
)
|
|
||||||
|
|
||||||
def user_has_right(
|
|
||||||
self,
|
|
||||||
user: User,
|
|
||||||
rights: PermissionRight | None = None,
|
|
||||||
) -> bool:
|
|
||||||
return any(
|
|
||||||
(
|
|
||||||
link.user == user
|
|
||||||
and link.rights
|
|
||||||
and (not rights or (link.rights & rights))
|
|
||||||
)
|
|
||||||
for link in self.user_links
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# API output models
|
|
||||||
class SeriePublic(mixin.RowIdPublic, SerieBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class SeriesPublic(BaseSQLModel):
|
|
||||||
data: list[SeriePublic]
|
|
||||||
count: int
|
|
||||||
|
|
||||||
# endregion
|
|
||||||
@@ -18,7 +18,6 @@ if TYPE_CHECKING:
|
|||||||
from .apikey import ApiKey
|
from .apikey import ApiKey
|
||||||
from .event import EventUserLink
|
from .event import EventUserLink
|
||||||
from .member import Member
|
from .member import Member
|
||||||
from .serie import SerieUserLink
|
|
||||||
|
|
||||||
|
|
||||||
# region # User ################################################################
|
# region # User ################################################################
|
||||||
@@ -32,8 +31,8 @@ class PermissionModule(DocumentedStrEnum):
|
|||||||
ASSOCIATION = auto_enum()
|
ASSOCIATION = auto_enum()
|
||||||
DIVISION = auto_enum()
|
DIVISION = auto_enum()
|
||||||
MEMBER = auto_enum()
|
MEMBER = auto_enum()
|
||||||
SERIE = auto_enum()
|
|
||||||
POST = auto_enum()
|
HIKE = auto_enum()
|
||||||
|
|
||||||
|
|
||||||
class PermissionPart(DocumentedStrEnum):
|
class PermissionPart(DocumentedStrEnum):
|
||||||
@@ -139,7 +138,6 @@ class User(mixin.RowId, UserBase, table=True):
|
|||||||
# --- many-to-many links ---------------------------------------------------
|
# --- many-to-many links ---------------------------------------------------
|
||||||
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
|
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
|
||||||
event_links: list["EventUserLink"] = Relationship(back_populates="user")
|
event_links: list["EventUserLink"] = Relationship(back_populates="user")
|
||||||
serie_links: list["SerieUserLink"] = Relationship(back_populates="user")
|
|
||||||
|
|
||||||
# --- CRUD actions ---------------------------------------------------------
|
# --- CRUD actions ---------------------------------------------------------
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -5,16 +5,16 @@ from fastapi.testclient import TestClient
|
|||||||
from sqlmodel import Session
|
from sqlmodel import Session
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
from app.tests.utils.serie import create_random_serie
|
from app.tests.utils.hike import create_random_hike
|
||||||
|
|
||||||
|
|
||||||
def test_create_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_create_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
data = {
|
data = {
|
||||||
"name": "Zondag spel",
|
"name": "RSW Maasdelta 2026",
|
||||||
"contact": "Rick",
|
"contact": "Sebas",
|
||||||
}
|
}
|
||||||
response = client.post(
|
response = client.post(
|
||||||
f"{settings.API_V1_STR}/series/",
|
f"{settings.API_V1_STR}/hikes/",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
@@ -25,13 +25,13 @@ def test_create_serie(client: TestClient, superuser_token_headers: dict[str, str
|
|||||||
assert "id" in content
|
assert "id" in content
|
||||||
|
|
||||||
|
|
||||||
def test_create_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_create_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
data = {
|
data = {
|
||||||
"name": "Zondag Spel",
|
"name": "RSW Maasdelta 2026",
|
||||||
"contact": "Rick",
|
"contact": "Sebas",
|
||||||
}
|
}
|
||||||
response = client.post(
|
response = client.post(
|
||||||
f"{settings.API_V1_STR}/series/",
|
f"{settings.API_V1_STR}/hikes/",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
@@ -39,43 +39,43 @@ def test_create_serie_no_permissions(client: TestClient, normal_user_token_heade
|
|||||||
assert response.json()["detail"] == "Not enough permissions"
|
assert response.json()["detail"] == "Not enough permissions"
|
||||||
|
|
||||||
|
|
||||||
def test_read_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
serie = create_random_serie(db)
|
hike = create_random_hike(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/series/{serie.id}",
|
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
content = response.json()
|
content = response.json()
|
||||||
assert content["id"] == str(serie.id)
|
assert content["id"] == str(hike.id)
|
||||||
assert content["name"] == serie.name
|
assert content["name"] == hike.name
|
||||||
assert content["contact"] == serie.contact
|
assert content["contact"] == hike.contact
|
||||||
|
|
||||||
|
|
||||||
def test_read_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
|
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||||
assert response.json()["detail"] == "Serie not found"
|
assert response.json()["detail"] == "Hike not found"
|
||||||
|
|
||||||
|
|
||||||
def test_read_serie_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_hike_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
serie = create_random_serie(db)
|
hike = create_random_hike(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/series/{serie.id}",
|
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
assert response.json()["detail"] == "Not enough permissions"
|
||||||
|
|
||||||
|
|
||||||
def test_read_series(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_hikes(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
create_random_serie(db)
|
create_random_hike(db)
|
||||||
create_random_serie(db)
|
create_random_hike(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/series/",
|
f"{settings.API_V1_STR}/hikes/",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
@@ -87,11 +87,11 @@ def test_read_series(client: TestClient, superuser_token_headers: dict[str, str]
|
|||||||
assert len(content["data"]) <= content["count"]
|
assert len(content["data"]) <= content["count"]
|
||||||
|
|
||||||
|
|
||||||
def test_read_series_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_read_hikes_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
create_random_serie(db)
|
create_random_hike(db)
|
||||||
create_random_serie(db)
|
create_random_hike(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/series/",
|
f"{settings.API_V1_STR}/hikes/",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
@@ -103,46 +103,46 @@ def test_read_series_no_permissions(client: TestClient, normal_user_token_header
|
|||||||
assert len(content["data"]) == 0
|
assert len(content["data"]) == 0
|
||||||
|
|
||||||
|
|
||||||
def test_update_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_update_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
serie = create_random_serie(db)
|
hike = create_random_hike(db)
|
||||||
data = {
|
data = {
|
||||||
"name": "Updated name",
|
"name": "Updated name",
|
||||||
"contact": "Updated contact",
|
"contact": "Updated contact",
|
||||||
}
|
}
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"{settings.API_V1_STR}/series/{serie.id}",
|
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
content = response.json()
|
content = response.json()
|
||||||
assert content["id"] == str(serie.id)
|
assert content["id"] == str(hike.id)
|
||||||
assert content["name"] == data["name"]
|
assert content["name"] == data["name"]
|
||||||
assert content["contact"] == data["contact"]
|
assert content["contact"] == data["contact"]
|
||||||
|
|
||||||
|
|
||||||
def test_update_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
def test_update_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||||
data = {
|
data = {
|
||||||
"name": "Not found",
|
"name": "Not found",
|
||||||
"contact": "Not found",
|
"contact": "Not found",
|
||||||
}
|
}
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
|
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||||
assert response.json()["detail"] == "Serie not found"
|
assert response.json()["detail"] == "Hike not found"
|
||||||
|
|
||||||
|
|
||||||
def test_update_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_update_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
serie = create_random_serie(db)
|
hike = create_random_hike(db)
|
||||||
data = {
|
data = {
|
||||||
"name": "No permissions",
|
"name": "No permissions",
|
||||||
"contact": "No permissions",
|
"contact": "No permissions",
|
||||||
}
|
}
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"{settings.API_V1_STR}/series/{serie.id}",
|
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
json=data,
|
json=data,
|
||||||
)
|
)
|
||||||
@@ -150,29 +150,29 @@ def test_update_serie_no_permissions(client: TestClient, normal_user_token_heade
|
|||||||
assert response.json()["detail"] == "Not enough permissions"
|
assert response.json()["detail"] == "Not enough permissions"
|
||||||
|
|
||||||
|
|
||||||
def test_delete_serie(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
def test_delete_hike(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
||||||
serie = create_random_serie(db)
|
hike = create_random_hike(db)
|
||||||
response = client.delete(
|
response = client.delete(
|
||||||
f"{settings.API_V1_STR}/series/{serie.id}",
|
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_200_OK
|
assert response.status_code == status.HTTP_200_OK
|
||||||
assert response.json()["message"] == "Serie deleted successfully"
|
assert response.json()["message"] == "Hike deleted successfully"
|
||||||
|
|
||||||
|
|
||||||
def test_delete_serie_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
def test_delete_hike_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
||||||
response = client.delete(
|
response = client.delete(
|
||||||
f"{settings.API_V1_STR}/series/{uuid.uuid4()}",
|
f"{settings.API_V1_STR}/hikes/{uuid.uuid4()}",
|
||||||
headers=superuser_token_headers,
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
||||||
assert response.json()["detail"] == "Serie not found"
|
assert response.json()["detail"] == "Hike not found"
|
||||||
|
|
||||||
|
|
||||||
def test_delete_serie_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
def test_delete_hike_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
||||||
serie = create_random_serie(db)
|
hike = create_random_hike(db)
|
||||||
response = client.delete(
|
response = client.delete(
|
||||||
f"{settings.API_V1_STR}/series/{serie.id}",
|
f"{settings.API_V1_STR}/hikes/{hike.id}",
|
||||||
headers=normal_user_token_headers,
|
headers=normal_user_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
||||||
@@ -1,213 +0,0 @@
|
|||||||
import uuid
|
|
||||||
|
|
||||||
from fastapi import status
|
|
||||||
from fastapi.testclient import TestClient
|
|
||||||
from sqlmodel import Session
|
|
||||||
|
|
||||||
from app.core.config import settings
|
|
||||||
from app.models.base import VisitedCountType
|
|
||||||
from app.tests.utils.post import create_random_post
|
|
||||||
|
|
||||||
from app.models.post import PostType, ReplayType
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_post(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
data = {
|
|
||||||
"post_type": PostType.POINTS | PostType.VISITED | PostType.OPTION,
|
|
||||||
"replay": ReplayType.NOT_POSSIBLE,
|
|
||||||
|
|
||||||
"name": "Post 1",
|
|
||||||
"short_name": "1",
|
|
||||||
|
|
||||||
"contact": "Rick",
|
|
||||||
"description": "Post met renspel",
|
|
||||||
|
|
||||||
"question_file": None,
|
|
||||||
"answer_file": None,
|
|
||||||
|
|
||||||
"max_points": 5,
|
|
||||||
"visited_points": 5,
|
|
||||||
"visited_count_type": VisitedCountType.ONE_VISIT,
|
|
||||||
|
|
||||||
"min_teams": 1,
|
|
||||||
"max_teams": 2,
|
|
||||||
}
|
|
||||||
response = client.post(
|
|
||||||
f"{settings.API_V1_STR}/posts/",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert content["post_type"] == data["post_type"]
|
|
||||||
assert content["replay"] == data["replay"]
|
|
||||||
assert content["name"] == data["name"]
|
|
||||||
assert content["short_name"] == data["short_name"]
|
|
||||||
assert content["contact"] == data["contact"]
|
|
||||||
assert content["description"] == data["description"]
|
|
||||||
assert content["question_file"] == data["question_file"]
|
|
||||||
assert content["answer_file"] == data["answer_file"]
|
|
||||||
assert content["max_points"] == data["max_points"]
|
|
||||||
assert content["visited_points"] == data["visited_points"]
|
|
||||||
assert content["visited_count_type"] == data["visited_count_type"]
|
|
||||||
assert content["min_teams"] == data["min_teams"]
|
|
||||||
assert content["max_teams"] == data["max_teams"]
|
|
||||||
assert "id" in content
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_post_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
data = {
|
|
||||||
"post_type": PostType.POINTS | PostType.VISITED | PostType.OPTION,
|
|
||||||
"replay": ReplayType.NOT_POSSIBLE,
|
|
||||||
"name": "No permissions",
|
|
||||||
"short_name": "No perm",
|
|
||||||
}
|
|
||||||
response = client.post(
|
|
||||||
f"{settings.API_V1_STR}/posts/",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_post(
|
|
||||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
|
||||||
) -> None:
|
|
||||||
post = create_random_post(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/posts/{post.id}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert content["id"] == str(post.id)
|
|
||||||
assert content["name"] == post.name
|
|
||||||
assert content["contact"] == post.contact
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_post_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/posts/{uuid.uuid4()}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Post not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_post_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
post = create_random_post(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/posts/{post.id}",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_posts(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
create_random_post(db)
|
|
||||||
create_random_post(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/posts/",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert "count" in content
|
|
||||||
assert content["count"] >= 2
|
|
||||||
assert "data" in content
|
|
||||||
assert isinstance(content["data"], list)
|
|
||||||
assert len(content["data"]) <= content["count"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_read_posts_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
create_random_post(db)
|
|
||||||
create_random_post(db)
|
|
||||||
response = client.get(
|
|
||||||
f"{settings.API_V1_STR}/posts/",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert "count" in content
|
|
||||||
assert content["count"] == 0
|
|
||||||
assert "data" in content
|
|
||||||
assert isinstance(content["data"], list)
|
|
||||||
assert len(content["data"]) == 0
|
|
||||||
|
|
||||||
|
|
||||||
def test_update_post(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
post = create_random_post(db)
|
|
||||||
data = {
|
|
||||||
"name": "Updated name",
|
|
||||||
"short_name": "4",
|
|
||||||
}
|
|
||||||
response = client.put(
|
|
||||||
f"{settings.API_V1_STR}/posts/{post.id}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
content = response.json()
|
|
||||||
assert content["id"] == str(post.id)
|
|
||||||
assert content["name"] == data["name"]
|
|
||||||
assert content["short_name"] == data["short_name"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_update_post_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
|
||||||
data = {
|
|
||||||
"name": "Not found",
|
|
||||||
"short_name": "5",
|
|
||||||
}
|
|
||||||
response = client.put(
|
|
||||||
f"{settings.API_V1_STR}/posts/{uuid.uuid4()}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Post not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_update_post_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
post = create_random_post(db)
|
|
||||||
data = {
|
|
||||||
"name": "No permissions",
|
|
||||||
"short_name": "6",
|
|
||||||
}
|
|
||||||
response = client.put(
|
|
||||||
f"{settings.API_V1_STR}/posts/{post.id}",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
json=data,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_post(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
post = create_random_post(db)
|
|
||||||
response = client.delete(
|
|
||||||
f"{settings.API_V1_STR}/posts/{post.id}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
assert response.json()["message"] == "Post deleted successfully"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_post_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
|
|
||||||
response = client.delete(
|
|
||||||
f"{settings.API_V1_STR}/posts/{uuid.uuid4()}",
|
|
||||||
headers=superuser_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
assert response.json()["detail"] == "Post not found"
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_post_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
|
|
||||||
post = create_random_post(db)
|
|
||||||
response = client.delete(
|
|
||||||
f"{settings.API_V1_STR}/posts/{post.id}",
|
|
||||||
headers=normal_user_token_headers,
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
||||||
assert response.json()["detail"] == "Not enough permissions"
|
|
||||||
12
backend/app/tests/utils/hike.py
Normal file
12
backend/app/tests/utils/hike.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
from sqlmodel import Session
|
||||||
|
|
||||||
|
from app.models.hike import Hike, HikeCreate
|
||||||
|
from app.tests.utils.utils import random_lower_string
|
||||||
|
|
||||||
|
|
||||||
|
def create_random_hike(db: Session, name: str = None) -> Hike:
|
||||||
|
if not name:
|
||||||
|
name = random_lower_string()
|
||||||
|
|
||||||
|
hike_in = HikeCreate(name=name)
|
||||||
|
return Hike.create(session=db, create_obj=hike_in)
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
from sqlmodel import Session
|
|
||||||
|
|
||||||
from app.models.post import Post, PostCreate
|
|
||||||
from app.tests.utils.utils import random_lower_string
|
|
||||||
|
|
||||||
|
|
||||||
def create_random_post(db: Session, name: str = None) -> Post:
|
|
||||||
if not name:
|
|
||||||
name = random_lower_string()
|
|
||||||
|
|
||||||
post_in = PostCreate(name=name)
|
|
||||||
return Post.create(session=db, create_obj=post_in)
|
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
from sqlmodel import Session
|
|
||||||
|
|
||||||
from app.models.serie import Serie, SerieCreate
|
|
||||||
from app.tests.utils.utils import random_lower_string
|
|
||||||
|
|
||||||
|
|
||||||
def create_random_serie(db: Session, name: str = None) -> Serie:
|
|
||||||
if not name:
|
|
||||||
name = random_lower_string()
|
|
||||||
|
|
||||||
serie_in = SerieCreate(name=name)
|
|
||||||
return Serie.create(session=db, create_obj=serie_in)
|
|
||||||
Reference in New Issue
Block a user