Implement teams in own file

This commit is contained in:
Sebastiaan
2025-06-09 15:42:44 +02:00
parent 5aab2add76
commit 7387dd7b40
11 changed files with 630 additions and 245 deletions

View File

@@ -2,6 +2,7 @@ from fastapi import APIRouter
from app.api.routes import (
events,
teams,
login,
private,
users,
@@ -16,6 +17,7 @@ api_router.include_router(utils.router)
api_router.include_router(events.router)
api_router.include_router(teams.router)
if settings.ENVIRONMENT == "local":

View File

@@ -16,10 +16,6 @@ from app.models.event import (
EventsPublic,
EventUpdate,
EventUserLink,
EventTeam,
EventTeamCreate,
EventTeamPublic,
EventTeamsPublic,
)
from app.models.user import (
PermissionModule,
@@ -88,8 +84,9 @@ def read_event(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
) and (event.user_has_rights(user=current_user, rights=PermissionRight.READ)):
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.READ)):
raise HTTPException(status_code=400, detail="Not enough permissions")
return event
@@ -131,7 +128,7 @@ def update_event(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
) and (event.user_has_rights(user=current_user, rights=PermissionRight.UPDATE)):
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.UPDATE)):
raise HTTPException(status_code=400, detail="Not enough permissions")
return Event.update(db_obj=event, in_obj=event_in, session=session)
@@ -154,7 +151,7 @@ def delete_event(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
) and (event.user_has_rights(user=current_user, rights=PermissionRight.DELETE)):
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.DELETE)):
raise HTTPException(status_code=400, detail="Not enough permissions")
session.delete(event)
@@ -187,7 +184,7 @@ def add_user_to_event(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS,
) and (
) and not (
event.user_has_rights(
user=current_user, rights=(PermissionRight.MANAGE_USERS | rights_in.rights)
)
@@ -219,9 +216,7 @@ def remove_user_from_event(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS,
) and not event.user_has_rights(
user=current_user, rights=PermissionRight.MANAGE_USERS
):
) and not event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS):
raise HTTPException(status_code=403, detail="Not enough permissions")
user = session.get(User, user_id)
@@ -235,165 +230,3 @@ def remove_user_from_event(
# endregion
# region # Event / Teams #######################################################
@router.get("/{id}/teams", response_model=EventTeamsPublic, tags=router.tags + [ApiTags.TEAMS])
def read_event_teams(
session: SessionDep, current_user: CurrentUser, id: RowId, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve event teams from a single event.
"""
# Event permissions
event = session.get(Event, id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permission(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=(PermissionRight.READ | PermissionRight.MANGE_TEAMS),
) and ( event and (event.user_has_right(user=current_user, rights=(PermissionRight.READ | PermissionRight.MANGE_TEAMS)))):
raise HTTPException(status_code=400, detail="Not enough permissions")
# Get list
count_statement = (
select(func.count())
.select_from(EventTeam)
.where(EventTeam.event_id == id)
)
count = session.exec(count_statement).one()
statement = (
select(EventTeam)
.where(EventTeam.event_id == id)
.offset(skip)
.limit(limit)
)
event_teams = session.exec(statement).all()
return EventTeamsPublic(data=event_teams, count=count)
@router.post("/{id}/teams", response_model=EventTeamPublic, tags=router.tags + [ApiTags.TEAMS])
def create_event_team(
*, session: SessionDep, current_user: CurrentUser, id: RowId, event_team_in: EventTeamCreate
) -> Any:
"""
Create new team inside event.
"""
event = session.get(Event, id)
if not current_user.has_permissions(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANGE_TEAMS,
) and ( event and (event.user_has_rights(user=current_user, rights=PermissionRight.MANGE_TEAMS))):
raise HTTPException(status_code=400, detail="Not enough permissions")
event_team = EventTeam.create(create_obj=event_team_in, event=event, session=session)
return event_team
@router.get("-teams", response_model=EventTeamsPublic, tags=router.tags + [ApiTags.TEAMS])
def read_all_event_teams(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all event teams.
"""
if not current_user.has_permissions(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANGE_TEAMS,
):
raise HTTPException(status_code=400, detail="Not enough permissions")
# Get list
count_statement = (
select(func.count())
.select_from(EventTeam)
)
count = session.exec(count_statement).one()
statement = (
select(EventTeam)
.offset(skip)
.limit(limit)
)
event_teams = session.exec(statement).all()
return EventTeamsPublic(data=event_teams, count=count)
@router.get("-teams/{id}", response_model=EventTeamPublic, tags=router.tags + [ApiTags.TEAMS])
def read_event_team(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get event team by ID.
"""
event_team = session.get(EventTeam, id)
if not event_team:
raise HTTPException(status_code=404, detail="Event team not found")
event = session.get(Event, event_team.event_id)
if not current_user.has_permissions(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANGE_TEAMS,
) and ( event and (event.user_has_rights(user=current_user, rights=PermissionRight.MANGE_TEAMS))):
raise HTTPException(status_code=400, detail="Not enough permissions")
return event_team
@router.put("-teams/{id}", response_model=EventTeamPublic, tags=router.tags + [ApiTags.TEAMS])
def create_event_team(
*, session: SessionDep, current_user: CurrentUser, id: RowId, event_team_in: EventTeamCreate
) -> Any:
"""
Update team.
"""
event_team = session.get(EventTeam, id)
if not event_team:
raise HTTPException(status_code=404, detail="Event team not found")
event = session.get(Event, event_team.event_id)
if not current_user.has_permissions(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANGE_TEAMS,
) and ( event and (event.user_has_rights(user=current_user, rights=PermissionRight.MANGE_TEAMS))):
raise HTTPException(status_code=400, detail="Not enough permissions")
event_team = EventTeam.update(db_obj=event_team, in_obj=event_team_in, session=session)
return event_team
@router.delete("-teams/{id}", tags=router.tags + [ApiTags.TEAMS])
def delete_event_team(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
"""
Delete an event team.
"""
event_team = session.get(EventTeam, id)
if not event_team:
raise HTTPException(status_code=404, detail="Event team not found")
event = session.get(Event, event_team.event_id)
if not current_user.has_permissions(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANGE_TEAMS,
) and (event.user_has_rights(user=current_user, rights=PermissionRight.MANGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
session.delete(event_team)
session.commit()
return Message(message="Event team deleted successfully")
# endregion

View File

@@ -0,0 +1,193 @@
from typing import Any
from fastapi import APIRouter, HTTPException
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.team import (
Team,
TeamCreate,
TeamUpdate,
TeamPublic,
TeamsPublic,
)
from app.models.event import (
Event,
EventUserLink,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/teams", tags=[ApiTags.TEAMS])
# region # Teams ###############################################################
@router.get("/", response_model=TeamsPublic)
def read_teams(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all teams.
"""
if current_user.has_permissions(
module=PermissionModule.TEAM,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
count_statement = select(func.count()).select_from(Team)
count = session.exec(count_statement).one()
statement = select(Team).offset(skip).limit(limit)
teams = session.exec(statement).all()
else:
# Only read teams that are connected to an event that the user can read
count_statement = (
select(func.count())
.select_from(Team)
.join(Event) # Join with Event to filter teams based on events
.join(EventUserLink) # Join with EventUserLink to check user permissions
.where(
EventUserLink.user_id == current_user.id,
# FIXME: (EventUserLink.rights & (PermissionRight.READ | PermissionRight.MANAGE_TEAMS)) > 0
)
)
count = session.exec(count_statement).one()
statement = (
select(Team)
.join(Event)
.join(EventUserLink)
.where(
EventUserLink.user_id == current_user.id,
# FIXME: (EventUserLink.rights & (PermissionRight.READ | PermissionRight.MANAGE_TEAMS)) > 0
)
.offset(skip)
.limit(limit)
)
teams = session.exec(statement).all()
return TeamsPublic(data=teams, count=count)
@router.get("/{id}", response_model=TeamPublic)
def read_team(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get team by ID.
"""
team = session.get(Team, id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
event = session.get(Event, team.event_id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permissions(
module=PermissionModule.TEAM,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
return team
@router.post("/", response_model=TeamPublic)
def create_team(
*, session: SessionDep, current_user: CurrentUser, team_in: TeamCreate
) -> Any:
"""
Create new team.
"""
event = session.get(Event, team_in.event_id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permissions(
module=PermissionModule.TEAM,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
team = Team.create(create_obj=team_in, session=session)
return team
@router.put("/{id}", response_model=TeamPublic)
def update_team(
*, session: SessionDep, current_user: CurrentUser, id: RowId, team_in: TeamUpdate
) -> Any:
"""
Update a team.
"""
team = session.get(Team, id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
# Check user's permissions for the existing event
event = session.get(Event, team.event_id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permissions(
module=PermissionModule.TEAM,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
# Check rights for the new event data
if team_in.event_id:
event = session.get(Event, team_in.event_id)
if not event:
raise HTTPException(status_code=404, detail="New event not found")
if not current_user.has_permissions(
module=PermissionModule.TEAM,
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
# Update the team
team = Team.update(db_obj=team, in_obj=team_in, session=session)
return team
@router.delete("/{id}")
def delete_team(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a team.
"""
team = session.get(Team, id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
event = session.get(Event, team.event_id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permissions(
module=PermissionModule.TEAM,
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
session.delete(team)
session.commit()
return Message(message="Team deleted successfully")
# endregion

View File

@@ -5,6 +5,10 @@ from app.models.event import (
Event,
EventCreate,
)
from app.models.team import (
Team,
TeamCreate,
)
from app.models.user import (
Permission,
PermissionModule,
@@ -119,6 +123,16 @@ def init_db(session: Session) -> None:
event = Event.create(session=session, create_obj=event_in)
event.add_user(user, PermissionRight.ADMIN, session=session)
team = session.exec(
select(Event).where(Team.theme_name == "Laaiend vuur 熾熱的火 🔥")
).first()
if not team:
team_in = TeamCreate(
theme_name="Laaiend vuur 熾熱的火 🔥",
event_id=event.id,
)
team = Team.create(session=session, create_obj=team_in)
session.commit()
# endregion

View File

@@ -1,3 +1,5 @@
from typing import TYPE_CHECKING
from sqlmodel import (
Field,
Relationship,
@@ -15,6 +17,9 @@ from .user import (
User,
)
if TYPE_CHECKING:
from .team import Team
# region # Event ###############################################################
@@ -72,7 +77,7 @@ class Event(mixin.RowId, EventBase, table=True):
# --- many-to-many links ---------------------------------------------------
user_links: list["EventUserLink"] = Relationship(back_populates="event")
team_links: list["EventTeam"] = Relationship(back_populates="event")
team_links: list["Team"] = Relationship(back_populates="event")
# --- CRUD actions ---------------------------------------------------------
@classmethod
@@ -179,70 +184,3 @@ class EventsPublic(BaseSQLModel):
# endregion
# region EventTeam ############################################################
class EventTeamBase(mixin.ThemeName, mixin.CheckInCheckOut, mixin.Canceled, BaseSQLModel):
# scouting_team_id: RowId | None = Field(
# foreign_key="ScoutingTeam.id", nullable=False, ondelete="CASCADE"
# )
pass
# Properties to receive via API on creation
class EventTeamCreate(EventTeamBase):
pass
# Properties to receive via API on update, all are optional
class EventTeamUpdate(mixin.ThemeNameUpdate, EventTeamBase):
pass
class EventTeam(mixin.RowId, EventTeamBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
event_id: RowId = Field(
foreign_key="event.id", nullable=False, ondelete="CASCADE"
)
# --- back_populates links -------------------------------------------------
event: "Event" = Relationship(back_populates="team_links")#, cascade_delete=True)
# team: "ScoutingTeam" = Relationship(back_populates="event_links", cascade_delete=True)
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: EventTeamCreate, event: Event) -> "EventTeam":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj, update={"event_id": event.id})
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "EventTeam", in_obj: EventTeamUpdate
) -> "EventTeam":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class EventTeamPublic(mixin.RowIdPublic, EventTeamBase):
event_id: RowId
class EventTeamsPublic(BaseSQLModel):
data: list[EventTeamPublic]
count: int
# endregion

View File

@@ -10,7 +10,7 @@ from .base import RowId as RowIdType
class Name(BaseModel):
name: str | None = Field(default=None, nullable=False, unique=True, max_length=255)
name: str | None = Field(default=None, nullable=False, unique=False, max_length=255)
class FullName(BaseModel):

View File

@@ -0,0 +1,87 @@
from typing import TYPE_CHECKING
from sqlmodel import (
Field,
Relationship,
Session,
)
from . import mixin
from .base import (
BaseSQLModel,
RowId,
)
if TYPE_CHECKING:
from .event import Event
# region # Team ################################################################
class TeamBase(
mixin.ThemeName,
mixin.CheckInCheckOut,
mixin.Canceled,
BaseSQLModel
):
event_id: RowId = Field(
foreign_key="event.id", nullable=False, ondelete="CASCADE"
)
# scouting_team_id: RowId | None = Field(
# foreign_key="ScoutingTeam.id", nullable=False, ondelete="CASCADE"
# )
# Properties to receive via API on creation
class TeamCreate(TeamBase):
pass
# Properties to receive via API on update, all are optional
class TeamUpdate(mixin.ThemeNameUpdate, TeamBase):
event_id: RowId | None = Field(default=None)
class Team(mixin.RowId, TeamBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
event: "Event" = Relationship(back_populates="team_links")#, cascade_delete=True)
# team: "ScoutingTeam" = Relationship(back_populates="event_links", cascade_delete=True)
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: TeamCreate) -> "Team":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Team", in_obj: TeamUpdate
) -> "Team":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class TeamPublic(mixin.RowIdPublic, TeamBase):
event_id: RowId
class TeamsPublic(BaseSQLModel):
data: list[TeamPublic]
count: int
# endregion

View File

@@ -26,6 +26,7 @@ class PermissionModule(DocumentedStrEnum):
SYSTEM = auto_enum()
USER = auto_enum()
EVENT = auto_enum()
TEAM = auto_enum()
class PermissionPart(DocumentedStrEnum):
@@ -40,9 +41,9 @@ class PermissionRight(DocumentedIntFlag):
DELETE = auto_enum()
MANAGE_USERS = auto_enum()
MANGE_TEAMS = auto_enum()
MANAGE_TEAMS = auto_enum()
ADMIN = CREATE | READ | UPDATE | DELETE | MANAGE_USERS | MANGE_TEAMS
ADMIN = CREATE | READ | UPDATE | DELETE | MANAGE_USERS | MANAGE_TEAMS
class PermissionRightObject(BaseSQLModel):

View File

@@ -0,0 +1,299 @@
import uuid
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.models.user import PermissionRight
from app.core.config import settings
from app.tests.conftest import EventUserHeader
from app.tests.utils.event import create_random_event
from app.tests.utils.team import create_random_team
def test_create_team(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
event = create_random_event(db)
data = {
"theme_name": "Foo",
"event_id": str(event.id),
}
response = client.post(
f"{settings.API_V1_STR}/teams/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["theme_name"] == data["theme_name"]
assert content["event_id"] == str(event.id)
assert "id" in content
def test_create_team_without_event(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"theme_name": "No Event Team",
}
response = client.post(
f"{settings.API_V1_STR}/teams/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 422
assert response.json()["detail"][0]["loc"] == ["body", "event_id"]
def test_create_team_with_incorrect_event(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {
"theme_name": "No Event Team",
"event_id": str(uuid.uuid4()), # Non-existent event
}
response = client.post(
f"{settings.API_V1_STR}/teams/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
assert response.json()["detail"] == "Event not found"
def test_read_team(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
team = create_random_team(db)
response = client.get(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["id"] == str(team.id)
assert content["theme_name"] == team.theme_name
assert content["event_id"] == str(team.event_id)
def test_read_team_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.get(
f"{settings.API_V1_STR}/teams/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == 404
assert response.json()["detail"] == "Team not found"
def test_read_event_not_enough_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
team = create_random_team(db)
response = client.get(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
assert response.json()["detail"] == "Not enough permissions"
def test_read_team_with_event_user(client: TestClient, event_user_token_headers: EventUserHeader, db: Session) -> None:
team = create_random_team(db, event=event_user_token_headers.event)
response = client.get(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=event_user_token_headers.headers,
)
assert response.status_code == 200
content = response.json()
assert content["id"] == str(team.id)
assert content["theme_name"] == team.theme_name
assert content["event_id"] == str(event_user_token_headers.event.id)
def test_read_teams(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_team(db)
create_random_team(db)
response = client.get(
f"{settings.API_V1_STR}/teams/",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert "data" in content
assert isinstance(content["data"], list)
assert content["count"] >= 2
def test_read_teams_with_normal_user(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_team(db)
create_random_team(db)
response = client.get(
f"{settings.API_V1_STR}/teams/",
headers=normal_user_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["count"] == 0
def test_read_teams_with_event_user(client: TestClient, event_user_token_headers: EventUserHeader, db: Session) -> None:
team = create_random_team(db, event=event_user_token_headers.event)
response = client.get(
f"{settings.API_V1_STR}/teams/",
headers=event_user_token_headers.headers,
)
assert response.status_code == 200
content = response.json()
assert "data" in content
assert isinstance(content["data"], list)
assert content["count"] >= 1
def test_update_team_name(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
team = create_random_team(db)
data = {"theme_name": "Updated Team Name"}
response = client.put(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["id"] == str(team.id)
assert content["theme_name"] == data["theme_name"]
assert content["event_id"] == str(team.event_id)
def test_update_team_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
data = {"theme_name": "Non-existent team"}
response = client.put(
f"{settings.API_V1_STR}/teams/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
assert response.json()["detail"] == "Team not found"
def test_update_team_not_enough_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
team = create_random_team(db)
data = {"theme_name": "Not enough permissions team"}
response = client.put(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 400
assert response.json()["detail"] == "Not enough permissions"
def test_update_team_name_with_event_permissions(client: TestClient, event_user_token_headers: EventUserHeader, db: Session) -> None:
team = create_random_team(db, event=event_user_token_headers.event)
data = {"theme_name": "Updated Team Name with Event permissions"}
response = client.put(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=event_user_token_headers.headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["id"] == str(team.id)
assert content["theme_name"] == data["theme_name"]
assert content["event_id"] == str(event_user_token_headers.event.id)
def test_update_team_event(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
team = create_random_team(db)
new_event = create_random_event(db)
data = {"event_id": str(new_event.id)}
response = client.put(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["id"] == str(team.id)
assert content["theme_name"] == team.theme_name
assert content["event_id"] == str(new_event.id)
def test_update_team_event_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
team = create_random_team(db)
data = {"event_id": str(uuid.uuid4())}
response = client.put(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
assert response.json()["detail"] == "New event not found"
def test_update_team_event_with_event_user(client: TestClient, event_user_token_headers: EventUserHeader, db: Session) -> None:
team = create_random_team(db, event=event_user_token_headers.event)
new_event = create_random_event(db)
new_event.add_user(user=event_user_token_headers.user, rights=PermissionRight.MANAGE_TEAMS, session=db)
data = {"event_id": str(new_event.id)}
response = client.put(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=event_user_token_headers.headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["id"] == str(team.id)
assert content["theme_name"] == team.theme_name
assert content["event_id"] == str(new_event.id)
def test_update_team_event_with_event_user_not_enough_permissions(client: TestClient, event_user_token_headers: EventUserHeader, db: Session) -> None:
team = create_random_team(db, event=event_user_token_headers.event)
new_event = create_random_event(db)
data = {"event_id": str(new_event.id)}
response = client.put(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=event_user_token_headers.headers,
json=data,
)
assert response.status_code == 400
assert response.json()["detail"] == "Not enough permissions"
def test_delete_team(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
team = create_random_team(db)
response = client.delete(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
assert response.json()["message"] == "Team deleted successfully"
def test_delete_team_not_found(client: TestClient, superuser_token_headers: dict[str, str]) -> None:
response = client.delete(
f"{settings.API_V1_STR}/teams/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == 404
assert response.json()["detail"] == "Team not found"
def test_delete_not_enough_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
team = create_random_team(db)
response = client.delete(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
assert response.json()["detail"] == "Not enough permissions"
def test_delete_team_with_event_user(client: TestClient, event_user_token_headers: EventUserHeader, db: Session) -> None:
team = create_random_team(db, event=event_user_token_headers.event)
response = client.delete(
f"{settings.API_V1_STR}/teams/{team.id}",
headers=event_user_token_headers.headers,
)
assert response.status_code == 200
assert response.json()["message"] == "Team deleted successfully"

View File

@@ -0,0 +1,18 @@
from sqlmodel import Session
from app.models.event import Event
from app.models.team import Team, TeamCreate
from app.tests.utils.event import create_random_event
from app.tests.utils.utils import random_lower_string
def create_random_team(db: Session, event: Event | None = None) -> Team:
name = random_lower_string()
if not event:
event = create_random_event(db)
team_in = TeamCreate(theme_name=name, event_id=event.id)
team = Team.create(session=db, create_obj=team_in)
return team

View File

@@ -3,6 +3,6 @@
set -e
set -x
coverage run --source=app -m pytest
coverage run --source=app -m pytest ${TEST}
coverage report --show-missing
coverage html --title "${@-coverage}"