Add base for members

This commit is contained in:
Sebastiaan
2025-06-17 21:23:14 +02:00
parent 1e6b138873
commit 479ca1986f
16 changed files with 921 additions and 63 deletions

View File

@@ -5,6 +5,7 @@ from app.api.routes import (
teams,
associations,
divisions,
members,
login,
private,
users,
@@ -22,6 +23,7 @@ api_router.include_router(events.router)
api_router.include_router(teams.router)
api_router.include_router(associations.router)
api_router.include_router(divisions.router)
api_router.include_router(members.router)
if settings.ENVIRONMENT == "local":

View File

@@ -0,0 +1,208 @@
from typing import Any
import sqlalchemy
from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select, and_, or_, SQLModel
from sqlalchemy.orm import joinedload
from app.api.deps import CurrentUser, SessionDep
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.member import (
Member,
MemberCreate,
MemberUpdate,
MemberPublic,
MembersPublic,
MemberTeamLink,
)
from app.models.event import Event, EventUserLink
from app.models.team import Team
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
)
router = APIRouter(prefix="/members", tags=[ApiTags.MEMBERS])
# region # Members #############################################################
def load_member(
session: SessionDep,
current_user: CurrentUser,
id: RowId | None = None,
module: PermissionModule = PermissionModule.MEMBER,
part: PermissionPart = PermissionPart.ADMIN,
user_rights: PermissionRight | None = None,
event_rights: PermissionRight | None = PermissionRight.MANAGE_MEMBERS,
) -> Member | None:
member = session.get(Member, id)
if id and not member:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Member not found")
no_links = True
valid = False
# Global member permissions
if current_user.has_permissions(module=module, part=part, rights=user_rights):
# Also valid for create new
valid = True
# Own member items
elif hasattr(member, "user") and member.user and member.user == current_user:
valid = True
# Event member permissions
elif hasattr(member, "team_links"):
for link in member.team_links:
team = link.team
if team and team.event:
no_links = False
if team.event.user_has_rights(user=current_user, rights=event_rights):
valid = True
break
# Not yet linked, or unlinked member
if no_links and hasattr(member, "created_by") and member.created_by == current_user.id:
valid = True
if not valid:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return member
@router.get("/", response_model=MembersPublic)
def read_members(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all members.
"""
if current_user.has_permissions(
module=PermissionModule.MEMBER,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
data_query = (
select(Member)
)
else:
data_query = (
select(Member)
.outerjoin(MemberTeamLink, MemberTeamLink.member_id == Member.id)
.outerjoin(Team, MemberTeamLink.team_id == Team.id)
.outerjoin(Event, Team.event_id == Event.id)
.outerjoin(EventUserLink, EventUserLink.event_id == Event.id)
.where(
or_(
# Own member
Member.id == current_user.member_id,
# Created by user and unlinked
and_(
Member.created_by == current_user.id,
MemberTeamLink.team_id == None
),
# Event permissions via team -> event -> EventUserLink
and_(
EventUserLink.user_id == current_user.id,
# FIXME: EventUserLink.rights.op("&")(PermissionRight.MANAGE_MEMBERS) != 0
),
)
)
)
# Cache as subquery
data_sub_query = data_query.subquery()
aliased_member = sqlalchemy.orm.aliased(Member, data_sub_query)
# Count using subquery
count = session.exec(
select(func.count()).select_from(data_sub_query)
).one()
# Paginated data query using same subquery
data = session.exec(
select(aliased_member).offset(skip).limit(limit)
).all()
return MembersPublic(count=count, data=data)
@router.get("/{id}", response_model=MemberPublic)
def read_member(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
"""
Get member by ID.
"""
member = load_member(
session=session,
current_user=current_user,
id=id,
user_rights=PermissionRight.READ,
)
return member
@router.post("/", response_model=MemberPublic)
def create_member(
*, session: SessionDep, current_user: CurrentUser, member_in: MemberCreate
) -> Any:
"""
Create new member.
"""
load_member(
session=session,
current_user=current_user,
user_rights=PermissionRight.CREATE,
)
member = Member.create(create_obj=member_in, session=session, user=current_user)
return member
@router.put("/{id}", response_model=MemberPublic)
def update_member(
*, session: SessionDep, current_user: CurrentUser, id: RowId, member_in: MemberUpdate
) -> Any:
"""
Update a member.
"""
member = load_member(
session=session,
current_user=current_user,
id=id,
user_rights=PermissionRight.UPDATE,
)
member = Member.update(db_obj=member, in_obj=member_in, session=session)
return member
@router.delete("/{id}")
def delete_member(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message:
"""
Delete a member.
"""
member = load_member(
session=session,
current_user=current_user,
id=id,
user_rights=PermissionRight.DELETE,
)
session.delete(member)
session.commit()
return Message(message="Member deleted successfully")
# endregion

View File

@@ -17,7 +17,6 @@ router = APIRouter(tags=[ApiTags.PRIVATE], prefix="/private")
class PrivateUserCreate(BaseModel):
email: str
password: str
full_name: str
is_verified: bool = False
@@ -29,7 +28,6 @@ def create_user(user_in: PrivateUserCreate, session: SessionDep) -> Any:
user = User(
email=user_in.email,
full_name=user_in.full_name,
hashed_password=get_password_hash(user_in.password),
)

View File

@@ -4,11 +4,13 @@ from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.api.routes.members import load_member
from app.models.base import (
ApiTags,
Message,
RowId,
)
from app.models.member import MemberTeamLink, MemberTeamLinkCreate, MemberTeamLinkUpdate
from app.models.team import (
Team,
TeamCreate,
@@ -263,3 +265,136 @@ def delete_team_division_link(session: SessionDep, current_user: CurrentUser, id
return Message(message="Division deleted from team successfully")
# endregion
# region # Teams / Members #####################################################
def load_member_link(team: Team, member_id: RowId):
link = next((link for link in team.member_links if link.member_id == member_id), None)
if not link:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Member not found")
return link
@router.get("/{team_id}/members", response_model=TeamsPublic)
def read_team_member_links(
session: SessionDep, current_user: CurrentUser, team_id: RowId, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all member links from a teams.
"""
if current_user.has_permissions(
module=PermissionModule.TEAM,
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
):
data_query = (
select(MemberTeamLink)
.where(MemberTeamLink.team_id == team_id)
)
else:
data_query = (
select(MemberTeamLink)
.join(Team, Team.id == MemberTeamLink.team_id)
.join(EventUserLink, EventUserLink.event_id == Team.event_id)
.where(
MemberTeamLink.team_id == team_id,
EventUserLink.user_id == current_user.id,
# FIXME: (EventUserLink.rights & (PermissionRight.MANAGE_MEMBERS)) > 0
)
)
count = session.exec(select(func.count()).select_from(data_query.subquery())).one()
data = session.exec(data_query.offset(skip).limit(limit)).all()
return TeamsPublic(data=data, count=count)
@router.get("/{team_id}/members/{member_id}", response_model=TeamPublic)
def read_team_member_link(session: SessionDep, current_user: CurrentUser, team_id: RowId, member_id: RowId) -> Any:
"""
Get member link by member ID.
"""
team = load_team(
session=session,
current_user=current_user,
id=team_id,
user_rights=PermissionRight.MANAGE_MEMBERS,
event_rights=PermissionRight.MANAGE_MEMBERS,
)
link = load_member_link(team=team, member_id=member_id)
return link
@router.post("/{team_id}/members", response_model=TeamPublic)
def create_team_member_link(
*, session: SessionDep, current_user: CurrentUser, team_id: RowId, link_in: MemberTeamLinkCreate
) -> Any:
"""
Create new team.
"""
team = load_team(
session=session,
current_user=current_user,
id=team_id,
user_rights=PermissionRight.MANAGE_MEMBERS,
event_rights=PermissionRight.MANAGE_MEMBERS,
)
# Check if user has rights for current status of the member
load_member(
session=session,
current_user=current_user,
id=link_in.member_id,
user_rights=PermissionRight.MANAGE_MEMBERS,
)
link = MemberTeamLink.create(session=session, create_obj=link_in, team=team)
return link
@router.put("/{team_id}/members/{member_id}", response_model=TeamPublic)
def update_team_member_link(
*, session: SessionDep, current_user: CurrentUser, team_id: RowId, member_id: RowId, link_in: MemberTeamLinkUpdate
) -> Any:
"""
Update a team member link.
"""
team = load_team(
session=session,
current_user=current_user,
id=team_id,
user_rights=PermissionRight.MANAGE_MEMBERS,
event_rights=PermissionRight.MANAGE_MEMBERS,
)
link = load_member_link(team=team, member_id=member_id)
link = MemberTeamLink.update(session=session, db_obj=link, in_obj=link_in)
return link
@router.delete("/{team_id}/members/{member_id}")
def delete_team_member_link(session: SessionDep,current_user: CurrentUser, team_id: RowId, member_id: RowId) -> Message:
"""
Delete a team member link.
"""
team = load_team(
session=session,
current_user=current_user,
id=team_id,
user_rights=PermissionRight.MANAGE_MEMBERS,
event_rights=PermissionRight.MANAGE_MEMBERS,
)
link = load_member_link(team=team, member_id=member_id)
session.delete(link)
session.commit()
return Message(message="Team member link deleted successfully")
# endregion

View File

@@ -19,6 +19,7 @@ from app.models.apikey import (
ApiKeysPublic,
)
from app.models.base import ApiTags, Message, RowId
from app.models.member import MemberPublic, MemberUpdate, Member
from app.models.user import (
PermissionModule,
PermissionPart,
@@ -195,6 +196,37 @@ def read_user_me(current_user: CurrentUser) -> Any:
return current_user
@router.get("/me/member", response_model=MemberPublic, tags=[ApiTags.MEMBERS])
def read_user_me_member(current_user: CurrentUser) -> Any:
"""
Get current user member.
"""
return current_user.member
@router.put("/me/member", response_model=MemberPublic, tags=[ApiTags.MEMBERS])
def update_user_me_member(
*, session: SessionDep, current_user: CurrentUser, member_in: MemberUpdate
) -> Any:
"""
Get current user member.
"""
member = session.get(Member, current_user.member_id)
data_obj = member_in.model_dump(exclude_unset=True)
if not member:
member = Member.model_validate(data_obj)
current_user.member_id = member.id
session.add(current_user)
else:
member.sqlmodel_update(data_obj)
session.add(member)
session.commit()
session.refresh(member)
return member
@router.delete("/me", response_model=Message)
def delete_user_me(session: SessionDep, current_user: CurrentUser) -> Any:
"""

View File

@@ -24,6 +24,9 @@ from app.models.user import (
User,
UserCreate,
)
from app.models.member import (
Member, MemberCreate,
)
from app.models.apikey import (
ApiKey,
)
@@ -115,6 +118,15 @@ def init_db(session: Session) -> None:
)
user = User.create(session=session, create_obj=user_in)
user.add_role(db_obj=system_admin_role, session=session)
if not user.member_id:
member_in = MemberCreate(
name="Super Admin",
)
member = Member.create(session=session, create_obj=member_in, user=user)
user.member = member
session.add(user)
session.commit()
event = session.exec(

View File

@@ -58,6 +58,7 @@ class ApiTags(DocumentedStrEnum):
TEAMS = "Teams"
ASSOCIATIONS = "Associations"
DIVISIONS = "Divisions"
MEMBERS = "Members"
# endregion

View File

@@ -57,8 +57,6 @@ class DivisionTeamLink(DivisionTeamLinkBase, table=True):
division: "Division" = Relationship(back_populates="team_links")
team: "Team" = Relationship(back_populates="division_link")
# Members (1 lid > meerdere teams | many-to-one)
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: DivisionTeamLinkCreate, team: "Team") -> "DivisionTeamLink":

View File

@@ -0,0 +1,191 @@
from typing import TYPE_CHECKING, Optional
from sqlmodel import (
Session,
Field,
Relationship,
)
from . import mixin
from .base import (
BaseSQLModel,
DocumentedIntFlag,
auto_enum,
RowId,
)
if TYPE_CHECKING:
from .user import User
from .team import Team
# region # Member / Teams ######################################################
class MemberRank(DocumentedIntFlag):
TEAM_MEMBER = auto_enum()
TEAM_ASSISTANT_LEADER = auto_enum()
TEAM_LEADER = auto_enum()
DIVISION_LEADER = auto_enum()
VOLUNTEER = auto_enum()
# ##############################################################################
# Shared properties
class MemberTeamLinkBase(BaseSQLModel):
rank: MemberRank = Field(default=MemberRank.TEAM_MEMBER, nullable=False)
# Properties to receive via API on creation
class MemberTeamLinkCreate(MemberTeamLinkBase):
member_id: RowId = Field(default=None, nullable=False)
# Properties to receive via API on update, all are optional
class MemberTeamLinkUpdate(MemberTeamLinkBase):
pass
# Database model, database table inferred from class name
class MemberTeamLink(MemberTeamLinkBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
member_id: RowId = Field(
foreign_key="member.id",
primary_key=True,
nullable=False,
ondelete="CASCADE",
)
team_id: RowId = Field(
foreign_key="team.id",
primary_key=True,
nullable=False,
ondelete="CASCADE",
)
member: "Member" = Relationship(back_populates="team_links")
team: "Team" = Relationship(back_populates="member_links")
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: MemberTeamLinkCreate, team: "Team") -> "MemberTeamLink":
data_obj = create_obj.model_dump(exclude_unset=True)
db_obj = cls.model_validate(data_obj, update={"team_id": team.id})
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "MemberTeamLink", in_obj: MemberTeamLinkUpdate
) -> "MemberTeamLink":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API
class MemberTeamLinkPublic(MemberTeamLinkBase):
member_id: RowId
team_id: RowId
class MemberTeamLinksPublic(BaseSQLModel):
data: list[MemberTeamLinkPublic]
count: int
# endregion
# region # Member ##############################################################
# Shared properties
class MemberBase(
mixin.Name,
mixin.Contact,
mixin.ScoutingId,
mixin.Comment,
mixin.Allergy,
mixin.Birthday,
mixin.Canceled,
BaseSQLModel,
):
pass
# Properties to receive via API on creation
class MemberCreate(MemberBase):
pass
# Properties to receive via API on update, all are optional
class MemberUpdate(MemberBase):
pass
# Database model, database table inferred from class name
class Member(mixin.RowId, mixin.Created, MemberBase, table=True):
# --- database only items --------------------------------------------------
# --- read only items ------------------------------------------------------
# --- back_populates links -------------------------------------------------
user: Optional["User"] = Relationship(
back_populates="member",
sa_relationship_kwargs={"foreign_keys": "User.member_id"},
)
team_links: list["MemberTeamLink"] = Relationship(back_populates="member", cascade_delete=True)
# --- CRUD actions ---------------------------------------------------------
@classmethod
def create(cls, *, session: Session, create_obj: MemberCreate, user: Optional["User"] = None) -> "Member":
data_obj = create_obj.model_dump(exclude_unset=True)
extra_fields = {}
if user:
extra_fields["created_by"] = user.id
db_obj = cls.model_validate(data_obj, update=extra_fields)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "Member", in_obj: MemberUpdate
) -> "Member":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
# Properties to return via API, id is always required
class MemberPublic(mixin.RowIdPublic, MemberBase):
# TODO: Return user_id
pass
class MembersPublic(BaseSQLModel):
data: list[MemberPublic]
count: int
# endregion

View File

@@ -1,5 +1,5 @@
import uuid
from datetime import datetime
from datetime import datetime, date
from pydantic import BaseModel, EmailStr
from sqlmodel import (
@@ -7,6 +7,7 @@ from sqlmodel import (
)
from .base import RowId as RowIdType
from ..core.config import settings
class Name(BaseModel):
@@ -89,6 +90,14 @@ class Description(BaseModel):
description: str | None = Field(default=None, nullable=True, max_length=512)
class Comment(BaseModel):
comment: str | None = Field(default=None, nullable=True, max_length=512)
class Allergy(BaseModel):
allergy: str | None = Field(default=None, nullable=True, max_length=512)
class StartEndDate:
start_at: datetime | None = Field(default=None, nullable=True)
end_at: datetime | None = Field(default=None, nullable=True)
@@ -104,3 +113,10 @@ class CheckInCheckOut(BaseModel):
checkout_at: datetime | None = Field(default=None, nullable=True)
class Birthday(BaseModel):
birthday_at: date | None = Field(default=None, nullable=True)
class Created(BaseModel):
created_at: datetime | None = Field(nullable=False, default_factory=lambda: datetime.now(settings.tz_info))
created_by: RowIdType | None = Field(default=None, nullable=True, foreign_key="user.id", ondelete="SET NULL")

View File

@@ -15,6 +15,7 @@ from .base import (
if TYPE_CHECKING:
from .event import Event
from .division import DivisionTeamLink
from .member import MemberTeamLink
# region # Team ################################################################
@@ -49,6 +50,7 @@ class Team(mixin.RowId, TeamBase, table=True):
# --- back_populates links -------------------------------------------------
event: "Event" = Relationship(back_populates="teams")
division_link: "DivisionTeamLink" = Relationship(back_populates="team", cascade_delete=True)
member_links: list["MemberTeamLink"] = Relationship(back_populates="team", cascade_delete=True)
# --- CRUD actions ---------------------------------------------------------
@classmethod

View File

@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
from pydantic import EmailStr, field_validator
from sqlmodel import Field, Relationship, Session, select
@@ -17,6 +17,7 @@ from .base import (
if TYPE_CHECKING:
from .apikey import ApiKey
from .event import EventUserLink
from .member import Member
# region # User ################################################################
@@ -29,6 +30,7 @@ class PermissionModule(DocumentedStrEnum):
TEAM = auto_enum()
ASSOCIATION = auto_enum()
DIVISION = auto_enum()
MEMBER = auto_enum()
class PermissionPart(DocumentedStrEnum):
@@ -45,8 +47,17 @@ class PermissionRight(DocumentedIntFlag):
MANAGE_USERS = auto_enum()
MANAGE_TEAMS = auto_enum()
MANAGE_DIVISIONS = auto_enum()
MANAGE_MEMBERS = auto_enum()
ADMIN = CREATE | READ | UPDATE | DELETE | MANAGE_USERS | MANAGE_TEAMS | MANAGE_DIVISIONS
ADMIN = ( CREATE
| READ
| UPDATE
| DELETE
| MANAGE_USERS
| MANAGE_TEAMS
| MANAGE_DIVISIONS
| MANAGE_MEMBERS
)
# ##############################################################################
@@ -75,13 +86,16 @@ class UserRoleLink(BaseSQLModel, table=True):
class UserBase(
mixin.UserName,
mixin.Email,
mixin.FullName,
mixin.ScoutingId,
mixin.IsActive,
mixin.IsVerified,
BaseSQLModel,
):
pass
member_id: RowId | None = Field(
default=None,
foreign_key="member.id",
nullable=True,
ondelete="SET NULL",
)
# Properties to receive via API on creation
@@ -89,7 +103,7 @@ class UserCreate(mixin.Password, UserBase):
pass
class UserRegister(mixin.Password, mixin.FullName, BaseSQLModel):
class UserRegister(mixin.Password, BaseSQLModel):
email: EmailStr = Field(max_length=255)
@@ -98,7 +112,7 @@ class UserUpdate(mixin.EmailUpdate, mixin.PasswordUpdate, UserBase):
pass
class UserUpdateMe(mixin.FullName, mixin.EmailUpdate, BaseSQLModel):
class UserUpdateMe(mixin.EmailUpdate, BaseSQLModel):
pass
@@ -113,6 +127,10 @@ class User(mixin.RowId, UserBase, table=True):
hashed_password: str
# --- back_populates links -------------------------------------------------
member: Optional["Member"] = Relationship(
back_populates="user",
sa_relationship_kwargs={"foreign_keys": "User.member_id"},
)
api_keys: list["ApiKey"] = Relationship(back_populates="user", cascade_delete=True)
# --- many-to-many links ---------------------------------------------------

View File

@@ -0,0 +1,273 @@
import uuid
from fastapi import status
from fastapi.testclient import TestClient
from sqlmodel import Session, select
from app.core.config import settings
from app.models.member import Member, MemberTeamLink, MemberTeamLinkCreate, MemberRank
from app.models.user import User
from app.tests.conftest import EventUserHeader
from app.tests.utils.team import create_random_team
from app.tests.utils.member import create_random_member
from app.tests.utils.user import create_random_user, authentication_token_from_user
def test_create_member(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "John Do",
"scouting_id": "12345678",
"allergy": "Do not feed Tomatoes",
}
response = client.post(
f"{settings.API_V1_STR}/members/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["name"] == data["name"]
assert content["scouting_id"] == data["scouting_id"]
assert content["allergy"] == data["allergy"]
assert "contact" in content
assert "comment" in content
assert "birthday_at" in content
assert "canceled_at" in content
assert "canceled_reason" in content
assert "id" in content
member_query = select(Member).where(Member.id == content["id"])
member_db = db.exec(member_query).first()
assert member_db
assert member_db.name == data["name"]
assert member_db.scouting_id == data["scouting_id"]
assert member_db.allergy == data["allergy"]
user = User.get_by_email(session=db, email=settings.FIRST_SUPERUSER)
assert member_db.created_by == user.id
def test_create_member_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
data = {
"name": "No John",
"scouting_id": "0",
"comment": "Is not existing",
}
response = client.post(
f"{settings.API_V1_STR}/members/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_member(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
member = create_random_member(db)
response = client.get(
f"{settings.API_V1_STR}/members/{member.id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(member.id)
assert content["name"] == member.name
assert content["contact"] == member.contact
assert content["scouting_id"] == member.scouting_id
assert content["comment"] == member.comment
assert content["allergy"] == member.allergy
assert content["birthday_at"] == member.birthday_at
assert content["canceled_at"] == member.canceled_at
assert content["canceled_reason"] == member.canceled_reason
def test_read_member_event_user(client: TestClient, event_user_token_headers: EventUserHeader, db: Session) -> None:
team = create_random_team(db, event=event_user_token_headers.event)
member = create_random_member(db)
link = MemberTeamLinkCreate(member_id=member.id, rank=MemberRank.TEAM_MEMBER)
MemberTeamLink.create(session=db, create_obj=link, team=team)
response = client.get(
f"{settings.API_V1_STR}/members/{member.id}",
headers=event_user_token_headers.headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(member.id)
assert content["name"] == member.name
assert content["contact"] == member.contact
assert content["scouting_id"] == member.scouting_id
assert content["comment"] == member.comment
assert content["allergy"] == member.allergy
assert content["birthday_at"] == member.birthday_at
assert content["canceled_at"] == member.canceled_at
assert content["canceled_reason"] == member.canceled_reason
def test_read_member_own_user(client: TestClient, db: Session) -> None:
member = create_random_member(db)
user = create_random_user(db)
user.member_id = member.id
db.add(user)
db.commit()
response = client.get(
f"{settings.API_V1_STR}/members/{member.id}",
headers=authentication_token_from_user(client=client, db=db, user=user),
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(member.id)
assert content["name"] == member.name
assert content["contact"] == member.contact
assert content["scouting_id"] == member.scouting_id
assert content["comment"] == member.comment
assert content["allergy"] == member.allergy
assert content["birthday_at"] == member.birthday_at
assert content["canceled_at"] == member.canceled_at
assert content["canceled_reason"] == member.canceled_reason
def test_read_member_not_found(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
response = client.get(
f"{settings.API_V1_STR}/members/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Member not found"
def test_read_member_no_permission(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
member = create_random_member(db)
response = client.get(
f"{settings.API_V1_STR}/members/{member.id}",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions"
def test_read_members(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
create_random_member(db)
create_random_member(db)
response = client.get(
f"{settings.API_V1_STR}/members/",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_members_event_user(client: TestClient, event_user_token_headers: EventUserHeader, db: Session) -> None:
team = create_random_team(db, event=event_user_token_headers.event)
member = create_random_member(db)
link = MemberTeamLinkCreate(member_id=member.id, rank=MemberRank.TEAM_MEMBER)
MemberTeamLink.create(session=db, create_obj=link, team=team)
response = client.get(
f"{settings.API_V1_STR}/members/",
headers=event_user_token_headers.headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] >= 1
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_members_no_permissions(client: TestClient, normal_user_token_headers: dict[str, str], db: Session) -> None:
create_random_member(db)
create_random_member(db)
response = client.get(
f"{settings.API_V1_STR}/divisions/",
headers=normal_user_token_headers,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 0
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) == 0
def test_read_members_self_created(client: TestClient, db: Session) -> None:
user = create_random_user(db)
member = create_random_member(db)
member.created_by = user.id
db.add(member)
db.commit()
response = client.get(
f"{settings.API_V1_STR}/members/",
headers=authentication_token_from_user(client=client, db=db, user=user),
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert "count" in content
assert content["count"] == 1
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_update_member(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
member = create_random_member(db)
member_id = member.id
data = {
"name": "Updated name",
"contact": "Updated contact",
}
response = client.put(
f"{settings.API_V1_STR}/members/{member_id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == status.HTTP_200_OK
content = response.json()
assert content["id"] == str(member_id)
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
member_query = select(Member).where(Member.id == member_id)
member_db = db.exec(member_query).first()
assert member_db
db.refresh(member_db)
assert member_db.name == data["name"]
assert member_db.contact == data["contact"]
assert content["scouting_id"] == member_db.scouting_id
assert content["comment"] == member_db.comment
assert content["allergy"] == member_db.allergy
assert content["birthday_at"] == member_db.birthday_at
assert content["canceled_at"] == member_db.canceled_at
assert content["canceled_reason"] == member_db.canceled_reason
def test_delete_member(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
member = create_random_member(db)
member_id = member.id
response = client.delete(
f"{settings.API_V1_STR}/members/{member_id}",
headers=superuser_token_headers,
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Member deleted successfully"
member_query = select(Member).where(Member.id == member_id)
member_db = db.exec(member_query).first()
assert member_db is None
# TODO: Team link tests
# app/models/member.py 79 8 90% 91-96
# app/api/routes/teams.py 122 26 79% 272-276, 287-310, 318-328, 339-357, 367-378, 386-398

View File

@@ -172,9 +172,8 @@ def test_retrieve_users(
def test_update_user_me(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
full_name = "Updated Name"
email = random_email()
data = {"full_name": full_name, "email": email}
data = {"email": email}
r = client.patch(
f"{settings.API_V1_STR}/users/me",
headers=normal_user_token_headers,
@@ -183,13 +182,11 @@ def test_update_user_me(
assert r.status_code == status.HTTP_200_OK
updated_user = r.json()
assert updated_user["email"] == email
assert updated_user["full_name"] == full_name
user_query = select(User).where(User.email == email)
user_db = db.exec(user_query).first()
assert user_db
assert user_db.email == email
assert user_db.full_name == full_name
def test_update_password_me(
@@ -311,8 +308,7 @@ def test_update_password_me_same_password_error(
def test_register_user(client: TestClient, db: Session) -> None:
username = random_email()
password = random_lower_string()
full_name = random_lower_string()
data = {"email": username, "password": password, "full_name": full_name}
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/signup",
json=data,
@@ -320,23 +316,19 @@ def test_register_user(client: TestClient, db: Session) -> None:
assert r.status_code == status.HTTP_200_OK
created_user = r.json()
assert created_user["email"] == username
assert created_user["full_name"] == full_name
user_query = select(User).where(User.email == username)
user_db = db.exec(user_query).first()
assert user_db
assert user_db.email == username
assert user_db.full_name == full_name
assert verify_password(password, user_db.hashed_password)
def test_register_user_already_exists_error(client: TestClient) -> None:
password = random_lower_string()
full_name = random_lower_string()
data = {
"email": settings.FIRST_SUPERUSER,
"password": password,
"full_name": full_name,
}
r = client.post(
f"{settings.API_V1_STR}/users/signup",
@@ -346,45 +338,6 @@ def test_register_user_already_exists_error(client: TestClient) -> None:
assert r.json()["detail"] == "The user with this email already exists in the system"
def test_update_user(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = User.create(session=db, create_obj=user_in)
data = {"full_name": "Updated_full_name"}
r = client.patch(
f"{settings.API_V1_STR}/users/{user.id}",
headers=superuser_token_headers,
json=data,
)
assert r.status_code == status.HTTP_200_OK
updated_user = r.json()
assert updated_user["full_name"] == "Updated_full_name"
user_query = select(User).where(User.email == username)
user_db = db.exec(user_query).first()
db.refresh(user_db)
assert user_db
assert user_db.full_name == "Updated_full_name"
def test_update_user_not_exists(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
data = {"full_name": "Updated_full_name"}
r = client.patch(
f"{settings.API_V1_STR}/users/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert r.status_code == status.HTTP_404_NOT_FOUND
assert r.json()["detail"] == "The user with this id does not exist in the system"
def test_update_user_email_exists(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:

View File

@@ -19,8 +19,8 @@ def db() -> Generator[Session, None, None]:
with Session(engine) as session:
init_db(session)
yield session
statement = delete(User)
session.execute(statement)
# statement = delete(User)
# session.execute(statement)
session.commit()

View File

@@ -0,0 +1,19 @@
from sqlmodel import Session
from app.models.team import Team
from app.models.member import Member, MemberCreate, MemberTeamLink, MemberTeamLinkCreate, MemberRank
from app.tests.utils.utils import random_lower_string
def create_random_member(db: Session) -> Member:
member_in = MemberCreate(
name=random_lower_string(),
contact=random_lower_string(),
scouting_id=random_lower_string(),
comment=random_lower_string(),
allergy=random_lower_string(),
# birthday_at=random_datetime(),
# canceled_at=random_datetime(),
canceled_reason=random_lower_string(),
)
return Member.create(session=db, create_obj=member_in)