Update rights manager for events
This commit is contained in:
@@ -37,6 +37,34 @@ router = APIRouter(prefix="/teams", tags=[ApiTags.TEAMS])
|
||||
|
||||
# region # Teams ###############################################################
|
||||
|
||||
def load_team(
|
||||
session: SessionDep,
|
||||
current_user: CurrentUser,
|
||||
id: RowId | None = None,
|
||||
module: PermissionModule = PermissionModule.TEAM,
|
||||
part: PermissionPart = PermissionPart.ADMIN,
|
||||
user_rights: PermissionRight | None = None,
|
||||
event_rights: PermissionRight | None = PermissionRight.MANAGE_TEAMS,
|
||||
) -> Team | None:
|
||||
team = session.get(Team, id)
|
||||
|
||||
if id and not team:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
|
||||
|
||||
valid = False
|
||||
if current_user.has_permissions(module=module, part=part, rights=user_rights):
|
||||
# Also valid for create new
|
||||
valid = True
|
||||
if hasattr(team, "event"):
|
||||
if team.event.user_has_rights(user=current_user, rights=event_rights):
|
||||
valid = True
|
||||
|
||||
if not valid:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
return team
|
||||
|
||||
|
||||
@router.get("/", response_model=TeamsPublic)
|
||||
def read_teams(
|
||||
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
|
||||
@@ -50,39 +78,22 @@ def read_teams(
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
):
|
||||
count_statement = select(func.count()).select_from(Team)
|
||||
count = session.exec(count_statement).one()
|
||||
statement = select(Team).offset(skip).limit(limit)
|
||||
teams = session.exec(statement).all()
|
||||
|
||||
else:
|
||||
# Only read teams that are connected to an event that the user can read
|
||||
count_statement = (
|
||||
select(func.count())
|
||||
.select_from(Team)
|
||||
.join(Event) # Join with Event to filter teams based on events
|
||||
.join(EventUserLink) # Join with EventUserLink to check user permissions
|
||||
.where(
|
||||
EventUserLink.user_id == current_user.id,
|
||||
# FIXME: (EventUserLink.rights & (PermissionRight.READ | PermissionRight.MANAGE_TEAMS)) > 0
|
||||
)
|
||||
)
|
||||
count = session.exec(count_statement).one()
|
||||
|
||||
statement = (
|
||||
data_query = (
|
||||
select(Team)
|
||||
.join(Event)
|
||||
.join(EventUserLink)
|
||||
)
|
||||
else:
|
||||
data_query = (
|
||||
select(Team)
|
||||
.join(EventUserLink, EventUserLink.event_id == Team.event_id)
|
||||
.where(
|
||||
EventUserLink.user_id == current_user.id,
|
||||
# FIXME: (EventUserLink.rights & (PermissionRight.READ | PermissionRight.MANAGE_TEAMS)) > 0
|
||||
)
|
||||
.offset(skip)
|
||||
.limit(limit)
|
||||
)
|
||||
teams = session.exec(statement).all()
|
||||
|
||||
return TeamsPublic(data=teams, count=count)
|
||||
count = session.exec(select(func.count()).select_from(data_query.subquery())).one()
|
||||
data = session.exec(data_query.offset(skip).limit(limit)).all()
|
||||
return TeamsPublic(data=data, count=count)
|
||||
|
||||
|
||||
@router.get("/{id}", response_model=TeamPublic)
|
||||
@@ -90,20 +101,12 @@ def read_team(session: SessionDep, current_user: CurrentUser, id: RowId) -> Any:
|
||||
"""
|
||||
Get team by ID.
|
||||
"""
|
||||
team = session.get(Team, id)
|
||||
if not team:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
|
||||
|
||||
event = session.get(Event, team.event_id)
|
||||
if not event: # pragma: no cover
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.TEAM,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.READ,
|
||||
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
team = load_team(
|
||||
session=session,
|
||||
current_user=current_user,
|
||||
id=id,
|
||||
user_rights=PermissionRight.READ,
|
||||
)
|
||||
|
||||
return team
|
||||
|
||||
@@ -138,21 +141,12 @@ def update_team(
|
||||
"""
|
||||
Update a team.
|
||||
"""
|
||||
team = session.get(Team, id)
|
||||
if not team:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
|
||||
|
||||
# Check user's permissions for the existing event
|
||||
event = session.get(Event, team.event_id)
|
||||
if not event: # pragma: no cover
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.TEAM,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.UPDATE,
|
||||
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
team = load_team(
|
||||
session=session,
|
||||
current_user=current_user,
|
||||
id=id,
|
||||
user_rights=PermissionRight.UPDATE,
|
||||
)
|
||||
|
||||
# Check rights for the new event data
|
||||
if team_in.event_id:
|
||||
@@ -177,20 +171,12 @@ def delete_team(session: SessionDep,current_user: CurrentUser, id: RowId) -> Mes
|
||||
"""
|
||||
Delete a team.
|
||||
"""
|
||||
team = session.get(Team, id)
|
||||
if not team:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
|
||||
|
||||
event = session.get(Event, team.event_id)
|
||||
if not event: # pragma: no cover
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.TEAM,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.DELETE,
|
||||
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_TEAMS)):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
team = load_team(
|
||||
session=session,
|
||||
current_user=current_user,
|
||||
id=id,
|
||||
user_rights=PermissionRight.DELETE,
|
||||
)
|
||||
|
||||
session.delete(team)
|
||||
session.commit()
|
||||
@@ -206,20 +192,13 @@ def read_team_divisions(session: SessionDep, current_user: CurrentUser, id: RowI
|
||||
"""
|
||||
Get division from team by ID.
|
||||
"""
|
||||
team = session.get(Team, id)
|
||||
if not team:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
|
||||
|
||||
event = session.get(Event, team.event_id)
|
||||
if not event: # pragma: no cover
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.TEAM,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_DIVISIONS)):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
team = load_team(
|
||||
session=session,
|
||||
current_user=current_user,
|
||||
id=id,
|
||||
user_rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
event_rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
)
|
||||
|
||||
return team.division_link
|
||||
|
||||
@@ -231,25 +210,17 @@ def create_team_division_link(
|
||||
"""
|
||||
Create new division link in team.
|
||||
"""
|
||||
|
||||
team = session.get(Team, id)
|
||||
if not team:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
|
||||
team = load_team(
|
||||
session=session,
|
||||
current_user=current_user,
|
||||
id=id,
|
||||
user_rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
event_rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
)
|
||||
|
||||
if team.division_link:
|
||||
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Team already linked to division")
|
||||
|
||||
event = session.get(Event, team.event_id)
|
||||
if not event: # pragma: no cover
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.TEAM,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_DIVISIONS)):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
|
||||
division_team_link = DivisionTeamLink.create(create_obj=team_in, session=session, team=team)
|
||||
return division_team_link
|
||||
|
||||
@@ -261,21 +232,13 @@ def update_team_division_link(
|
||||
"""
|
||||
Update division info inside team.
|
||||
"""
|
||||
team = session.get(Team, id)
|
||||
if not team:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
|
||||
|
||||
# Check user's permissions for the existing event
|
||||
event = session.get(Event, team.event_id)
|
||||
if not event: # pragma: no cover
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.TEAM,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_DIVISIONS)):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
team = load_team(
|
||||
session=session,
|
||||
current_user=current_user,
|
||||
id=id,
|
||||
user_rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
event_rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
)
|
||||
|
||||
# Update the team
|
||||
division_team_link = DivisionTeamLink.update(db_obj=team.division_link, in_obj=team_in, session=session)
|
||||
@@ -287,20 +250,13 @@ def delete_team_division_link(session: SessionDep, current_user: CurrentUser, id
|
||||
"""
|
||||
Delete a division link from a team.
|
||||
"""
|
||||
team = session.get(Team, id)
|
||||
if not team:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
|
||||
|
||||
event = session.get(Event, team.event_id)
|
||||
if not event: # pragma: no cover
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")
|
||||
|
||||
if not current_user.has_permissions(
|
||||
module=PermissionModule.TEAM,
|
||||
part=PermissionPart.ADMIN,
|
||||
rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
) and not (event.user_has_rights(user=current_user, rights=PermissionRight.MANAGE_DIVISIONS)):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions")
|
||||
team = load_team(
|
||||
session=session,
|
||||
current_user=current_user,
|
||||
id=id,
|
||||
user_rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
event_rights=PermissionRight.MANAGE_DIVISIONS,
|
||||
)
|
||||
|
||||
session.delete(team.division_link)
|
||||
session.commit()
|
||||
|
||||
Reference in New Issue
Block a user