Files
score/backend/app/api/deps.py
2025-05-26 00:35:30 +02:00

78 lines
2.1 KiB
Python

from collections.abc import Generator
from typing import Annotated, Optional

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from pydantic import ValidationError
from sqlmodel import Session

from app.core import security
from app.core.config import settings
from app.core.db import engine
from app.models.user import (
    PermissionModule,
    PermissionPart,
    PermissionRight,
    TokenPayload,
    User,
)
# OAuth2 bearer-token scheme used as a dependency below (see TokenDep).
# Its token URL is built from settings.API_V1_STR followed by the
# "/login/access-token" path.
reusable_oauth2 = OAuth2PasswordBearer(
    tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)
def get_db() -> Generator[Session, None, None]:
    """Yield a ``Session`` bound to the module-level ``engine``.

    The session is created inside a ``with`` block, so its context-manager
    exit runs once the consumer is done with the generator (or an error is
    thrown into it).

    Yields:
        The open ``Session`` instance.
    """
    with Session(engine) as db:
        yield db
# Annotated dependency aliases for use in function signatures.
# SessionDep: a Session obtained through the get_db dependency.
SessionDep = Annotated[Session, Depends(get_db)]
# TokenDep: the string obtained through the reusable_oauth2 dependency.
TokenDep = Annotated[str, Depends(reusable_oauth2)]
def get_current_user(session: SessionDep, token: TokenDep) -> User:
    """Resolve a JWT to the active ``User`` it identifies.

    The token is decoded with ``settings.SECRET_KEY`` and
    ``security.ALGORITHM``, its payload is validated as a ``TokenPayload``,
    and the user is looked up by the payload's ``sub`` value.

    Args:
        session: Database session used for the user lookup.
        token: Encoded JWT string.

    Returns:
        The ``User`` whose primary key equals the token's ``sub``.

    Raises:
        HTTPException: 403 if the token cannot be decoded or its payload
            fails validation; 404 if no user matches; 400 if the user is
            not active.
    """
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
        )
        token_data = TokenPayload(**payload)
    except (InvalidTokenError, ValidationError) as exc:
        # Chain explicitly so the underlying decode/validation failure is
        # kept as the cause in tracebacks and logs.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        ) from exc

    user = session.get(User, token_data.sub)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )
    return user
# Annotated dependency alias: the User returned by get_current_user.
CurrentUser = Annotated[User, Depends(get_current_user)]
def get_user_permissions(
    module: PermissionModule,
    part: PermissionPart,
    current_user: CurrentUser,
    rights: Optional[PermissionRight] = None,
) -> User:
    """Return ``current_user`` if they pass the requested permission check.

    The decision is delegated to
    ``current_user.has_permission(module, part, rights)``.

    Args:
        module: Permission module to check.
        part: Permission part to check.
        current_user: The user being checked.
        rights: Specific right to require; ``None`` (the default) is passed
            through to ``has_permission`` unchanged.

    Returns:
        ``current_user`` unchanged when the check succeeds.

    Raises:
        HTTPException: 403 when ``has_permission`` returns a falsy value.
    """
    if not current_user.has_permission(module, part, rights):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="The user doesn't have enough privileges",
        )
    return current_user
def get_current_system_admin(current_user: CurrentUser) -> User:
    """Return ``current_user`` only if they pass the system-admin check.

    Delegates to ``get_user_permissions`` with the ``SYSTEM`` module, the
    ``ADMIN`` part and the ``ADMIN`` right; any exception raised there
    propagates to the caller.
    """
    admin_user = get_user_permissions(
        current_user=current_user,
        module=PermissionModule.SYSTEM,
        part=PermissionPart.ADMIN,
        rights=PermissionRight.ADMIN,
    )
    return admin_user