Implement user links

This commit is contained in:
Sebastiaan
2025-06-09 22:19:38 +02:00
parent 23c6ddc915
commit c4d1871835
7 changed files with 781 additions and 99 deletions

View File

@@ -16,12 +16,15 @@ from app.models.event import (
EventsPublic,
EventUpdate,
EventUserLink,
EventUserLinkCreate,
EventUserLinkUpdate,
EventUserLinkPublic,
EventUserLinksPublic,
)
from app.models.user import (
PermissionModule,
PermissionPart,
PermissionRight,
PermissionRightObject,
User,
)
@@ -87,7 +90,7 @@ def read_event(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.READ)):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(status_code=403, detail="Not enough permissions")
return event
@@ -131,7 +134,7 @@ def update_event(
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.UPDATE)):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(status_code=403, detail="Not enough permissions")
return Event.update(db_obj=event, in_obj=event_in, session=session)
@@ -154,7 +157,7 @@ def delete_event(
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.DELETE)):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(status_code=403, detail="Not enough permissions")
session.delete(event)
session.commit()
@@ -167,18 +170,56 @@ def delete_event(
# region # Events / Users ######################################################
@router.post("/{id}/users/{user_id}", tags=[ApiTags.USERS])
def add_user_to_event(
@router.get("/{event_id}/users/", response_model=EventUserLinksPublic)
def read_event_users(
session: SessionDep, current_user: CurrentUser, event_id: RowId, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve all event users.
"""
event = session.get(Event, event_id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permissions(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS)):
raise HTTPException(status_code=403, detail="Not enough permissions")
count_statement = (select(func.count())
.select_from(EventUserLink)
.where(EventUserLink.event_id == event.id)
)
count = session.exec(count_statement).one()
statement = (select(EventUserLink)
.where(EventUserLink.event_id == event.id)
.offset(skip)
.limit(limit)
)
event_user_links = session.exec(statement).all()
return EventUserLinksPublic(data=event_user_links, count=count)
@router.post("/{event_id}/users/", tags=[ApiTags.USERS], response_model=EventUserLinkPublic)
def create_event_user(
session: SessionDep,
current_user: CurrentUser,
id: RowId,
user_id: RowId,
rights_in: PermissionRightObject,
) -> Message:
event_id: RowId,
user_in: EventUserLinkCreate,
) -> Any:
"""
Add or update a user to an event.
Create a new link between a user and an event.
"""
event = session.get(Event, id)
if user_in.rights & ~PermissionRight.ADMIN:
# FIXME: find a proper richts checker
raise HTTPException(status_code=400, detail="Invalid permission rights")
event = session.get(Event, event_id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
@@ -186,45 +227,89 @@ def add_user_to_event(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS,
) and not (
event.user_has_rights(
user=current_user, rights=(PermissionRight.MANAGE_USERS | rights_in.rights)
)
):
raise HTTPException(status_code=400, detail="Not enough permissions")
) and not (event.user_has_rights(user=current_user, rights=(PermissionRight.MANAGE_USERS | user_in.rights))):
raise HTTPException(status_code=403, detail="Not enough permissions")
user = session.get(User, user_id)
if not event:
user = session.get(User, user_in.user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
event.add_user(user=user, rights=rights_in.rights, session=session)
return Message(
message="User added successfully"
)
user_link = event.get_user_link(user)
if user_link:
raise HTTPException(status_code=400, detail="User already part of this event")
return event.add_user(user=user, rights=user_in.rights, session=session)
@router.delete("/{id}/users/{user_id}", tags=[ApiTags.USERS])
def remove_user_from_event(
session: SessionDep, current_user: CurrentUser, id: RowId, user_id: RowId
) -> Message:
@router.put("/{event_id}/users/{user_id}", tags=[ApiTags.USERS], response_model=EventUserLinkPublic)
def update_user_in_event(
session: SessionDep,
current_user: CurrentUser,
event_id: RowId,
user_id: RowId,
user_in: EventUserLinkUpdate,
) -> Any:
"""
Remove a user from an event.
Update a user link within an event.
"""
event = session.get(Event, id)
event = session.get(Event, event_id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
if not current_user.has_permissions(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS,
) and not event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS):
raise HTTPException(status_code=403, detail="Not enough permissions")
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
valid_flags = sum(flag.value for flag in PermissionRight)
if user_in.rights & ~valid_flags:
# FIXME: find a proper richts checker
raise HTTPException(status_code=400, detail="Invalid permission rights")
if not current_user.has_permissions(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS,
) and not (event.user_has_rights(user=current_user, rights=(PermissionRight.MANAGE_USERS | user_in.rights))):
raise HTTPException(status_code=403, detail="Not enough permissions")
user_link = event.get_user_link(user)
if not user_link:
raise HTTPException(status_code=404, detail="User is not part of this event")
return event.update_user(user=user, rights=user_in.rights, session=session)
@router.delete("/{event_id}/users/{user_id}", tags=[ApiTags.USERS])
def remove_user_from_event(
session: SessionDep, current_user: CurrentUser, event_id: RowId, user_id: RowId
) -> Message:
"""
Remove a user link from an event.
"""
event = session.get(Event, event_id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if not current_user.has_permissions(
module=PermissionModule.EVENT,
part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS,
):
if current_user.id == user.id:
raise HTTPException(status_code=403, detail="Users are not allowed to delete themselves when they are not an super admin")
if not event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS):
raise HTTPException(status_code=403, detail="Not enough permissions")
user_link = event.get_user_link(user)
if not user_link:
raise HTTPException(status_code=404, detail="User is not part of this event")
event.remove_user(user=user, session=session)
return Message(
message="User removed successfully"

View File

@@ -97,7 +97,7 @@ def read_team(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
part=PermissionPart.ADMIN,
rights=PermissionRight.READ,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(status_code=403, detail="Not enough permissions")
return team
@@ -119,7 +119,7 @@ def create_team(
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(status_code=403, detail="Not enough permissions")
team = Team.create(create_obj=team_in, session=session)
return team
@@ -146,7 +146,7 @@ def update_team(
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(status_code=403, detail="Not enough permissions")
# Check rights for the new event data
if team_in.event_id:
@@ -159,7 +159,7 @@ def update_team(
part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(status_code=403, detail="Not enough permissions")
# Update the team
team = Team.update(db_obj=team, in_obj=team_in, session=session)
@@ -184,7 +184,7 @@ def delete_team(session: SessionDep,current_user: CurrentUser, id: RowId) -> Mes
part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=400, detail="Not enough permissions")
raise HTTPException(status_code=403, detail="Not enough permissions")
session.delete(team)
session.commit()

View File

@@ -39,6 +39,7 @@ class DocumentedStrEnum(str, Enum):
class DocumentedIntFlag(IntFlag):
# TODO: Build DB sport to proper store flags and make it possible to store all mutations
pass

View File

@@ -23,8 +23,23 @@ if TYPE_CHECKING:
# region # Event ###############################################################
# Event auth
class EventUserLink(BaseSQLModel, table=True):
# Shared properties
class EventUserLinkBase(BaseSQLModel):
rights: PermissionRight = Field(default=PermissionRight.READ, nullable=False)
# Properties to receive via API on creation
class EventUserLinkCreate(EventUserLinkBase):
user_id: RowId = Field(default=None, nullable=False)
# Properties to receive via API on update, all are optional
class EventUserLinkUpdate(EventUserLinkBase):
pass
# Database model, database table inferred from class name
class EventUserLink(EventUserLinkBase, table=True):
event_id: RowId = Field(
foreign_key="event.id",
primary_key=True,
@@ -39,12 +54,22 @@ class EventUserLink(BaseSQLModel, table=True):
ondelete="CASCADE",
)
rights: PermissionRight = Field(default=PermissionRight.READ, nullable=False)
event: "Event" = Relationship(back_populates="user_links")
user: "User" = Relationship(back_populates="event_links")
# Properties to return via API
class EventUserLinkPublic(EventUserLinkBase):
user_id: RowId
event_id: RowId
class EventUserLinksPublic(BaseSQLModel):
data: list[EventUserLinkPublic]
count: int
# ##############################################################################
@@ -102,41 +127,51 @@ class Event(mixin.RowId, EventBase, table=True):
session.refresh(db_obj)
return db_obj
def get_user_link(self, user: User) -> EventUserLink | None:
return next(
(link for link in self.user_links if link.user == user), None
)
def add_user(
self,
user: User,
rights: PermissionRight = PermissionRight.READ,
*,
session: Session,
) -> "Event":
to_add = next((add for add in self.user_links if add.user == user), None)
) -> "EventUserLink | None":
to_add = self.get_user_link(user=user)
if to_add:
to_add.rights = rights
session.add(to_add)
else:
if to_add is None:
self.user_links.append(EventUserLink(event=self, user=user, rights=rights))
session.add(self.user_links[-1])
session.commit()
return self.user_links[-1]
session.commit()
return None
return self
def update_user(
self,
user: User,
rights: PermissionRight = PermissionRight.READ,
*,
session: Session,
) -> "EventUserLink | None":
to_update = self.get_user_link(user=user)
def remove_user(self, user: User, *, session: Session) -> "Event":
to_remove = next(
(remove for remove in self.user_links if remove.user == user), None
)
if to_update:
to_update.rights = rights
session.add(to_update)
session.commit()
return to_update
return None
def remove_user(self, user: User, *, session: Session) -> None:
to_remove = self.get_user_link(user=user)
if to_remove:
statement = select(EventUserLink).where(
EventUserLink.event_id == self.id, EventUserLink.user_id == user.id
)
link_to_remove = session.exec(statement).first()
session.delete(to_remove)
session.commit()
if link_to_remove:
session.delete(link_to_remove)
session.commit()
return self
def user_has_rights(
self,

View File

@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING
from pydantic import EmailStr
from pydantic import EmailStr, field_validator
from sqlmodel import Field, Relationship, Session, select
from app.core.security import get_password_hash, verify_password
@@ -46,10 +46,6 @@ class PermissionRight(DocumentedIntFlag):
ADMIN = CREATE | READ | UPDATE | DELETE | MANAGE_USERS | MANAGE_TEAMS
class PermissionRightObject(BaseSQLModel):
rights: PermissionRight | None = Field(default=PermissionRight.READ, nullable=False)
# ##############################################################################
# link to User (many-to-many)
class UserRoleLink(BaseSQLModel, table=True):

View File

@@ -1,5 +1,6 @@
import uuid
import pytest
from fastapi.testclient import TestClient
from sqlmodel import Session
@@ -31,6 +32,22 @@ def test_create_event(client: TestClient, superuser_token_headers: dict[str, str
assert "end_at" in content
def test_create_event_no_permission(client: TestClient, normal_user_token_headers: dict[str, str]) -> None:
data = {
"name": "No create permission",
"contact": "Someone else",
}
response = client.post(
f"{settings.API_V1_STR}/events/",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_read_event(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
@@ -57,8 +74,7 @@ def test_read_event_not_found(
headers=superuser_token_headers,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Event not found"
assert response.json()["detail"] == "Event not found"
def test_read_event_not_enough_permissions(
@@ -69,9 +85,8 @@ def test_read_event_not_enough_permissions(
f"{settings.API_V1_STR}/events/{event.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
content = response.json()
assert content["detail"] == "Not enough permissions"
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_read_event_with_event_user(
@@ -163,8 +178,7 @@ def test_update_event_not_found(
json=data,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Event not found"
assert response.json()["detail"] == "Event not found"
def test_update_event_not_enough_permissions(
@@ -177,9 +191,31 @@ def test_update_event_not_enough_permissions(
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 400
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_update_event_with_eventuser(
client: TestClient, event_user_token_headers: EventUserHeader, db: Session
) -> None:
event = event_user_token_headers.event
data = {
"name": "Updated name from eventuser",
"contact": "Updated contact from eventuser",
}
response = client.put(
f"{settings.API_V1_STR}/events/{event.id}",
headers=event_user_token_headers.headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["detail"] == "Not enough permissions"
assert content["name"] == data["name"]
assert content["contact"] == data["contact"]
assert content["id"] == str(event.id)
assert content["is_active"] == event.is_active
assert str(content["start_at"]) == str(event.start_at)
assert str(content["end_at"]) == str(event.end_at)
def test_delete_event(
@@ -191,8 +227,7 @@ def test_delete_event(
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["message"] == "Event deleted successfully"
assert response.json()["message"] == "Event deleted successfully"
def test_delete_event_not_found(
@@ -215,9 +250,8 @@ def test_delete_event_not_enough_permissions(
f"{settings.API_V1_STR}/events/{event.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
content = response.json()
assert content["detail"] == "Not enough permissions"
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_delete_event_admin_user(
@@ -246,9 +280,8 @@ def test_delete_event_not_enough_permissions_for_this_event(
f"{settings.API_V1_STR}/events/{event.id}",
headers=authentication_token_from_user(db=db, user=user, client=client),
)
assert response.status_code == 400
content = response.json()
assert content["detail"] == "Not enough permissions"
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_delete_event_event_user_read_only_rights(
@@ -262,13 +295,545 @@ def test_delete_event_event_user_read_only_rights(
f"{settings.API_V1_STR}/events/{event.id}",
headers=authentication_token_from_user(db=db, user=user, client=client),
)
assert response.status_code == 400
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_read_all_event_users(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user1 = create_random_user(db)
user2 = create_random_user(db)
event.add_user(user=user1, rights=PermissionRight.READ, session=db)
event.add_user(user=user2, rights=PermissionRight.ADMIN, session=db)
response = client.get(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["detail"] == "Not enough permissions"
assert "count" in content
assert content["count"] == 2
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
# TODO: Add user (super, less rights, own rights, more rights) (*** user without rights)
# TODO: Edit user rights (super, less rights, own rights, more rights) (*** user without rights)
# TODO: Remove user (*** user without rights)
# TODO: Remove own user (is allowed)
# TODO: Remove not linked user
def test_read_all_event_users_no_permission(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
response = client.get(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=normal_user_token_headers,
)
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_read_all_event_users_with_event_user(
client: TestClient, db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.MANAGE_USERS, session=db)
response = client.get(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=authentication_token_from_user(db=db, user=user, client=client),
)
assert response.status_code == 200
content = response.json()
assert "count" in content
assert content["count"] == 1
assert "data" in content
assert isinstance(content["data"], list)
assert len(content["data"]) <= content["count"]
def test_read_all_event_users_with_event_user_no_permission(
client: TestClient, db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.READ, session=db)
response = client.get(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=authentication_token_from_user(db=db, user=user, client=client),
)
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_add_user_to_event_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
response = client.get(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users",
headers=superuser_token_headers,
)
assert response.status_code == 404
assert response.json()["detail"] == "Event not found"
def test_add_user_to_event(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
data = {
"user_id": str(user.id),
"rights": PermissionRight.READ,
}
response = client.post(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert "rights" in content
assert content["rights"] == PermissionRight.READ
assert content["user_id"] == str(user.id)
assert content["event_id"] == str(event.id)
def test_add_user_to_event_event_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
user = create_random_user(db)
data = {
"user_id": str(user.id),
"rights": PermissionRight.READ,
}
response = client.post(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
assert response.json()["detail"] == "Event not found"
def test_add_user_to_event_user_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
data = {
"user_id": str(uuid.uuid4()),
"rights": PermissionRight.READ,
}
response = client.post(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
assert response.json()["detail"] == "User not found"
def test_add_user_to_event_already_exists(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.READ, session=db)
data = {
"user_id": str(user.id),
"rights": PermissionRight.ADMIN,
}
response = client.post(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 400
assert response.json()["detail"] == "User already part of this event"
def test_add_user_to_event_no_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
data = {
"user_id": str(user.id),
"rights": PermissionRight.READ,
}
response = client.post(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_add_user_to_event_unknown_rights(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
data = {
"user_id": str(user.id),
"rights": -1, # Invalid permission value
}
response = client.post(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 400
assert response.json()["detail"] == "Invalid permission rights"
def test_add_user_with_more_rights_than_current_user(
client: TestClient, db: Session
) -> None:
event = create_random_event(db)
limited_user = create_random_user(db)
event.add_user(user=limited_user, rights=PermissionRight.MANAGE_USERS, session=db)
target_user = create_random_user(db)
data = {
"user_id": str(target_user.id),
"rights": PermissionRight.ADMIN,
}
response = client.post(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=authentication_token_from_user(db=db, user=limited_user, client=client),
json=data,
)
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
@pytest.mark.xfail(reason="Combined rights add might not yet be supported")
def test_add_user_rights_combined(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
data = {
"rights": PermissionRight.READ | PermissionRight.UPDATE,
}
response = client.post(
f"{settings.API_V1_STR}/events/{event.id}/users",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert "rights" in content
assert content["rights"] == data["rights"]
assert content["event_id"] == str(event.id)
def test_update_user_inside_event(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.READ, session=db)
data = {
"rights": PermissionRight.UPDATE,
}
response = client.put(
f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert "rights" in content
assert content["rights"] == data["rights"]
assert content["user_id"] == str(user.id)
assert content["event_id"] == str(event.id)
def test_update_event_user_event_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
):
user = create_random_user(db)
data = {
"rights": PermissionRight.UPDATE,
}
response = client.put(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users/{user.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
assert response.json()["detail"] == "Event not found"
def test_update_event_user_user_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
):
event = create_random_event(db)
data = {
"rights": PermissionRight.UPDATE,
}
response = client.put(
f"{settings.API_V1_STR}/events/{event.id}/users/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
assert response.json()["detail"] == "User not found"
def test_update_event_user_unknown_rights(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
):
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.READ, session=db)
data = {
"rights": -1, # Invalid permission value
}
response = client.put(
f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 400
assert response.json()["detail"] == "Invalid permission rights"
def test_update_event_user_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
):
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.ADMIN, session=db)
data = {
"rights": PermissionRight.READ
}
response = client.put(
f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_update_event_user_with_event_user_same_event(
client: TestClient, db: Session
) -> None:
event = create_random_event(db)
user1 = create_random_user(db)
user2 = create_random_user(db)
event.add_user(user=user1, rights=PermissionRight.ADMIN, session=db)
event.add_user(user=user2, rights=PermissionRight.READ, session=db)
data = {
"rights": PermissionRight.UPDATE,
}
# event_user1 tries to update event_user2 rights
response = client.put(
f"{settings.API_V1_STR}/events/{event.id}/users/{user2.id}",
headers=authentication_token_from_user(db=db, user=user1, client=client),
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["rights"] == data["rights"]
assert content["user_id"] == str(user2.id)
assert content["event_id"] == str(event.id)
def test_update_event_user_from_other_event_forbidden(
client: TestClient, db: Session
) -> None:
event1 = create_random_event(db)
event2 = create_random_event(db)
user1 = create_random_user(db)
user2 = create_random_user(db)
event1.add_user(user=user1, rights=PermissionRight.ADMIN, session=db)
event2.add_user(user=user2, rights=PermissionRight.READ, session=db)
data = {
"rights": PermissionRight.UPDATE,
}
response = client.put(
f"{settings.API_V1_STR}/events/{event2.id}/users/{user2.id}",
headers=authentication_token_from_user(db=db, user=user1, client=client),
json=data,
)
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_update_event_user_from_other_event_thru_own_event(
client: TestClient, db: Session
) -> None:
event1 = create_random_event(db)
event2 = create_random_event(db)
user1 = create_random_user(db)
user2 = create_random_user(db)
event1.add_user(user=user1, rights=PermissionRight.ADMIN, session=db)
event2.add_user(user=user2, rights=PermissionRight.READ, session=db)
data = {
"rights": PermissionRight.UPDATE,
}
response = client.put(
f"{settings.API_V1_STR}/events/{event1.id}/users/{user2.id}",
headers=authentication_token_from_user(db=db, user=user1, client=client),
json=data,
)
assert response.status_code == 404
assert response.json()["detail"] == "User is not part of this event"
@pytest.mark.xfail(reason="Combined rights update might not yet be supported")
def test_update_user_rights_combined(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
# Initially assign READ only rights
event.add_user(user=user, rights=PermissionRight.READ, session=db)
data = {
"rights": PermissionRight.READ | PermissionRight.UPDATE,
}
response = client.put(
f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert "rights" in content
assert content["rights"] == data["rights"]
assert content["user_id"] == str(user.id)
assert content["event_id"] == str(event.id)
def test_remove_user_from_event(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.READ, session=db)
response = client.delete(
f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
assert response.json()["message"] == "User removed successfully"
# assert not event.get_user_link(user)
def test_remove_user_from_event_event_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.READ, session=db)
response = client.delete(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users/{user.id}",
headers=superuser_token_headers,
)
assert response.status_code == 404
assert response.json()["detail"] == "Event not found"
def test_remove_user_from_event_user_not_found(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
response = client.delete(
f"{settings.API_V1_STR}/events/{event.id}/users/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == 404
assert response.json()["detail"] == "User not found"
def test_remove_user_from_event_user_not_in_event(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
response = client.delete(
f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}",
headers=superuser_token_headers,
)
assert response.status_code == 404
assert response.json()["detail"] == "User is not part of this event"
def test_remove_user_from_event_insufficient_permissions(
client: TestClient, db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.READ, session=db)
limited_user = create_random_user(db)
event.add_user(user=limited_user, rights=PermissionRight.READ, session=db)
response = client.delete(
f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}",
headers=authentication_token_from_user(db=db, user=limited_user, client=client),
)
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
def test_remove_own_user_from_event(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
event = create_random_event(db)
user = create_random_user(db)
event.add_user(user=user, rights=PermissionRight.MANAGE_USERS, session=db)
response = client.delete(
f"{settings.API_V1_STR}/events/{event.id}/users/{user.id}",
headers=authentication_token_from_user(db=db, user=user, client=client),
)
assert response.status_code == 403
assert response.json()["detail"] == "Users are not allowed to delete themselves when they are not an super admin"

View File

@@ -82,7 +82,7 @@ def test_read_event_not_enough_permissions(client: TestClient, normal_user_token
f"{settings.API_V1_STR}/teams/{team.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
@@ -207,7 +207,7 @@ def test_update_team_not_enough_permissions(client: TestClient, normal_user_toke
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 400
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
@@ -288,7 +288,7 @@ def test_update_team_event_with_event_user_not_enough_permissions(client: TestCl
json=data,
)
assert response.status_code == 400
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"
@@ -317,7 +317,7 @@ def test_delete_not_enough_permissions(client: TestClient, normal_user_token_hea
f"{settings.API_V1_STR}/teams/{team.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 400
assert response.status_code == 403
assert response.json()["detail"] == "Not enough permissions"