Use proper HTTP status codes

This commit is contained in:
Sebastiaan
2025-06-09 22:35:53 +02:00
parent c4d1871835
commit eac43be278
10 changed files with 173 additions and 168 deletions

View File

@@ -1,6 +1,6 @@
from typing import Any from typing import Any
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep from app.api.deps import CurrentUser, SessionDep
@@ -83,14 +83,14 @@ def read_event(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any
""" """
event = session.get(Event, id) event = session.get(Event, id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.EVENT, module=PermissionModule.EVENT,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.READ, rights=PermissionRight.READ,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.READ)): ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.READ)):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return event return event
@@ -107,7 +107,7 @@ def create_event(
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.CREATE, rights=PermissionRight.CREATE,
): ):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
event = Event.create(create_obj=event_in, session=session) event = Event.create(create_obj=event_in, session=session)
event.add_user(user=current_user, rights=PermissionRight.ADMIN, session=session) event.add_user(user=current_user, rights=PermissionRight.ADMIN, session=session)
@@ -127,14 +127,14 @@ def update_event(
""" """
event = session.get(Event, id) event = session.get(Event, id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.EVENT, module=PermissionModule.EVENT,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE, rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.UPDATE)): ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.UPDATE)):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return Event.update(db_obj=event, in_obj=event_in, session=session) return Event.update(db_obj=event, in_obj=event_in, session=session)
@@ -150,14 +150,14 @@ def delete_event(
""" """
event = session.get(Event, id) event = session.get(Event, id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.EVENT, module=PermissionModule.EVENT,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE, rights=PermissionRight.DELETE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.DELETE)): ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.DELETE)):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
session.delete(event) session.delete(event)
session.commit() session.commit()
@@ -180,14 +180,14 @@ def read_event_users(
event = session.get(Event, event_id) event = session.get(Event, event_id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.EVENT, module=PermissionModule.EVENT,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS, rights=PermissionRight.MANAGE_USERS,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS)): ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS)):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
count_statement = (select(func.count()) count_statement = (select(func.count())
.select_from(EventUserLink) .select_from(EventUserLink)
@@ -217,26 +217,26 @@ def create_event_user(
if user_in.rights & ~PermissionRight.ADMIN: if user_in.rights & ~PermissionRight.ADMIN:
# FIXME: find a proper richts checker # FIXME: find a proper richts checker
raise HTTPException(status_code=400, detail="Invalid permission rights") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid permission rights")
event = session.get(Event, event_id) event = session.get(Event, event_id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.EVENT, module=PermissionModule.EVENT,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS, rights=PermissionRight.MANAGE_USERS,
) and not (event.user_has_rights(user=current_user, rights=(PermissionRight.MANAGE_USERS | user_in.rights))): ) and not (event.user_has_rights(user=current_user, rights=(PermissionRight.MANAGE_USERS | user_in.rights))):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
user = session.get(User, user_in.user_id) user = session.get(User, user_in.user_id)
if not user: if not user:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
user_link = event.get_user_link(user) user_link = event.get_user_link(user)
if user_link: if user_link:
raise HTTPException(status_code=400, detail="User already part of this event") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User already part of this event")
return event.add_user(user=user, rights=user_in.rights, session=session) return event.add_user(user=user, rights=user_in.rights, session=session)
@@ -255,27 +255,27 @@ def update_user_in_event(
event = session.get(Event, event_id) event = session.get(Event, event_id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
user = session.get(User, user_id) user = session.get(User, user_id)
if not user: if not user:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
valid_flags = sum(flag.value for flag in PermissionRight) valid_flags = sum(flag.value for flag in PermissionRight)
if user_in.rights & ~valid_flags: if user_in.rights & ~valid_flags:
# FIXME: find a proper richts checker # FIXME: find a proper richts checker
raise HTTPException(status_code=400, detail="Invalid permission rights") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid permission rights")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.EVENT, module=PermissionModule.EVENT,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.MANAGE_USERS, rights=PermissionRight.MANAGE_USERS,
) and not (event.user_has_rights(user=current_user, rights=(PermissionRight.MANAGE_USERS | user_in.rights))): ) and not (event.user_has_rights(user=current_user, rights=(PermissionRight.MANAGE_USERS | user_in.rights))):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
user_link = event.get_user_link(user) user_link = event.get_user_link(user)
if not user_link: if not user_link:
raise HTTPException(status_code=404, detail="User is not part of this event") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User is not part of this event")
return event.update_user(user=user, rights=user_in.rights, session=session) return event.update_user(user=user, rights=user_in.rights, session=session)
@@ -289,11 +289,11 @@ def remove_user_from_event(
""" """
event = session.get(Event, event_id) event = session.get(Event, event_id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
user = session.get(User, user_id) user = session.get(User, user_id)
if not user: if not user:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.EVENT, module=PermissionModule.EVENT,
@@ -301,14 +301,14 @@ def remove_user_from_event(
rights=PermissionRight.MANAGE_USERS, rights=PermissionRight.MANAGE_USERS,
): ):
if current_user.id == user.id: if current_user.id == user.id:
raise HTTPException(status_code=403, detail="Users are not allowed to delete themselves when they are not an super admin") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Users are not allowed to delete themselves when they are not an super admin")
if not event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS): if not event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_USERS):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
user_link = event.get_user_link(user) user_link = event.get_user_link(user)
if not user_link: if not user_link:
raise HTTPException(status_code=404, detail="User is not part of this event") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User is not part of this event")
event.remove_user(user=user, session=session) event.remove_user(user=user, session=session)
return Message( return Message(

View File

@@ -1,7 +1,7 @@
from datetime import timedelta from datetime import timedelta
from typing import Annotated, Any from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
from fastapi.security import OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordRequestForm
@@ -33,9 +33,9 @@ def login_access_token(
session=session, email=form_data.username, password=form_data.password session=session, email=form_data.username, password=form_data.password
) )
if not user: if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect email or password")
elif not user.is_active: elif not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return Token( return Token(
access_token=security.create_access_token( access_token=security.create_access_token(
@@ -54,9 +54,9 @@ def login_apikey(
""" """
user = ApiKey.authenticate(session=session, api_key=api_key) user = ApiKey.authenticate(session=session, api_key=api_key)
if not user: if not user:
raise HTTPException(status_code=400, detail="Incorrect apikey") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect apikey")
elif not user.is_active: elif not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return Token( return Token(
access_token=security.create_access_token( access_token=security.create_access_token(
@@ -82,7 +82,7 @@ def recover_password(email: str, session: SessionDep) -> Message:
if not user: if not user:
raise HTTPException( raise HTTPException(
status_code=404, status_code=status.HTTP_404_NOT_FOUND,
detail="The user with this email does not exist in the system.", detail="The user with this email does not exist in the system.",
) )
password_reset_token = generate_password_reset_token(email=email) password_reset_token = generate_password_reset_token(email=email)
@@ -104,15 +104,15 @@ def reset_password(session: SessionDep, body: NewPassword) -> Message:
""" """
email = verify_password_reset_token(token=body.token) email = verify_password_reset_token(token=body.token)
if not email: if not email:
raise HTTPException(status_code=400, detail="Invalid token") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid token")
user = User.get_by_email(session=session, email=email) user = User.get_by_email(session=session, email=email)
if not user: if not user:
raise HTTPException( raise HTTPException(
status_code=404, status_code=status.HTTP_404_NOT_FOUND,
detail="The user with this email does not exist in the system.", detail="The user with this email does not exist in the system.",
) )
elif not user.is_active: elif not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
hashed_password = get_password_hash(password=body.new_password) hashed_password = get_password_hash(password=body.new_password)
user.hashed_password = hashed_password user.hashed_password = hashed_password
session.add(user) session.add(user)
@@ -133,7 +133,7 @@ def recover_password_html_content(email: str, session: SessionDep) -> Any:
if not user: if not user:
raise HTTPException( raise HTTPException(
status_code=404, status_code=status.HTTP_404_NOT_FOUND,
detail="The user with this username does not exist in the system.", detail="The user with this username does not exist in the system.",
) )
password_reset_token = generate_password_reset_token(email=email) password_reset_token = generate_password_reset_token(email=email)

View File

@@ -1,6 +1,6 @@
from typing import Any from typing import Any
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException, status
from sqlmodel import func, select from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep from app.api.deps import CurrentUser, SessionDep
@@ -86,18 +86,18 @@ def read_team(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
""" """
team = session.get(Team, id) team = session.get(Team, id)
if not team: if not team:
raise HTTPException(status_code=404, detail="Team not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
event = session.get(Event, team.event_id) event = session.get(Event, team.event_id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.TEAM, module=PermissionModule.TEAM,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.READ, rights=PermissionRight.READ,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
return team return team
@@ -112,14 +112,14 @@ def create_team(
event = session.get(Event, team_in.event_id) event = session.get(Event, team_in.event_id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.TEAM, module=PermissionModule.TEAM,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE, rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
team = Team.create(create_obj=team_in, session=session) team = Team.create(create_obj=team_in, session=session)
return team return team
@@ -134,32 +134,32 @@ def update_team(
""" """
team = session.get(Team, id) team = session.get(Team, id)
if not team: if not team:
raise HTTPException(status_code=404, detail="Team not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
# Check user's permissions for the existing event # Check user's permissions for the existing event
event = session.get(Event, team.event_id) event = session.get(Event, team.event_id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.TEAM, module=PermissionModule.TEAM,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE, rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
# Check rights for the new event data # Check rights for the new event data
if team_in.event_id: if team_in.event_id:
event = session.get(Event, team_in.event_id) event = session.get(Event, team_in.event_id)
if not event: if not event:
raise HTTPException(status_code=404, detail="New event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="New event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.TEAM, module=PermissionModule.TEAM,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.UPDATE, rights=PermissionRight.UPDATE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
# Update the team # Update the team
team = Team.update(db_obj=team, in_obj=team_in, session=session) team = Team.update(db_obj=team, in_obj=team_in, session=session)
@@ -173,18 +173,18 @@ def delete_team(session: SessionDep,current_user: CurrentUser, id: RowId) -> Mes
""" """
team = session.get(Team, id) team = session.get(Team, id)
if not team: if not team:
raise HTTPException(status_code=404, detail="Team not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
event = session.get(Event, team.event_id) event = session.get(Event, team.event_id)
if not event: if not event:
raise HTTPException(status_code=404, detail="Event not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
if not current_user.has_permissions( if not current_user.has_permissions(
module=PermissionModule.TEAM, module=PermissionModule.TEAM,
part=PermissionPart.ADMIN, part=PermissionPart.ADMIN,
rights=PermissionRight.DELETE, rights=PermissionRight.DELETE,
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)): ) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
raise HTTPException(status_code=403, detail="Not enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
session.delete(team) session.delete(team)
session.commit() session.commit()

View File

@@ -1,7 +1,7 @@
import uuid import uuid
from typing import Any from typing import Any
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import func, select from sqlmodel import func, select
from app.api.deps import ( from app.api.deps import (
@@ -66,7 +66,7 @@ def create_user(*, session: SessionDep, user_in: UserCreate) -> Any:
user = User.get_by_email(session=session, email=user_in.email) user = User.get_by_email(session=session, email=user_in.email)
if user: if user:
raise HTTPException( raise HTTPException(
status_code=400, status_code=status.HTTP_400_BAD_REQUEST,
detail="The user with this email already exists in the system.", detail="The user with this email already exists in the system.",
) )
@@ -95,7 +95,7 @@ def update_user_me(
existing_user = User.get_by_email(session=session, email=user_in.email) existing_user = User.get_by_email(session=session, email=user_in.email)
if existing_user and existing_user.id != current_user.id: if existing_user and existing_user.id != current_user.id:
raise HTTPException( raise HTTPException(
status_code=409, detail="User with this email already exists" status_code=status.HTTP_409_CONFLICT, detail="User with this email already exists"
) )
user_data = user_in.model_dump(exclude_unset=True) user_data = user_in.model_dump(exclude_unset=True)
current_user.sqlmodel_update(user_data) current_user.sqlmodel_update(user_data)
@@ -113,10 +113,10 @@ def update_password_me(
Update own password. Update own password.
""" """
if not verify_password(body.current_password, current_user.hashed_password): if not verify_password(body.current_password, current_user.hashed_password):
raise HTTPException(status_code=400, detail="Incorrect password") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect password")
if body.current_password == body.new_password: if body.current_password == body.new_password:
raise HTTPException( raise HTTPException(
status_code=400, detail="New password cannot be the same as the current one" status_code=status.HTTP_400_BAD_REQUEST, detail="New password cannot be the same as the current one"
) )
hashed_password = get_password_hash(body.new_password) hashed_password = get_password_hash(body.new_password)
current_user.hashed_password = hashed_password current_user.hashed_password = hashed_password
@@ -184,7 +184,7 @@ def delete_apikey_me(
session.commit() session.commit()
return Message(message="Api key deleted successfully") return Message(message="Api key deleted successfully")
raise HTTPException(status_code=404, detail="API key not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="API key not found")
@router.get("/me", response_model=UserPublic) @router.get("/me", response_model=UserPublic)
@@ -206,7 +206,7 @@ def delete_user_me(session: SessionDep, current_user: CurrentUser) -> Any:
rights=PermissionRight.DELETE, rights=PermissionRight.DELETE,
): ):
raise HTTPException( raise HTTPException(
status_code=403, detail="Super users are not allowed to delete themselves" status_code=status.HTTP_403_FORBIDDEN, detail="Super users are not allowed to delete themselves"
) )
session.delete(current_user) session.delete(current_user)
session.commit() session.commit()
@@ -221,7 +221,7 @@ def register_user(session: SessionDep, user_in: UserRegister) -> Any:
user = User.get_by_email(session=session, email=user_in.email) user = User.get_by_email(session=session, email=user_in.email)
if user: if user:
raise HTTPException( raise HTTPException(
status_code=400, status_code=status.HTTP_400_BAD_REQUEST,
detail="The user with this email already exists in the system", detail="The user with this email already exists in the system",
) )
user_create = UserCreate.model_validate(user_in) user_create = UserCreate.model_validate(user_in)
@@ -245,7 +245,7 @@ def read_user_by_id(
rights=PermissionRight.READ, rights=PermissionRight.READ,
): ):
raise HTTPException( raise HTTPException(
status_code=403, status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges", detail="The user doesn't have enough privileges",
) )
return user return user
@@ -269,14 +269,14 @@ def update_user(
db_user = session.get(User, user_id) db_user = session.get(User, user_id)
if not db_user: if not db_user:
raise HTTPException( raise HTTPException(
status_code=404, status_code=status.HTTP_404_NOT_FOUND,
detail="The user with this id does not exist in the system", detail="The user with this id does not exist in the system",
) )
if user_in.email: if user_in.email:
existing_user = User.get_by_email(session=session, email=user_in.email) existing_user = User.get_by_email(session=session, email=user_in.email)
if existing_user and existing_user.id != user_id: if existing_user and existing_user.id != user_id:
raise HTTPException( raise HTTPException(
status_code=409, detail="User with this email already exists" status_code=status.HTTP_409_CONFLICT, detail="User with this email already exists"
) )
db_user = User.update(session=session, db_obj=db_user, in_obj=user_in) db_user = User.update(session=session, db_obj=db_user, in_obj=user_in)
@@ -292,10 +292,10 @@ def delete_user(
""" """
user = session.get(User, user_id) user = session.get(User, user_id)
if not user: if not user:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
if user == current_user: if user == current_user:
raise HTTPException( raise HTTPException(
status_code=403, detail="Super users are not allowed to delete themselves" status_code=status.HTTP_403_FORBIDDEN, detail="Super users are not allowed to delete themselves"
) )
# statement = delete(Item).where(col(Item.owner_id) == user_id) # statement = delete(Item).where(col(Item.owner_id) == user_id)
# session.exec(statement) # type: ignore # session.exec(statement) # type: ignore

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends, status
from pydantic.networks import EmailStr from pydantic.networks import EmailStr
from app.api.deps import get_current_system_admin from app.api.deps import get_current_system_admin
@@ -11,7 +11,7 @@ router = APIRouter(prefix="/utils", tags=[ApiTags.UTILS])
@router.post( @router.post(
"/test-email/", "/test-email/",
dependencies=[Depends(get_current_system_admin)], dependencies=[Depends(get_current_system_admin)],
status_code=201, status_code=status.HTTP_201_CREATED,
) )
def test_email(email_to: EmailStr) -> Message: def test_email(email_to: EmailStr) -> Message:
""" """

View File

@@ -1,6 +1,7 @@
import uuid import uuid
import pytest import pytest
from fastapi import status
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from sqlmodel import Session from sqlmodel import Session
@@ -22,7 +23,7 @@ def test_create_event(client: TestClient, superuser_token_headers: dict[str, str
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["name"] == data["name"] assert content["name"] == data["name"]
assert content["contact"] == data["contact"] assert content["contact"] == data["contact"]
@@ -43,7 +44,7 @@ def test_create_event_no_permission(client: TestClient, normal_user_token_header
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -56,7 +57,7 @@ def test_read_event(
f"{settings.API_V1_STR}/events/{event.id}", f"{settings.API_V1_STR}/events/{event.id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["name"] == event.name assert content["name"] == event.name
assert content["contact"] == event.contact assert content["contact"] == event.contact
@@ -73,7 +74,7 @@ def test_read_event_not_found(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}", f"{settings.API_V1_STR}/events/{uuid.uuid4()}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Event not found" assert response.json()["detail"] == "Event not found"
@@ -85,7 +86,7 @@ def test_read_event_not_enough_permissions(
f"{settings.API_V1_STR}/events/{event.id}", f"{settings.API_V1_STR}/events/{event.id}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -97,7 +98,7 @@ def test_read_event_with_event_user(
f"{settings.API_V1_STR}/events/{event.id}", f"{settings.API_V1_STR}/events/{event.id}",
headers=event_user_token_headers.headers, headers=event_user_token_headers.headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["name"] == event.name assert content["name"] == event.name
assert content["contact"] == event.contact assert content["contact"] == event.contact
@@ -116,7 +117,7 @@ def test_read_events(
f"{settings.API_V1_STR}/events/", f"{settings.API_V1_STR}/events/",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "count" in content assert "count" in content
assert content["count"] >= 2 assert content["count"] >= 2
@@ -136,7 +137,7 @@ def test_read_events_with_event_user(
f"{settings.API_V1_STR}/events/", f"{settings.API_V1_STR}/events/",
headers=authentication_token_from_user(db=db, user=user, client=client), headers=authentication_token_from_user(db=db, user=user, client=client),
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "count" in content assert "count" in content
assert content["count"] == 1 assert content["count"] == 1
@@ -158,7 +159,7 @@ def test_update_event(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["name"] == data["name"] assert content["name"] == data["name"]
assert content["contact"] == data["contact"] assert content["contact"] == data["contact"]
@@ -177,7 +178,7 @@ def test_update_event_not_found(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Event not found" assert response.json()["detail"] == "Event not found"
@@ -191,7 +192,7 @@ def test_update_event_not_enough_permissions(
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -208,7 +209,7 @@ def test_update_event_with_eventuser(
headers=event_user_token_headers.headers, headers=event_user_token_headers.headers,
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["name"] == data["name"] assert content["name"] == data["name"]
assert content["contact"] == data["contact"] assert content["contact"] == data["contact"]
@@ -226,7 +227,7 @@ def test_delete_event(
f"{settings.API_V1_STR}/events/{event.id}", f"{settings.API_V1_STR}/events/{event.id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Event deleted successfully" assert response.json()["message"] == "Event deleted successfully"
@@ -237,7 +238,7 @@ def test_delete_event_not_found(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}", f"{settings.API_V1_STR}/events/{uuid.uuid4()}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
content = response.json() content = response.json()
assert content["detail"] == "Event not found" assert content["detail"] == "Event not found"
@@ -250,7 +251,7 @@ def test_delete_event_not_enough_permissions(
f"{settings.API_V1_STR}/events/{event.id}", f"{settings.API_V1_STR}/events/{event.id}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -265,7 +266,7 @@ def test_delete_event_admin_user(
f"{settings.API_V1_STR}/events/{event.id}", f"{settings.API_V1_STR}/events/{event.id}",
headers=authentication_token_from_user(db=db, user=user, client=client), headers=authentication_token_from_user(db=db, user=user, client=client),
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["message"] == "Event deleted successfully" assert content["message"] == "Event deleted successfully"
@@ -280,7 +281,7 @@ def test_delete_event_not_enough_permissions_for_this_event(
f"{settings.API_V1_STR}/events/{event.id}", f"{settings.API_V1_STR}/events/{event.id}",
headers=authentication_token_from_user(db=db, user=user, client=client), headers=authentication_token_from_user(db=db, user=user, client=client),
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -295,7 +296,7 @@ def test_delete_event_event_user_read_only_rights(
f"{settings.API_V1_STR}/events/{event.id}", f"{settings.API_V1_STR}/events/{event.id}",
headers=authentication_token_from_user(db=db, user=user, client=client), headers=authentication_token_from_user(db=db, user=user, client=client),
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -312,7 +313,7 @@ def test_read_all_event_users(
f"{settings.API_V1_STR}/events/{event.id}/users", f"{settings.API_V1_STR}/events/{event.id}/users",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "count" in content assert "count" in content
assert content["count"] == 2 assert content["count"] == 2
@@ -330,7 +331,7 @@ def test_read_all_event_users_no_permission(
f"{settings.API_V1_STR}/events/{event.id}/users", f"{settings.API_V1_STR}/events/{event.id}/users",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -345,7 +346,7 @@ def test_read_all_event_users_with_event_user(
f"{settings.API_V1_STR}/events/{event.id}/users", f"{settings.API_V1_STR}/events/{event.id}/users",
headers=authentication_token_from_user(db=db, user=user, client=client), headers=authentication_token_from_user(db=db, user=user, client=client),
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "count" in content assert "count" in content
assert content["count"] == 1 assert content["count"] == 1
@@ -365,7 +366,7 @@ def test_read_all_event_users_with_event_user_no_permission(
f"{settings.API_V1_STR}/events/{event.id}/users", f"{settings.API_V1_STR}/events/{event.id}/users",
headers=authentication_token_from_user(db=db, user=user, client=client), headers=authentication_token_from_user(db=db, user=user, client=client),
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -376,7 +377,7 @@ def test_add_user_to_event_not_found(
f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users", f"{settings.API_V1_STR}/events/{uuid.uuid4()}/users",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Event not found" assert response.json()["detail"] == "Event not found"
@@ -395,7 +396,7 @@ def test_add_user_to_event(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "rights" in content assert "rights" in content
assert content["rights"] == PermissionRight.READ assert content["rights"] == PermissionRight.READ
@@ -417,7 +418,7 @@ def test_add_user_to_event_event_not_found(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Event not found" assert response.json()["detail"] == "Event not found"
@@ -435,7 +436,7 @@ def test_add_user_to_event_user_not_found(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "User not found" assert response.json()["detail"] == "User not found"
@@ -455,7 +456,7 @@ def test_add_user_to_event_already_exists(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 400 assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()["detail"] == "User already part of this event" assert response.json()["detail"] == "User already part of this event"
@@ -474,7 +475,7 @@ def test_add_user_to_event_no_permissions(
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -493,7 +494,7 @@ def test_add_user_to_event_unknown_rights(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 400 assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()["detail"] == "Invalid permission rights" assert response.json()["detail"] == "Invalid permission rights"
@@ -516,7 +517,7 @@ def test_add_user_with_more_rights_than_current_user(
headers=authentication_token_from_user(db=db, user=limited_user, client=client), headers=authentication_token_from_user(db=db, user=limited_user, client=client),
json=data, json=data,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -536,7 +537,7 @@ def test_add_user_rights_combined(
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "rights" in content assert "rights" in content
assert content["rights"] == data["rights"] assert content["rights"] == data["rights"]
@@ -558,7 +559,7 @@ def test_update_user_inside_event(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "rights" in content assert "rights" in content
assert content["rights"] == data["rights"] assert content["rights"] == data["rights"]
@@ -579,7 +580,7 @@ def test_update_event_user_event_not_found(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Event not found" assert response.json()["detail"] == "Event not found"
@@ -596,7 +597,7 @@ def test_update_event_user_user_not_found(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "User not found" assert response.json()["detail"] == "User not found"
@@ -615,7 +616,7 @@ def test_update_event_user_unknown_rights(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 400 assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()["detail"] == "Invalid permission rights" assert response.json()["detail"] == "Invalid permission rights"
@@ -634,7 +635,7 @@ def test_update_event_user_not_enough_permissions(
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -658,7 +659,7 @@ def test_update_event_user_with_event_user_same_event(
headers=authentication_token_from_user(db=db, user=user1, client=client), headers=authentication_token_from_user(db=db, user=user1, client=client),
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["rights"] == data["rights"] assert content["rights"] == data["rights"]
assert content["user_id"] == str(user2.id) assert content["user_id"] == str(user2.id)
@@ -686,7 +687,7 @@ def test_update_event_user_from_other_event_forbidden(
headers=authentication_token_from_user(db=db, user=user1, client=client), headers=authentication_token_from_user(db=db, user=user1, client=client),
json=data, json=data,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -711,7 +712,7 @@ def test_update_event_user_from_other_event_thru_own_event(
headers=authentication_token_from_user(db=db, user=user1, client=client), headers=authentication_token_from_user(db=db, user=user1, client=client),
json=data, json=data,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "User is not part of this event" assert response.json()["detail"] == "User is not part of this event"
@@ -734,7 +735,7 @@ def test_update_user_rights_combined(
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "rights" in content assert "rights" in content
assert content["rights"] == data["rights"] assert content["rights"] == data["rights"]
@@ -754,7 +755,7 @@ def test_remove_user_from_event(
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "User removed successfully" assert response.json()["message"] == "User removed successfully"
# assert not event.get_user_link(user) # assert not event.get_user_link(user)
@@ -771,7 +772,7 @@ def test_remove_user_from_event_event_not_found(
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Event not found" assert response.json()["detail"] == "Event not found"
@@ -785,7 +786,7 @@ def test_remove_user_from_event_user_not_found(
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "User not found" assert response.json()["detail"] == "User not found"
@@ -800,7 +801,7 @@ def test_remove_user_from_event_user_not_in_event(
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "User is not part of this event" assert response.json()["detail"] == "User is not part of this event"
@@ -819,7 +820,7 @@ def test_remove_user_from_event_insufficient_permissions(
headers=authentication_token_from_user(db=db, user=limited_user, client=client), headers=authentication_token_from_user(db=db, user=limited_user, client=client),
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -835,5 +836,5 @@ def test_remove_own_user_from_event(
headers=authentication_token_from_user(db=db, user=user, client=client), headers=authentication_token_from_user(db=db, user=user, client=client),
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Users are not allowed to delete themselves when they are not an super admin" assert response.json()["detail"] == "Users are not allowed to delete themselves when they are not an super admin"

View File

@@ -1,5 +1,6 @@
from unittest.mock import patch from unittest.mock import patch
from fastapi import status
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from sqlmodel import Session from sqlmodel import Session
@@ -19,7 +20,7 @@ def test_get_access_token(client: TestClient) -> None:
} }
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data) r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
tokens = r.json() tokens = r.json()
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
assert "access_token" in tokens assert "access_token" in tokens
assert tokens["access_token"] assert tokens["access_token"]
@@ -30,7 +31,7 @@ def test_get_access_token_incorrect_password(client: TestClient) -> None:
"password": "incorrect", "password": "incorrect",
} }
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data) r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
assert r.status_code == 400 assert r.status_code == status.HTTP_400_BAD_REQUEST
def test_use_access_token( def test_use_access_token(
@@ -41,7 +42,7 @@ def test_use_access_token(
headers=superuser_token_headers, headers=superuser_token_headers,
) )
result = r.json() result = r.json()
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
assert "email" in result assert "email" in result
@@ -60,7 +61,7 @@ def test_use_api_key(client: TestClient, db: Session) -> None:
r = client.get(f"{settings.API_V1_STR}/login/api-key/{api_key.api_key}") r = client.get(f"{settings.API_V1_STR}/login/api-key/{api_key.api_key}")
tokens = r.json() tokens = r.json()
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
assert "access_token" in tokens assert "access_token" in tokens
assert tokens["access_token"] assert tokens["access_token"]
@@ -79,7 +80,7 @@ def test_use_api_key_inactive(client: TestClient, db: Session) -> None:
api_key = ApiKey.create(session=db, create_obj=create_obj) api_key = ApiKey.create(session=db, create_obj=create_obj)
r = client.get(f"{settings.API_V1_STR}/login/api-key/{api_key.api_key}") r = client.get(f"{settings.API_V1_STR}/login/api-key/{api_key.api_key}")
assert r.status_code == 400 assert r.status_code == status.HTTP_400_BAD_REQUEST
def test_use_api_key_user_inactive(client: TestClient, db: Session) -> None: def test_use_api_key_user_inactive(client: TestClient, db: Session) -> None:
@@ -101,7 +102,7 @@ def test_use_api_key_user_inactive(client: TestClient, db: Session) -> None:
api_key = ApiKey.create(session=db, create_obj=create_obj) api_key = ApiKey.create(session=db, create_obj=create_obj)
r = client.get(f"{settings.API_V1_STR}/login/api-key/{api_key.api_key}") r = client.get(f"{settings.API_V1_STR}/login/api-key/{api_key.api_key}")
assert r.status_code == 400 assert r.status_code == status.HTTP_400_BAD_REQUEST
def test_recovery_password( def test_recovery_password(
@@ -116,7 +117,7 @@ def test_recovery_password(
f"{settings.API_V1_STR}/password-recovery/{email}", f"{settings.API_V1_STR}/password-recovery/{email}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
assert r.json() == {"message": "Password recovery email sent"} assert r.json() == {"message": "Password recovery email sent"}
@@ -129,7 +130,7 @@ def test_recovery_password_user_not_exits(
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert ( assert (
r.status_code == 404 r.status_code == status.HTTP_404_NOT_FOUND
) # TODO: Fix testing and do not leak known emails with 404 ) # TODO: Fix testing and do not leak known emails with 404
@@ -155,7 +156,7 @@ def test_reset_password(client: TestClient, db: Session) -> None:
json=data, json=data,
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
assert r.json() == {"message": "Password updated successfully"} assert r.json() == {"message": "Password updated successfully"}
db.refresh(user) db.refresh(user)
@@ -174,5 +175,5 @@ def test_reset_password_invalid_token(
response = r.json() response = r.json()
assert "detail" in response assert "detail" in response
assert r.status_code == 400 assert r.status_code == status.HTTP_400_BAD_REQUEST
assert response["detail"] == "Invalid token" assert response["detail"] == "Invalid token"

View File

@@ -1,3 +1,4 @@
from fastapi import status
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from sqlmodel import Session, select from sqlmodel import Session, select
@@ -15,7 +16,7 @@ def test_create_user(client: TestClient, db: Session) -> None:
}, },
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
data = r.json() data = r.json()

View File

@@ -1,5 +1,6 @@
import uuid import uuid
from fastapi import status
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from sqlmodel import Session from sqlmodel import Session
@@ -23,7 +24,7 @@ def test_create_team(client: TestClient, superuser_token_headers: dict[str, str]
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["theme_name"] == data["theme_name"] assert content["theme_name"] == data["theme_name"]
assert content["event_id"] == str(event.id) assert content["event_id"] == str(event.id)
@@ -38,7 +39,7 @@ def test_create_team_without_event(client: TestClient, superuser_token_headers:
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 422 assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
assert response.json()["detail"][0]["loc"] == ["body", "event_id"] assert response.json()["detail"][0]["loc"] == ["body", "event_id"]
@@ -52,7 +53,7 @@ def test_create_team_with_incorrect_event(client: TestClient, superuser_token_he
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Event not found" assert response.json()["detail"] == "Event not found"
def test_read_team(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None: def test_read_team(client: TestClient, superuser_token_headers: dict[str, str], db: Session) -> None:
@@ -61,7 +62,7 @@ def test_read_team(client: TestClient, superuser_token_headers: dict[str, str],
f"{settings.API_V1_STR}/teams/{team.id}", f"{settings.API_V1_STR}/teams/{team.id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["id"] == str(team.id) assert content["id"] == str(team.id)
assert content["theme_name"] == team.theme_name assert content["theme_name"] == team.theme_name
@@ -72,7 +73,7 @@ def test_read_team_not_found(client: TestClient, superuser_token_headers: dict[s
f"{settings.API_V1_STR}/teams/{uuid.uuid4()}", f"{settings.API_V1_STR}/teams/{uuid.uuid4()}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Team not found" assert response.json()["detail"] == "Team not found"
@@ -82,7 +83,7 @@ def test_read_event_not_enough_permissions(client: TestClient, normal_user_token
f"{settings.API_V1_STR}/teams/{team.id}", f"{settings.API_V1_STR}/teams/{team.id}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -94,7 +95,7 @@ def test_read_team_with_event_user(client: TestClient, event_user_token_headers:
headers=event_user_token_headers.headers, headers=event_user_token_headers.headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["id"] == str(team.id) assert content["id"] == str(team.id)
assert content["theme_name"] == team.theme_name assert content["theme_name"] == team.theme_name
@@ -108,7 +109,7 @@ def test_read_teams(client: TestClient, superuser_token_headers: dict[str, str],
f"{settings.API_V1_STR}/teams/", f"{settings.API_V1_STR}/teams/",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "count" in content assert "count" in content
assert content["count"] >= 2 assert content["count"] >= 2
@@ -124,7 +125,7 @@ def test_read_teams_with_normal_user(client: TestClient, normal_user_token_heade
f"{settings.API_V1_STR}/teams/", f"{settings.API_V1_STR}/teams/",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "count" in content assert "count" in content
assert content["count"] == 0 assert content["count"] == 0
@@ -144,7 +145,7 @@ def test_read_teams_with_event_user_readonly(client: TestClient, db: Session) ->
headers=authentication_token_from_user(db=db, user=user, client=client), headers=authentication_token_from_user(db=db, user=user, client=client),
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "count" in content assert "count" in content
assert content["count"] == 1 assert content["count"] == 1
@@ -164,7 +165,7 @@ def test_read_teams_with_event_user_team_manager(client: TestClient, db: Session
headers=authentication_token_from_user(db=db, user=user, client=client), headers=authentication_token_from_user(db=db, user=user, client=client),
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert "count" in content assert "count" in content
assert content["count"] == 1 assert content["count"] == 1
@@ -181,7 +182,7 @@ def test_update_team_name(client: TestClient, superuser_token_headers: dict[str,
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["id"] == str(team.id) assert content["id"] == str(team.id)
assert content["theme_name"] == data["theme_name"] assert content["theme_name"] == data["theme_name"]
@@ -195,7 +196,7 @@ def test_update_team_not_found(client: TestClient, superuser_token_headers: dict
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Team not found" assert response.json()["detail"] == "Team not found"
@@ -207,7 +208,7 @@ def test_update_team_not_enough_permissions(client: TestClient, normal_user_toke
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -219,7 +220,7 @@ def test_update_team_name_with_event_permissions(client: TestClient, event_user_
headers=event_user_token_headers.headers, headers=event_user_token_headers.headers,
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["id"] == str(team.id) assert content["id"] == str(team.id)
assert content["theme_name"] == data["theme_name"] assert content["theme_name"] == data["theme_name"]
@@ -236,7 +237,7 @@ def test_update_team_event(client: TestClient, superuser_token_headers: dict[str
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["id"] == str(team.id) assert content["id"] == str(team.id)
assert content["theme_name"] == team.theme_name assert content["theme_name"] == team.theme_name
@@ -252,7 +253,7 @@ def test_update_team_event_not_found(client: TestClient, superuser_token_headers
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "New event not found" assert response.json()["detail"] == "New event not found"
@@ -269,7 +270,7 @@ def test_update_team_event_with_event_user(client: TestClient, event_user_token_
json=data, json=data,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
content = response.json() content = response.json()
assert content["id"] == str(team.id) assert content["id"] == str(team.id)
assert content["theme_name"] == team.theme_name assert content["theme_name"] == team.theme_name
@@ -288,7 +289,7 @@ def test_update_team_event_with_event_user_not_enough_permissions(client: TestCl
json=data, json=data,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -298,7 +299,7 @@ def test_delete_team(client: TestClient, superuser_token_headers: dict[str, str]
f"{settings.API_V1_STR}/teams/{team.id}", f"{settings.API_V1_STR}/teams/{team.id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Team deleted successfully" assert response.json()["message"] == "Team deleted successfully"
@@ -307,7 +308,7 @@ def test_delete_team_not_found(client: TestClient, superuser_token_headers: dict
f"{settings.API_V1_STR}/teams/{uuid.uuid4()}", f"{settings.API_V1_STR}/teams/{uuid.uuid4()}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 404 assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json()["detail"] == "Team not found" assert response.json()["detail"] == "Team not found"
@@ -317,7 +318,7 @@ def test_delete_not_enough_permissions(client: TestClient, normal_user_token_hea
f"{settings.API_V1_STR}/teams/{team.id}", f"{settings.API_V1_STR}/teams/{team.id}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert response.status_code == 403 assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()["detail"] == "Not enough permissions" assert response.json()["detail"] == "Not enough permissions"
@@ -327,5 +328,5 @@ def test_delete_team_with_event_user(client: TestClient, event_user_token_header
f"{settings.API_V1_STR}/teams/{team.id}", f"{settings.API_V1_STR}/teams/{team.id}",
headers=event_user_token_headers.headers, headers=event_user_token_headers.headers,
) )
assert response.status_code == 200 assert response.status_code == status.HTTP_200_OK
assert response.json()["message"] == "Team deleted successfully" assert response.json()["message"] == "Team deleted successfully"

View File

@@ -1,6 +1,7 @@
import uuid import uuid
from unittest.mock import patch from unittest.mock import patch
from fastapi import status
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from sqlmodel import Session, select from sqlmodel import Session, select
@@ -48,7 +49,7 @@ def test_create_user_new_email(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert 200 <= r.status_code < 300 assert status.HTTP_200_OK <= r.status_code < 300
created_user = r.json() created_user = r.json()
user = User.get_by_email(session=db, email=username) user = User.get_by_email(session=db, email=username)
assert user assert user
@@ -67,7 +68,7 @@ def test_get_existing_user(
f"{settings.API_V1_STR}/users/{user_id}", f"{settings.API_V1_STR}/users/{user_id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert 200 <= r.status_code < 300 assert status.HTTP_200_OK <= r.status_code < 300
api_user = r.json() api_user = r.json()
existing_user = User.get_by_email(session=db, email=username) existing_user = User.get_by_email(session=db, email=username)
assert existing_user assert existing_user
@@ -94,7 +95,7 @@ def test_get_existing_user_current_user(client: TestClient, db: Session) -> None
f"{settings.API_V1_STR}/users/{user_id}", f"{settings.API_V1_STR}/users/{user_id}",
headers=headers, headers=headers,
) )
assert 200 <= r.status_code < 300 assert status.HTTP_200_OK <= r.status_code < 300
api_user = r.json() api_user = r.json()
existing_user = User.get_by_email(session=db, email=username) existing_user = User.get_by_email(session=db, email=username)
assert existing_user assert existing_user
@@ -108,7 +109,7 @@ def test_get_existing_user_permissions_error(
f"{settings.API_V1_STR}/users/{uuid.uuid4()}", f"{settings.API_V1_STR}/users/{uuid.uuid4()}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert r.status_code == 403 assert r.status_code == status.HTTP_403_FORBIDDEN
assert r.json() == {"detail": "The user doesn't have enough privileges"} assert r.json() == {"detail": "The user doesn't have enough privileges"}
@@ -127,7 +128,7 @@ def test_create_user_existing_username(
json=data, json=data,
) )
created_user = r.json() created_user = r.json()
assert r.status_code == 400 assert r.status_code == status.HTTP_400_BAD_REQUEST
assert "_id" not in created_user assert "_id" not in created_user
@@ -142,7 +143,7 @@ def test_create_user_by_normal_user(
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
assert r.status_code == 403 assert r.status_code == status.HTTP_403_FORBIDDEN
def test_retrieve_users( def test_retrieve_users(
@@ -179,7 +180,7 @@ def test_update_user_me(
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
updated_user = r.json() updated_user = r.json()
assert updated_user["email"] == email assert updated_user["email"] == email
assert updated_user["full_name"] == full_name assert updated_user["full_name"] == full_name
@@ -204,7 +205,7 @@ def test_update_password_me(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
updated_user = r.json() updated_user = r.json()
assert updated_user["message"] == "Password updated successfully" assert updated_user["message"] == "Password updated successfully"
@@ -226,7 +227,7 @@ def test_update_password_me(
) )
db.refresh(user_db) db.refresh(user_db)
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
assert verify_password(settings.FIRST_SUPERUSER_PASSWORD, user_db.hashed_password) assert verify_password(settings.FIRST_SUPERUSER_PASSWORD, user_db.hashed_password)
@@ -239,7 +240,7 @@ def test_generate_api_key_me(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
api_key = r.json() api_key = r.json()
assert "api_key" in api_key assert "api_key" in api_key
assert api_key["name"] == data["name"] assert api_key["name"] == data["name"]
@@ -265,7 +266,7 @@ def test_update_password_me_incorrect_password(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert r.status_code == 400 assert r.status_code == status.HTTP_400_BAD_REQUEST
updated_user = r.json() updated_user = r.json()
assert updated_user["detail"] == "Incorrect password" assert updated_user["detail"] == "Incorrect password"
@@ -284,7 +285,7 @@ def test_update_user_me_email_exists(
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
assert r.status_code == 409 assert r.status_code == status.HTTP_409_CONFLICT
assert r.json()["detail"] == "User with this email already exists" assert r.json()["detail"] == "User with this email already exists"
@@ -300,7 +301,7 @@ def test_update_password_me_same_password_error(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert r.status_code == 400 assert r.status_code == status.HTTP_400_BAD_REQUEST
updated_user = r.json() updated_user = r.json()
assert ( assert (
updated_user["detail"] == "New password cannot be the same as the current one" updated_user["detail"] == "New password cannot be the same as the current one"
@@ -316,7 +317,7 @@ def test_register_user(client: TestClient, db: Session) -> None:
f"{settings.API_V1_STR}/users/signup", f"{settings.API_V1_STR}/users/signup",
json=data, json=data,
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
created_user = r.json() created_user = r.json()
assert created_user["email"] == username assert created_user["email"] == username
assert created_user["full_name"] == full_name assert created_user["full_name"] == full_name
@@ -341,7 +342,7 @@ def test_register_user_already_exists_error(client: TestClient) -> None:
f"{settings.API_V1_STR}/users/signup", f"{settings.API_V1_STR}/users/signup",
json=data, json=data,
) )
assert r.status_code == 400 assert r.status_code == status.HTTP_400_BAD_REQUEST
assert r.json()["detail"] == "The user with this email already exists in the system" assert r.json()["detail"] == "The user with this email already exists in the system"
@@ -359,7 +360,7 @@ def test_update_user(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
updated_user = r.json() updated_user = r.json()
assert updated_user["full_name"] == "Updated_full_name" assert updated_user["full_name"] == "Updated_full_name"
@@ -380,7 +381,7 @@ def test_update_user_not_exists(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert r.status_code == 404 assert r.status_code == status.HTTP_404_NOT_FOUND
assert r.json()["detail"] == "The user with this id does not exist in the system" assert r.json()["detail"] == "The user with this id does not exist in the system"
@@ -403,7 +404,7 @@ def test_update_user_email_exists(
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
assert r.status_code == 409 assert r.status_code == status.HTTP_409_CONFLICT
assert r.json()["detail"] == "User with this email already exists" assert r.json()["detail"] == "User with this email already exists"
@@ -427,7 +428,7 @@ def test_delete_user_me(client: TestClient, db: Session) -> None:
f"{settings.API_V1_STR}/users/me", f"{settings.API_V1_STR}/users/me",
headers=headers, headers=headers,
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
deleted_user = r.json() deleted_user = r.json()
assert deleted_user["message"] == "User deleted successfully" assert deleted_user["message"] == "User deleted successfully"
result = db.exec(select(User).where(User.id == user_id)).first() result = db.exec(select(User).where(User.id == user_id)).first()
@@ -445,7 +446,7 @@ def test_delete_user_me_as_superuser(
f"{settings.API_V1_STR}/users/me", f"{settings.API_V1_STR}/users/me",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert r.status_code == 403 assert r.status_code == status.HTTP_403_FORBIDDEN
response = r.json() response = r.json()
assert response["detail"] == "Super users are not allowed to delete themselves" assert response["detail"] == "Super users are not allowed to delete themselves"
@@ -462,7 +463,7 @@ def test_delete_user_super_user(
f"{settings.API_V1_STR}/users/{user_id}", f"{settings.API_V1_STR}/users/{user_id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert r.status_code == 200 assert r.status_code == status.HTTP_200_OK
deleted_user = r.json() deleted_user = r.json()
assert deleted_user["message"] == "User deleted successfully" assert deleted_user["message"] == "User deleted successfully"
result = db.exec(select(User).where(User.id == user_id)).first() result = db.exec(select(User).where(User.id == user_id)).first()
@@ -476,7 +477,7 @@ def test_delete_user_not_found(
f"{settings.API_V1_STR}/users/{uuid.uuid4()}", f"{settings.API_V1_STR}/users/{uuid.uuid4()}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert r.status_code == 404 assert r.status_code == status.HTTP_404_NOT_FOUND
assert r.json()["detail"] == "User not found" assert r.json()["detail"] == "User not found"
@@ -491,7 +492,7 @@ def test_delete_user_current_super_user_error(
f"{settings.API_V1_STR}/users/{user_id}", f"{settings.API_V1_STR}/users/{user_id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert r.status_code == 403 assert r.status_code == status.HTTP_403_FORBIDDEN
assert r.json()["detail"] == "Super users are not allowed to delete themselves" assert r.json()["detail"] == "Super users are not allowed to delete themselves"
@@ -507,5 +508,5 @@ def test_delete_user_without_privileges(
f"{settings.API_V1_STR}/users/{user.id}", f"{settings.API_V1_STR}/users/{user.id}",
headers=normal_user_token_headers, headers=normal_user_token_headers,
) )
assert r.status_code == 403 assert r.status_code == status.HTTP_403_FORBIDDEN
assert r.json()["detail"] == "The user doesn't have enough privileges" assert r.json()["detail"] == "The user doesn't have enough privileges"