Implement divisions

This commit is contained in:
Sebastiaan
2025-06-10 21:39:39 +02:00
parent 13a1b4dd1e
commit 1d9e333ee0
11 changed files with 519 additions and 1 deletions

View File

@@ -4,6 +4,7 @@ from app.api.routes import (
events, events,
teams, teams,
associations, associations,
divisions,
login, login,
private, private,
users, users,
@@ -20,6 +21,7 @@ api_router.include_router(utils.router)
api_router.include_router(events.router) api_router.include_router(events.router)
api_router.include_router(teams.router) api_router.include_router(teams.router)
api_router.include_router(associations.router) api_router.include_router(associations.router)
api_router.include_router(divisions.router)
if settings.ENVIRONMENT == "local": if settings.ENVIRONMENT == "local":

View File

@@ -16,6 +16,10 @@ from app.models.association import (
AssociationPublic, AssociationPublic,
AssociationsPublic, AssociationsPublic,
) )
from app.models.division import (
Division,
DivisionsPublic,
)
from app.models.user import ( from app.models.user import (
PermissionModule, PermissionModule,
PermissionPart, PermissionPart,
@@ -130,3 +134,43 @@ def delete_association(session: SessionDep,current_user: CurrentUser, id: RowId)
return Message(message="Association deleted successfully") return Message(message="Association deleted successfully")
# endregion # endregion
# region # Associations / Divisions ############################################
@router.get("/{associations_id}/divisions/", response_model=DivisionsPublic)
def read_association_division(
session: SessionDep, current_user: CurrentUser, associations_id: RowId, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all association divisions.
"""
association = session.get(Association, associations_id)
if not association:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Association not found")
if not current_user.has_permission(
module=PermissionModule.ASSOCIATION,
part=PermissionPart.ADMIN,
rights=(PermissionRight.MANAGE_DIVISIONS | PermissionRight.READ),
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
count_statement = (select(func.count())
.select_from(Division)
.where(Division.association_id == association.id)
)
count = session.exec(count_statement).one()
statement = (select(Division)
.where(Division.association_id == association.id)
.offset(skip)
.limit(limit)
)
divisions = session.exec(statement).all()
return DivisionsPublic(data=divisions, count=count)
# endregion

View File

@@ -0,0 +1,132 @@
from typing import Any
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.division import (
Division,
DivisionCreate,
DivisionUpdate,
DivisionPublic,
DivisionsPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/divisions", tags=[ApiTags.DIVISIONS])
# region # Divisions ###########################################################
@router.get("/", response_model=DivisionsPublic)
def read_divisions(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all divisions.
"""
if current_user.has_permissions(
module=PermissionModule.DIVISION,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Division)
count = session.exec(count_statement).one()
statement = select(Division).offset(skip).limit(limit)
divisions = session.exec(statement).all()
return DivisionsPublic(data=divisions, count=count)
return DivisionsPublic(data=[], count=0)
@router.get("/{id}", response_model=DivisionPublic)
def read_division(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get division by ID.
"""
division = session.get(Division, id)
if not division:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Division not found")
if not current_user.has_permissions(
module=PermissionModule.DIVISION,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return division
@router.post("/", response_model=DivisionPublic)
def create_division(
*, session: SessionDep, current_user: CurrentUser, division_in: DivisionCreate
) -> Any:
"""
Create new division.
"""
if not current_user.has_permissions(
module=PermissionModule.DIVISION,
part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
division = Division.create(create_obj=division_in, session=session)
return division
@router.put("/{id}", response_model=DivisionPublic)
def update_division(
*, session: SessionDep, current_user: CurrentUser, id: RowId, division_in: DivisionUpdate
) -> Any:
"""
Update a division.
"""
division = session.get(Division, id)
if not division:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Division not found")
if not current_user.has_permissions(
module=PermissionModule.DIVISION,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
division = Division.update(db_obj=division, in_obj=division_in, session=session)
return division
@router.delete("/{id}")
def delete_division(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a division.
"""
division = session.get(Division, id)
if not division:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Division not found")
if not current_user.has_permissions(
module=PermissionModule.ASSOCIATION,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
session.delete(division)
session.commit()
return Message(message="Division deleted successfully")
# endregion

View File

@@ -8,6 +8,9 @@ from app.models.event import (
from app.models.association import ( from app.models.association import (
Association, Association,
) )
from app.models.division import (
Division,
)
from app.models.team import ( from app.models.team import (
Team, Team,
TeamCreate, TeamCreate,

View File

@@ -2,6 +2,7 @@ from typing import TYPE_CHECKING
from sqlmodel import ( from sqlmodel import (
Session, Session,
Relationship,
) )
from . import mixin from . import mixin
@@ -9,6 +10,9 @@ from .base import (
BaseSQLModel, BaseSQLModel,
) )
if TYPE_CHECKING:
from .division import Division
# region # Association ######################################################### # region # Association #########################################################
@@ -37,6 +41,7 @@ class Association(mixin.RowId, AssociationBase, table=True):
# --- read only items ------------------------------------------------------ # --- read only items ------------------------------------------------------
# --- back_populates links ------------------------------------------------- # --- back_populates links -------------------------------------------------
divisions: list["Division"] = Relationship(back_populates="association", cascade_delete=True)
# --- CRUD actions --------------------------------------------------------- # --- CRUD actions ---------------------------------------------------------
@classmethod @classmethod

View File

@@ -57,6 +57,7 @@ class ApiTags(DocumentedStrEnum):
EVENTS = "Events" EVENTS = "Events"
TEAMS = "Teams" TEAMS = "Teams"
ASSOCIATIONS = "Associations" ASSOCIATIONS = "Associations"
DIVISIONS = "Divisions"
# endregion # endregion

View File

@@ -0,0 +1,83 @@
from typing import TYPE_CHECKING
from sqlmodel import (
Session,
Field,
Relationship,
)
from . import mixin
from .base import (
BaseSQLModel,
RowId,
)
if TYPE_CHECKING:
from .association import Association
# region # Divisions ###########################################################
class DivisionBase(
mixin.Name,
mixin.Contact,
mixin.ScoutingId,
BaseSQLModel,
):
association_id: RowId = Field(
foreign_key="association.id", nullable=False, ondelete="CASCADE"
)
# Properties to receive via API on creation
class DivisionCreate(DivisionBase):
pass
# Properties to receive via API on update, all are optional
class DivisionUpdate(DivisionBase):
association_id: RowId | None = Field(default=None)
class Division(mixin.RowId, DivisionBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
association: "Association" = Relationship(back_populates="divisions") # , cascade_delete=True)
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: DivisionCreate) -> "Division":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Division", in_obj: DivisionUpdate
) -> "Division":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class DivisionPublic(mixin.RowIdPublic, DivisionBase):
association_id: RowId
class DivisionsPublic(BaseSQLModel):
data: list[DivisionPublic]
count: int
# endregion

View File

@@ -28,6 +28,7 @@ class PermissionModule(DocumentedStrEnum):
EVENT = auto_enum() EVENT = auto_enum()
TEAM = auto_enum() TEAM = auto_enum()
ASSOCIATION = auto_enum() ASSOCIATION = auto_enum()
DIVISION = auto_enum()
class PermissionPart(DocumentedStrEnum): class PermissionPart(DocumentedStrEnum):
@@ -43,8 +44,9 @@ class PermissionRight(DocumentedIntFlag):
MANAGE_USERS = auto_enum() MANAGE_USERS = auto_enum()
MANAGE_TEAMS = auto_enum() MANAGE_TEAMS = auto_enum()
MANAGE_DIVISIONS = auto_enum()
ADMIN = CREATE | READ | UPDATE | DELETE | MANAGE_USERS | MANAGE_TEAMS ADMIN = CREATE | READ | UPDATE | DELETE | MANAGE_USERS | MANAGE_TEAMS | MANAGE_DIVISIONS
# ############################################################################## # ##############################################################################

View File

@@ -6,6 +6,7 @@ from sqlmodel import Session
from app.core.config import settings from app.core.config import settings
from app.tests.utils.association import create_random_association from app.tests.utils.association import create_random_association
from app.tests.utils.division import create_random_division
def test_create_association(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None: def test_create_association(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
@@ -182,3 +183,40 @@ def test_delete_association_no_permissions(client: TestClient, normal_user_token
) )
assert response.status_code == status.HTTP_403_FORBIDDEN assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
def test_read_association_divisions(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
association = create_random_association(db)
create_random_division(db, association=association)
create_random_division(db, association=association)
response = client.get(
f"{settings.API_V1_STR}/associations/{association.id}/divisions",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_association_divisions_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/associations/{uuid.uuid4()}/divisions",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Association not found"
def test_read_association_divisions_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
association = create_random_association(db)
create_random_division(db, association=association)
response = client.get(
f"{settings.API_V1_STR}/associations/{association.id}/divisions",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,191 @@
import uuid
from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.division import create_random_division
from app.tests.utils.association import create_random_association
def test_create_division(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
association = create_random_association(db)
data = {
"name": "Verkenners",
"scouting_id": "122314",
"association_id": str(association.id),
}
response = client.post(
f"{settings.API_V1_STR}/divisions/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["name"] == data["name"]
assert content["scouting_id"] == data["scouting_id"]
assert content["association_id"] == str(association.id)
assert "contact" in content
assert "id" in content
def test_create_division_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
association = create_random_association(db)
data = {
"name": "Padvinsters",
"contact": "-",
"scouting_id": "122323",
"association_id": str(association.id),
}
response = client.post(
f"{settings.API_V1_STR}/divisions/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_division(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
division = create_random_division(db)
response = client.get(
f"{settings.API_V1_STR}/divisions/{division.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(division.id)
assert content["name"] == division.name
assert content["contact"] == division.contact
assert content["scouting_id"] == division.scouting_id
assert content["association_id"] == str(division.association_id)
def test_read_division_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/divisions/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Division not found"
def test_read_division_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
division = create_random_division(db)
response = client.get(
f"{settings.API_V1_STR}/divisions/{division.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_divisions(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_division(db)
create_random_division(db)
response = client.get(
f"{settings.API_V1_STR}/divisions/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_divisions_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_division(db)
create_random_division(db)
response = client.get(
f"{settings.API_V1_STR}/divisions/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 0
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) == 0
def test_update_division(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
division = create_random_division(db)
data = {
"name": "Updated name",
"contact": "Updated contact",
}
response = client.put(
f"{settings.API_V1_STR}/divisions/{division.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(division.id)
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
assert content["scouting_id"] == division.scouting_id
assert content["association_id"] == str(division.association_id)
def test_update_division_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"name": "Not found",
"contact": "Not found",
}
response = client.put(
f"{settings.API_V1_STR}/divisions/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Division not found"
def test_update_division_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
division = create_random_division(db)
data = {
"name": "No permissions",
"contact": "No permissions",
}
response = client.put(
f"{settings.API_V1_STR}/divisions/{division.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_delete_division(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
division = create_random_division(db)
response = client.delete(
f"{settings.API_V1_STR}/divisions/{division.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Division deleted successfully"
def test_delete_division_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/divisions/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Division not found"
def test_delete_division_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
division = create_random_division(db)
response = client.delete(
f"{settings.API_V1_STR}/divisions/{division.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"

View File

@@ -0,0 +1,17 @@
from sqlmodel import Session
from app.models.association import Association
from app.models.division import Division, DivisionCreate
from app.tests.utils.association import create_random_association
from app.tests.utils.utils import random_lower_string
def create_random_division(db: Session, name: str = None, association: Association = None) -> Division:
if not name:
name = random_lower_string()
if not association:
association = create_random_association(db)
division_in = DivisionCreate(name=name, association_id=association.id)
return Division.create(session=db, create_obj=division_in)