from typing import Any import sqlalchemy from fastapi import APIRouter, HTTPException, status from sqlmodel import func, select, and_, or_, SQLModel from sqlalchemy.orm import joinedload from app.api.deps import CurrentUser, SessionDep from app.models.base import ( ApiTags, Message, RowId, ) from app.models.member import ( Member, MemberCreate, MemberUpdate, MemberPublic, MembersPublic, MemberTeamLink, ) from app.models.event import Event, EventUserLink from app.models.team import Team from app.models.user import ( PermissionModule, PermissionPart, PermissionRight, ) router = APIRouter(prefix="/members", tags=[ApiTags.MEMBERS]) # region # Members ############################################################# def load_member( session: SessionDep, current_user: CurrentUser, id: RowId | None = None, module: PermissionModule = PermissionModule.MEMBER, part: PermissionPart = PermissionPart.ADMIN, user_rights: PermissionRight | None = None, event_rights: PermissionRight | None = PermissionRight.MANAGE_MEMBERS, ) -> Member | None: member = session.get(Member, id) if id and not member: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Member not found") no_links = True valid = False # Global member permissions if current_user.has_permissions(module=module, part=part, rights=user_rights): # Also valid for create new valid = True # Own member items elif hasattr(member, "user") and member.user and member.user == current_user: valid = True # Event member permissions elif hasattr(member, "team_links"): for link in member.team_links: team = link.team if team and team.event: no_links = False if team.event.user_has_rights(user=current_user, rights=event_rights): valid = True break # Not yet linked, or unlinked member if no_links and hasattr(member, "created_by") and member.created_by == current_user.id: valid = True if not valid: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions") return member @router.get("/", response_model=MembersPublic) def read_members( session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100 ) -> Any: """ Retrieve all members. """ if current_user.has_permissions( module=PermissionModule.MEMBER, part=PermissionPart.ADMIN, rights=PermissionRight.READ, ): data_query = ( select(Member) ) else: data_query = ( select(Member) .outerjoin(MemberTeamLink, MemberTeamLink.member_id == Member.id) .outerjoin(Team, MemberTeamLink.team_id == Team.id) .outerjoin(Event, Team.event_id == Event.id) .outerjoin(EventUserLink, EventUserLink.event_id == Event.id) .where( or_( # Own member Member.id == current_user.member_id, # Created by user and unlinked and_( Member.created_by == current_user.id, MemberTeamLink.team_id == None ), # Event permissions via team -> event -> EventUserLink and_( EventUserLink.user_id == current_user.id, # FIXME: EventUserLink.rights.op("&")(PermissionRight.MANAGE_MEMBERS) != 0 ), ) ) ) # Cache as subquery data_sub_query = data_query.subquery() aliased_member = sqlalchemy.orm.aliased(Member, data_sub_query) # Count using subquery count = session.exec( select(func.count()).select_from(data_sub_query) ).one() # Paginated data query using same subquery data = session.exec( select(aliased_member).offset(skip).limit(limit) ).all() return MembersPublic(count=count, data=data) @router.get("/{id}", response_model=MemberPublic) def read_member(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any: """ Get member by ID. """ member = load_member( session=session, current_user=current_user, id=id, user_rights=PermissionRight.READ, ) return member @router.post("/", response_model=MemberPublic) def create_member( *, session: SessionDep, current_user: CurrentUser, member_in: MemberCreate ) -> Any: """ Create new member. """ load_member( session=session, current_user=current_user, user_rights=PermissionRight.CREATE, ) member = Member.create(create_obj=member_in, session=session, user=current_user) return member @router.put("/{id}", response_model=MemberPublic) def update_member( *, session: SessionDep, current_user: CurrentUser, id: RowId, member_in: MemberUpdate ) -> Any: """ Update a member. """ member = load_member( session=session, current_user=current_user, id=id, user_rights=PermissionRight.UPDATE, ) member = Member.update(db_obj=member, in_obj=member_in, session=session) return member @router.delete("/{id}") def delete_member(session: SessionDep,current_user: CurrentUser, id: RowId) -> Message: """ Delete a member. """ member = load_member( session=session, current_user=current_user, id=id, user_rights=PermissionRight.DELETE, ) session.delete(member) session.commit() return Message(message="Member deleted successfully") # endregion