From c4d1871835eb3d5360f6cabf930cef65b3377685 Mon Sep 17 00:00:00 2001 From: Sebastiaan Date: Mon, 9 Jun 2025 22:19:38 +0200 Subject: [PATCH] Implement user links --- backend/app/api/routes/events.py | 159 +++-- backend/app/api/routes/teams.py | 10 +- backend/app/models/base.py | 1 + backend/app/models/event.py | 83 ++- backend/app/models/user.py | 6 +- backend/app/tests/api/routes/test_events.py | 613 +++++++++++++++++++- backend/app/tests/api/routes/test_teams.py | 8 +- 7 files changed, 781 insertions(+), 99 deletions(-) diff --git a/backend/app/api/routes/events.py b/backend/app/api/routes/events.py index edcc284..c054300 100644 --- a/backend/app/api/routes/events.py +++ b/backend/app/api/routes/events.py @@ -16,12 +16,15 @@ from app.models.event import ( EventsPublic, EventUpdate, EventUserLink, + EventUserLinkCreate, + EventUserLinkUpdate, + EventUserLinkPublic, + EventUserLinksPublic, ) from app.models.user import ( PermissionModule, PermissionPart, PermissionRight, - PermissionRightObject, User, ) @@ -87,7 +90,7 @@ def read_event(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any part=PermissionPart.ADMIN, rights=PermissionRight.READ, ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.READ)): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException(status_code=403, detail="Not enough permissions") return event @@ -131,7 +134,7 @@ def update_event( part=PermissionPart.ADMIN, rights=PermissionRight.UPDATE, ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.UPDATE)): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException(status_code=403, detail="Not enough permissions") return Event.update(db_obj=event, in_obj=event_in, session=session) @@ -154,7 +157,7 @@ def delete_event( part=PermissionPart.ADMIN, rights=PermissionRight.DELETE, ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.DELETE)): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException(status_code=403, detail="Not enough permissions") session.delete(event) session.commit() @@ -167,18 +170,56 @@ def delete_event( # region # Events / Users ###################################################### -@router.post("/{id}/users/{user_id}", tags=[ApiTags.USERS]) -def add_user_to_event( +@router.get("/{event_id}/users/", response_model=EventUserLinksPublic) +def read_event_users( + session: SessionDep, current_user: CurrentUser, event_id: RowId, skip: int = 0, limit: int = 100 +) -> Any: + """ + Retrieve all event users. + """ + + event = session.get(Event, event_id) + if not event: + raise HTTPException(status_code=404, detail="Event not found") + + if not current_user.has_permissions( + module=PermissionModule.EVENT, + part=PermissionPart.ADMIN, + rights=PermissionRight.MANAGE_USERS, + ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS)): + raise HTTPException(status_code=403, detail="Not enough permissions") + + count_statement = (select(func.count()) + .select_from(EventUserLink) + .where(EventUserLink.event_id == event.id) + ) + count = session.exec(count_statement).one() + statement = (select(EventUserLink) + .where(EventUserLink.event_id == event.id) + .offset(skip) + .limit(limit) + ) + event_user_links = session.exec(statement).all() + + return EventUserLinksPublic(data=event_user_links, count=count) + + +@router.post("/{event_id}/users/", tags=[ApiTags.USERS], response_model=EventUserLinkPublic) +def create_event_user( session: SessionDep, current_user: CurrentUser, - id: RowId, - user_id: RowId, - rights_in: PermissionRightObject, -) -> Message: + event_id: RowId, + user_in: EventUserLinkCreate, +) -> Any: """ - Add or update a user to an event. + Create a new link between a user and an event. """ - event = session.get(Event, id) + + if user_in.rights & ~PermissionRight.ADMIN: + # FIXME: find a proper richts checker + raise HTTPException(status_code=400, detail="Invalid permission rights") + + event = session.get(Event, event_id) if not event: raise HTTPException(status_code=404, detail="Event not found") @@ -186,45 +227,89 @@ def add_user_to_event( module=PermissionModule.EVENT, part=PermissionPart.ADMIN, rights=PermissionRight.MANAGE_USERS, - ) and not ( - event.user_has_rights( - user=current_user, rights=(PermissionRight.MANAGE_USERS | rights_in.rights) - ) - ): - raise HTTPException(status_code=400, detail="Not enough permissions") + ) and not (event.user_has_rights(user=current_user, rights=(PermissionRight.MANAGE_USERS | user_in.rights))): + raise HTTPException(status_code=403, detail="Not enough permissions") - user = session.get(User, user_id) - if not event: + user = session.get(User, user_in.user_id) + if not user: raise HTTPException(status_code=404, detail="User not found") - event.add_user(user=user, rights=rights_in.rights, session=session) - return Message( - message="User added successfully" - ) + user_link = event.get_user_link(user) + if user_link: + raise HTTPException(status_code=400, detail="User already part of this event") + + return event.add_user(user=user, rights=user_in.rights, session=session) -@router.delete("/{id}/users/{user_id}", tags=[ApiTags.USERS]) -def remove_user_from_event( - session: SessionDep, current_user: CurrentUser, id: RowId, user_id: RowId -) -> Message: +@router.put("/{event_id}/users/{user_id}", tags=[ApiTags.USERS], response_model=EventUserLinkPublic) +def update_user_in_event( + session: SessionDep, + current_user: CurrentUser, + event_id: RowId, + user_id: RowId, + user_in: EventUserLinkUpdate, +) -> Any: """ - Remove a user from an event. + Update a user link within an event. """ - event = session.get(Event, id) + + event = session.get(Event, event_id) if not event: raise HTTPException(status_code=404, detail="Event not found") - if not current_user.has_permissions( - module=PermissionModule.EVENT, - part=PermissionPart.ADMIN, - rights=PermissionRight.MANAGE_USERS, - ) and not event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS): - raise HTTPException(status_code=403, detail="Not enough permissions") - user = session.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") + valid_flags = sum(flag.value for flag in PermissionRight) + if user_in.rights & ~valid_flags: + # FIXME: find a proper richts checker + raise HTTPException(status_code=400, detail="Invalid permission rights") + + if not current_user.has_permissions( + module=PermissionModule.EVENT, + part=PermissionPart.ADMIN, + rights=PermissionRight.MANAGE_USERS, + ) and not (event.user_has_rights(user=current_user, rights=(PermissionRight.MANAGE_USERS | user_in.rights))): + raise HTTPException(status_code=403, detail="Not enough permissions") + + user_link = event.get_user_link(user) + if not user_link: + raise HTTPException(status_code=404, detail="User is not part of this event") + + return event.update_user(user=user, rights=user_in.rights, session=session) + + +@router.delete("/{event_id}/users/{user_id}", tags=[ApiTags.USERS]) +def remove_user_from_event( + session: SessionDep, current_user: CurrentUser, event_id: RowId, user_id: RowId +) -> Message: + """ + Remove a user link from an event. + """ + event = session.get(Event, event_id) + if not event: + raise HTTPException(status_code=404, detail="Event not found") + + user = session.get(User, user_id) + if not user: + raise HTTPException(status_code=404, detail="User not found") + + if not current_user.has_permissions( + module=PermissionModule.EVENT, + part=PermissionPart.ADMIN, + rights=PermissionRight.MANAGE_USERS, + ): + if current_user.id == user.id: + raise HTTPException(status_code=403, detail="Users are not allowed to delete themselves when they are not an super admin") + + if not event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS): + raise HTTPException(status_code=403, detail="Not enough permissions") + + user_link = event.get_user_link(user) + if not user_link: + raise HTTPException(status_code=404, detail="User is not part of this event") + event.remove_user(user=user, session=session) return Message( message="User removed successfully" diff --git a/backend/app/api/routes/teams.py b/backend/app/api/routes/teams.py index 2b6a62b..e71e194 100644 --- a/backend/app/api/routes/teams.py +++ b/backend/app/api/routes/teams.py @@ -97,7 +97,7 @@ def read_team(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any: part=PermissionPart.ADMIN, rights=PermissionRight.READ, ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException(status_code=403, detail="Not enough permissions") return team @@ -119,7 +119,7 @@ def create_team( part=PermissionPart.ADMIN, rights=PermissionRight.UPDATE, ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException(status_code=403, detail="Not enough permissions") team = Team.create(create_obj=team_in, session=session) return team @@ -146,7 +146,7 @@ def update_team( part=PermissionPart.ADMIN, rights=PermissionRight.UPDATE, ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException(status_code=403, detail="Not enough permissions") # Check rights for the new event data if team_in.event_id: @@ -159,7 +159,7 @@ def update_team( part=PermissionPart.ADMIN, rights=PermissionRight.UPDATE, ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException(status_code=403, detail="Not enough permissions") # Update the team team = Team.update(db_obj=team, in_obj=team_in, session=session) @@ -184,7 +184,7 @@ def delete_team(session: SessionDep,current_user: CurrentUser, id: RowId) -> Mes part=PermissionPart.ADMIN, rights=PermissionRight.DELETE, ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): - raise HTTPException(status_code=400, detail="Not enough permissions") + raise HTTPException(status_code=403, detail="Not enough permissions") session.delete(team) session.commit() diff --git a/backend/app/models/base.py b/backend/app/models/base.py index fec1b00..9b2b021 100644 --- a/backend/app/models/base.py +++ b/backend/app/models/base.py @@ -39,6 +39,7 @@ class DocumentedStrEnum(str, Enum): class DocumentedIntFlag(IntFlag): + # TODO: Build DB sport to proper store flags and make it possible to store all mutations pass diff --git a/backend/app/models/event.py b/backend/app/models/event.py index 0523e08..7948675 100644 --- a/backend/app/models/event.py +++ b/backend/app/models/event.py @@ -23,8 +23,23 @@ if TYPE_CHECKING: # region # Event ############################################################### -# Event auth -class EventUserLink(BaseSQLModel, table=True): +# Shared properties +class EventUserLinkBase(BaseSQLModel): + rights: PermissionRight = Field(default=PermissionRight.READ, nullable=False) + + +# Properties to receive via API on creation +class EventUserLinkCreate(EventUserLinkBase): + user_id: RowId = Field(default=None, nullable=False) + + +# Properties to receive via API on update, all are optional +class EventUserLinkUpdate(EventUserLinkBase): + pass + + +# Database model, database table inferred from class name +class EventUserLink(EventUserLinkBase, table=True): event_id: RowId = Field( foreign_key="event.id", primary_key=True, @@ -39,12 +54,22 @@ class EventUserLink(BaseSQLModel, table=True): ondelete="CASCADE", ) - rights: PermissionRight = Field(default=PermissionRight.READ, nullable=False) event: "Event" = Relationship(back_populates="user_links") user: "User" = Relationship(back_populates="event_links") +# Properties to return via API +class EventUserLinkPublic(EventUserLinkBase): + user_id: RowId + event_id: RowId + + +class EventUserLinksPublic(BaseSQLModel): + data: list[EventUserLinkPublic] + count: int + + # ############################################################################## @@ -102,41 +127,51 @@ class Event(mixin.RowId, EventBase, table=True): session.refresh(db_obj) return db_obj + def get_user_link(self, user: User) -> EventUserLink | None: + return next( + (link for link in self.user_links if link.user == user), None + ) + def add_user( self, user: User, rights: PermissionRight = PermissionRight.READ, *, session: Session, - ) -> "Event": - to_add = next((add for add in self.user_links if add.user == user), None) + ) -> "EventUserLink | None": + to_add = self.get_user_link(user=user) - if to_add: - to_add.rights = rights - session.add(to_add) - else: + if to_add is None: self.user_links.append(EventUserLink(event=self, user=user, rights=rights)) session.add(self.user_links[-1]) + session.commit() + return self.user_links[-1] - session.commit() + return None - return self + def update_user( + self, + user: User, + rights: PermissionRight = PermissionRight.READ, + *, + session: Session, + ) -> "EventUserLink | None": + to_update = self.get_user_link(user=user) - def remove_user(self, user: User, *, session: Session) -> "Event": - to_remove = next( - (remove for remove in self.user_links if remove.user == user), None - ) + if to_update: + to_update.rights = rights + session.add(to_update) + session.commit() + return to_update + + return None + + def remove_user(self, user: User, *, session: Session) -> None: + to_remove = self.get_user_link(user=user) if to_remove: - statement = select(EventUserLink).where( - EventUserLink.event_id == self.id, EventUserLink.user_id == user.id - ) - link_to_remove = session.exec(statement).first() + session.delete(to_remove) + session.commit() - if link_to_remove: - session.delete(link_to_remove) - session.commit() - - return self def user_has_rights( self, diff --git a/backend/app/models/user.py b/backend/app/models/user.py index bb9af18..988afeb 100644 --- a/backend/app/models/user.py +++ b/backend/app/models/user.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING -from pydantic import EmailStr +from pydantic import EmailStr, field_validator from sqlmodel import Field, Relationship, Session, select from app.core.security import get_password_hash, verify_password @@ -46,10 +46,6 @@ class PermissionRight(DocumentedIntFlag): ADMIN = CREATE | READ | UPDATE | DELETE | MANAGE_USERS | MANAGE_TEAMS -class PermissionRightObject(BaseSQLModel): - rights: PermissionRight | None = Field(default=PermissionRight.READ, nullable=False) - - # ############################################################################## # link to User (many-to-many) class UserRoleLink(BaseSQLModel, table=True): diff --git a/backend/app/tests/api/routes/test_events.py b/backend/app/tests/api/routes/test_events.py index dbdd146..109c620 100644 --- a/backend/app/tests/api/routes/test_events.py +++ b/backend/app/tests/api/routes/test_events.py @@ -1,5 +1,6 @@ import uuid +import pytest from fastapi.testclient import TestClient from sqlmodel import Session @@ -31,6 +32,22 @@ def test_create_event(client: TestClient, superuser_token_headers: dict[str, str assert "end_at" in content +def test_create_event_no_permission(client: TestClient, normal_user_token_headers: dict[str, str]) -> None: + data = { + "name": "No create permission", + "contact": "Someone else", + } + + response = client.post( + f"{settings.API_V1_STR}/events/", + headers=normal_user_token_headers, + json=data, + ) + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + + def test_read_event( client: TestClient, superuser_token_headers: dict[str, str], db: Session ) -> None: @@ -57,8 +74,7 @@ def test_read_event_not_found( headers=superuser_token_headers, ) assert response.status_code == 404 - content = response.json() - assert content["detail"] == "Event not found" + assert response.json()["detail"] == "Event not found" def test_read_event_not_enough_permissions( @@ -69,9 +85,8 @@ def test_read_event_not_enough_permissions( f"{settings.API_V1_STR}/events/{event.id}", headers=normal_user_token_headers, ) - assert response.status_code == 400 - content = response.json() - assert content["detail"] == "Not enough permissions" + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" def test_read_event_with_event_user( @@ -163,8 +178,7 @@ def test_update_event_not_found( json=data, ) assert response.status_code == 404 - content = response.json() - assert content["detail"] == "Event not found" + assert response.json()["detail"] == "Event not found" def test_update_event_not_enough_permissions( @@ -177,9 +191,31 @@ def test_update_event_not_enough_permissions( headers=normal_user_token_headers, json=data, ) - assert response.status_code == 400 + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + +def test_update_event_with_eventuser( + client: TestClient, event_user_token_headers: EventUserHeader, db: Session +) -> None: + event = event_user_token_headers.event + data = { + "name": "Updated name from eventuser", + "contact": "Updated contact from eventuser", + } + response = client.put( + f"{settings.API_V1_STR}/events/{event.id}", + headers=event_user_token_headers.headers, + json=data, + ) + assert response.status_code == 200 content = response.json() - assert content["detail"] == "Not enough permissions" + assert content["name"] == data["name"] + assert content["contact"] == data["contact"] + assert content["id"] == str(event.id) + assert content["is_active"] == event.is_active + assert str(content["start_at"]) == str(event.start_at) + assert str(content["end_at"]) == str(event.end_at) def test_delete_event( @@ -191,8 +227,7 @@ def test_delete_event( headers=superuser_token_headers, ) assert response.status_code == 200 - content = response.json() - assert content["message"] == "Event deleted successfully" + assert response.json()["message"] == "Event deleted successfully" def test_delete_event_not_found( @@ -215,9 +250,8 @@ def test_delete_event_not_enough_permissions( f"{settings.API_V1_STR}/events/{event.id}", headers=normal_user_token_headers, ) - assert response.status_code == 400 - content = response.json() - assert content["detail"] == "Not enough permissions" + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" def test_delete_event_admin_user( @@ -246,9 +280,8 @@ def test_delete_event_not_enough_permissions_for_this_event( f"{settings.API_V1_STR}/events/{event.id}", headers=authentication_token_from_user(db=db, user=user, client=client), ) - assert response.status_code == 400 - content = response.json() - assert content["detail"] == "Not enough permissions" + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" def test_delete_event_event_user_read_only_rights( @@ -262,13 +295,545 @@ def test_delete_event_event_user_read_only_rights( f"{settings.API_V1_STR}/events/{event.id}", headers=authentication_token_from_user(db=db, user=user, client=client), ) - assert response.status_code == 400 + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + +def test_read_all_event_users( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user1 = create_random_user(db) + user2 = create_random_user(db) + event.add_user(user=user1, rights=PermissionRight.READ, session=db) + event.add_user(user=user2, rights=PermissionRight.ADMIN, session=db) + + response = client.get( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=superuser_token_headers, + ) + assert response.status_code == 200 content = response.json() - assert content["detail"] == "Not enough permissions" + assert "count" in content + assert content["count"] == 2 + assert "data" in content + assert isinstance(content["data"], list) + assert len(content["data"]) <= content["count"] -# TODO: Add user (super, less rights, own rights, more rights) (*** user without rights) -# TODO: Edit user rights (super, less rights, own rights, more rights) (*** user without rights) -# TODO: Remove user (*** user without rights) -# TODO: Remove own user (is allowed) -# TODO: Remove not linked user +def test_read_all_event_users_no_permission( + client: TestClient, normal_user_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + + response = client.get( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=normal_user_token_headers, + ) + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + +def test_read_all_event_users_with_event_user( + client: TestClient, db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.MANAGE_USERS, session=db) + + response = client.get( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=authentication_token_from_user(db=db, user=user, client=client), + ) + assert response.status_code == 200 + content = response.json() + assert "count" in content + assert content["count"] == 1 + assert "data" in content + assert isinstance(content["data"], list) + assert len(content["data"]) <= content["count"] + + +def test_read_all_event_users_with_event_user_no_permission( + client: TestClient, db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.READ, session=db) + + response = client.get( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=authentication_token_from_user(db=db, user=user, client=client), + ) + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + +def test_add_user_to_event_not_found( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + response = client.get( + f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users", + headers=superuser_token_headers, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "Event not found" + + +def test_add_user_to_event( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + data = { + "user_id": str(user.id), + "rights": PermissionRight.READ, + } + + response = client.post( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=superuser_token_headers, + json=data, + ) + assert response.status_code == 200 + content = response.json() + assert "rights" in content + assert content["rights"] == PermissionRight.READ + assert content["user_id"] == str(user.id) + assert content["event_id"] == str(event.id) + + +def test_add_user_to_event_event_not_found( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + user = create_random_user(db) + data = { + "user_id": str(user.id), + "rights": PermissionRight.READ, + } + + response = client.post( + f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users", + headers=superuser_token_headers, + json=data, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "Event not found" + + +def test_add_user_to_event_user_not_found( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + data = { + "user_id": str(uuid.uuid4()), + "rights": PermissionRight.READ, + } + + response = client.post( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=superuser_token_headers, + json=data, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "User not found" + + +def test_add_user_to_event_already_exists( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.READ, session=db) + data = { + "user_id": str(user.id), + "rights": PermissionRight.ADMIN, + } + + response = client.post( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=superuser_token_headers, + json=data, + ) + assert response.status_code == 400 + assert response.json()["detail"] == "User already part of this event" + + +def test_add_user_to_event_no_permissions( + client: TestClient, normal_user_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + data = { + "user_id": str(user.id), + "rights": PermissionRight.READ, + } + + response = client.post( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=normal_user_token_headers, + json=data, + ) + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + +def test_add_user_to_event_unknown_rights( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + data = { + "user_id": str(user.id), + "rights": -1, # Invalid permission value + } + + response = client.post( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=superuser_token_headers, + json=data, + ) + assert response.status_code == 400 + assert response.json()["detail"] == "Invalid permission rights" + + +def test_add_user_with_more_rights_than_current_user( + client: TestClient, db: Session +) -> None: + event = create_random_event(db) + limited_user = create_random_user(db) + event.add_user(user=limited_user, rights=PermissionRight.MANAGE_USERS, session=db) + + target_user = create_random_user(db) + + data = { + "user_id": str(target_user.id), + "rights": PermissionRight.ADMIN, + } + + response = client.post( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=authentication_token_from_user(db=db, user=limited_user, client=client), + json=data, + ) + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + +@pytest.mark.xfail(reason="Combined rights add might not yet be supported") +def test_add_user_rights_combined( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + + data = { + "rights": PermissionRight.READ | PermissionRight.UPDATE, + } + + response = client.post( + f"{settings.API_V1_STR}/events/{event.id}/users", + headers=superuser_token_headers, + json=data, + ) + + assert response.status_code == 200 + content = response.json() + assert "rights" in content + assert content["rights"] == data["rights"] + assert content["event_id"] == str(event.id) + + +def test_update_user_inside_event( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.READ, session=db) + data = { + "rights": PermissionRight.UPDATE, + } + + response = client.put( + f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}", + headers=superuser_token_headers, + json=data, + ) + assert response.status_code == 200 + content = response.json() + assert "rights" in content + assert content["rights"] == data["rights"] + assert content["user_id"] == str(user.id) + assert content["event_id"] == str(event.id) + + +def test_update_event_user_event_not_found( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +): + user = create_random_user(db) + data = { + "rights": PermissionRight.UPDATE, + } + + response = client.put( + f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users/{user.id}", + headers=superuser_token_headers, + json=data, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "Event not found" + + +def test_update_event_user_user_not_found( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +): + event = create_random_event(db) + data = { + "rights": PermissionRight.UPDATE, + } + + response = client.put( + f"{settings.API_V1_STR}/events/{event.id}/users/{uuid.uuid4()}", + headers=superuser_token_headers, + json=data, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "User not found" + + +def test_update_event_user_unknown_rights( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +): + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.READ, session=db) + data = { + "rights": -1, # Invalid permission value + } + + response = client.put( + f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}", + headers=superuser_token_headers, + json=data, + ) + assert response.status_code == 400 + assert response.json()["detail"] == "Invalid permission rights" + + +def test_update_event_user_not_enough_permissions( + client: TestClient, normal_user_token_headers: dict[str, str], db: Session +): + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.ADMIN, session=db) + data = { + "rights": PermissionRight.READ + } + + response = client.put( + f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}", + headers=normal_user_token_headers, + json=data, + ) + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + +def test_update_event_user_with_event_user_same_event( + client: TestClient, db: Session +) -> None: + event = create_random_event(db) + user1 = create_random_user(db) + user2 = create_random_user(db) + + event.add_user(user=user1, rights=PermissionRight.ADMIN, session=db) + event.add_user(user=user2, rights=PermissionRight.READ, session=db) + + data = { + "rights": PermissionRight.UPDATE, + } + + # event_user1 tries to update event_user2 rights + response = client.put( + f"{settings.API_V1_STR}/events/{event.id}/users/{user2.id}", + headers=authentication_token_from_user(db=db, user=user1, client=client), + json=data, + ) + assert response.status_code == 200 + content = response.json() + assert content["rights"] == data["rights"] + assert content["user_id"] == str(user2.id) + assert content["event_id"] == str(event.id) + + +def test_update_event_user_from_other_event_forbidden( + client: TestClient, db: Session +) -> None: + event1 = create_random_event(db) + event2 = create_random_event(db) + + user1 = create_random_user(db) + user2 = create_random_user(db) + + event1.add_user(user=user1, rights=PermissionRight.ADMIN, session=db) + event2.add_user(user=user2, rights=PermissionRight.READ, session=db) + + data = { + "rights": PermissionRight.UPDATE, + } + + response = client.put( + f"{settings.API_V1_STR}/events/{event2.id}/users/{user2.id}", + headers=authentication_token_from_user(db=db, user=user1, client=client), + json=data, + ) + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + +def test_update_event_user_from_other_event_thru_own_event( + client: TestClient, db: Session +) -> None: + event1 = create_random_event(db) + event2 = create_random_event(db) + + user1 = create_random_user(db) + user2 = create_random_user(db) + + event1.add_user(user=user1, rights=PermissionRight.ADMIN, session=db) + event2.add_user(user=user2, rights=PermissionRight.READ, session=db) + + data = { + "rights": PermissionRight.UPDATE, + } + + response = client.put( + f"{settings.API_V1_STR}/events/{event1.id}/users/{user2.id}", + headers=authentication_token_from_user(db=db, user=user1, client=client), + json=data, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "User is not part of this event" + + +@pytest.mark.xfail(reason="Combined rights update might not yet be supported") +def test_update_user_rights_combined( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + # Initially assign READ only rights + event.add_user(user=user, rights=PermissionRight.READ, session=db) + + data = { + "rights": PermissionRight.READ | PermissionRight.UPDATE, + } + + response = client.put( + f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}", + headers=superuser_token_headers, + json=data, + ) + + assert response.status_code == 200 + content = response.json() + assert "rights" in content + assert content["rights"] == data["rights"] + assert content["user_id"] == str(user.id) + assert content["event_id"] == str(event.id) + + +def test_remove_user_from_event( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.READ, session=db) + + response = client.delete( + f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}", + headers=superuser_token_headers, + ) + + assert response.status_code == 200 + assert response.json()["message"] == "User removed successfully" + # assert not event.get_user_link(user) + + +def test_remove_user_from_event_event_not_found( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.READ, session=db) + + response = client.delete( + f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users/{user.id}", + headers=superuser_token_headers, + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "Event not found" + + +def test_remove_user_from_event_user_not_found( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + + response = client.delete( + f"{settings.API_V1_STR}/events/{event.id}/users/{uuid.uuid4()}", + headers=superuser_token_headers, + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "User not found" + + +def test_remove_user_from_event_user_not_in_event( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + + response = client.delete( + f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}", + headers=superuser_token_headers, + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "User is not part of this event" + + +def test_remove_user_from_event_insufficient_permissions( + client: TestClient, db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.READ, session=db) + + limited_user = create_random_user(db) + event.add_user(user=limited_user, rights=PermissionRight.READ, session=db) + + response = client.delete( + f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}", + headers=authentication_token_from_user(db=db, user=limited_user, client=client), + ) + + assert response.status_code == 403 + assert response.json()["detail"] == "Not enough permissions" + + +def test_remove_own_user_from_event( + client: TestClient, superuser_token_headers: dict[str, str], db: Session +) -> None: + event = create_random_event(db) + user = create_random_user(db) + event.add_user(user=user, rights=PermissionRight.MANAGE_USERS, session=db) + + response = client.delete( + f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}", + headers=authentication_token_from_user(db=db, user=user, client=client), + ) + + assert response.status_code == 403 + assert response.json()["detail"] == "Users are not allowed to delete themselves when they are not an super admin" diff --git a/backend/app/tests/api/routes/test_teams.py b/backend/app/tests/api/routes/test_teams.py index 8d765c7..e225dad 100644 --- a/backend/app/tests/api/routes/test_teams.py +++ b/backend/app/tests/api/routes/test_teams.py @@ -82,7 +82,7 @@ def test_read_event_not_enough_permissions(client: TestClient, normal_user_token f"{settings.API_V1_STR}/teams/{team.id}", headers=normal_user_token_headers, ) - assert response.status_code == 400 + assert response.status_code == 403 assert response.json()["detail"] == "Not enough permissions" @@ -207,7 +207,7 @@ def test_update_team_not_enough_permissions(client: TestClient, normal_user_toke headers=normal_user_token_headers, json=data, ) - assert response.status_code == 400 + assert response.status_code == 403 assert response.json()["detail"] == "Not enough permissions" @@ -288,7 +288,7 @@ def test_update_team_event_with_event_user_not_enough_permissions(client: TestCl json=data, ) - assert response.status_code == 400 + assert response.status_code == 403 assert response.json()["detail"] == "Not enough permissions" @@ -317,7 +317,7 @@ def test_delete_not_enough_permissions(client: TestClient, normal_user_token_hea f"{settings.API_V1_STR}/teams/{team.id}", headers=normal_user_token_headers, ) - assert response.status_code == 400 + assert response.status_code == 403 assert response.json()["detail"] == "Not enough permissions"