diff --git a/backend/app/api/deps.py b/backend/app/api/deps.py index b16f247..6177c71 100644 --- a/backend/app/api/deps.py +++ b/backend/app/api/deps.py @@ -11,7 +11,13 @@ from sqlmodel import Session from app.core import security from app.core.config import settings from app.core.db import engine -from app.models.user import TokenPayload, User +from app.models.user import ( + PermissionModule, + PermissionPart, + PermissionRight, + TokenPayload, + User, +) reusable_oauth2 = OAuth2PasswordBearer( tokenUrl=f"{settings.API_V1_STR}/login/access-token" @@ -49,9 +55,23 @@ def get_current_user(session: SessionDep, token: TokenDep) -> User: CurrentUser = Annotated[User, Depends(get_current_user)] -def get_current_active_superuser(current_user: CurrentUser) -> User: - if not current_user.is_superuser: +def get_user_permissions( + module: PermissionModule, + part: PermissionPart, + current_user: CurrentUser, + rights: PermissionRight = None, +) -> User: + if not current_user.has_permission(module, part, rights): raise HTTPException( status_code=403, detail="The user doesn't have enough privileges" ) return current_user + + +def get_current_system_admin(current_user: CurrentUser) -> User: + return get_user_permissions( + module=PermissionModule.SYSTEM, + part=PermissionPart.ADMIN, + rights=PermissionRight.ADMIN, + current_user=current_user, + ) diff --git a/backend/app/api/routes/login.py b/backend/app/api/routes/login.py index cbef4f2..0a7a887 100644 --- a/backend/app/api/routes/login.py +++ b/backend/app/api/routes/login.py @@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import HTMLResponse from fastapi.security import OAuth2PasswordRequestForm -from app.api.deps import CurrentUser, SessionDep, get_current_active_superuser +from app.api.deps import CurrentUser, SessionDep, get_current_system_admin from app.core import security from app.core.config import settings from app.core.security import get_password_hash @@ -100,7 +100,7 @@ def reset_password(session: SessionDep, body: NewPassword) -> Message: @router.post( "/password-recovery-html-content/{email}", - dependencies=[Depends(get_current_active_superuser)], + dependencies=[Depends(get_current_system_admin)], response_class=HTMLResponse, ) def recover_password_html_content(email: str, session: SessionDep) -> Any: diff --git a/backend/app/api/routes/users.py b/backend/app/api/routes/users.py index de72cf8..1a29a5b 100644 --- a/backend/app/api/routes/users.py +++ b/backend/app/api/routes/users.py @@ -7,7 +7,7 @@ from sqlmodel import col, delete, func, select from app.api.deps import ( CurrentUser, SessionDep, - get_current_active_superuser, + get_current_system_admin, ) from app.core.config import settings from app.core.security import get_password_hash, verify_password @@ -21,6 +21,9 @@ from app.models.user import ( UsersPublic, UserUpdate, UserUpdateMe, + PermissionModule, + PermissionPart, + PermissionRight, ) from app.utils import generate_new_account_email, send_email @@ -29,7 +32,7 @@ router = APIRouter(prefix="/users", tags=["users"]) @router.get( "/", - dependencies=[Depends(get_current_active_superuser)], + dependencies=[Depends(get_current_system_admin)], response_model=UsersPublic, ) def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any: @@ -47,7 +50,7 @@ def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any: @router.post( - "/", dependencies=[Depends(get_current_active_superuser)], response_model=UserPublic + "/", dependencies=[Depends(get_current_system_admin)], response_model=UserPublic ) def create_user(*, session: SessionDep, user_in: UserCreate) -> Any: """ @@ -128,7 +131,11 @@ def delete_user_me(session: SessionDep, current_user: CurrentUser) -> Any: """ Delete own user. """ - if current_user.is_superuser: + if current_user.has_permission( + module=PermissionModule.SYSTEM, + part=PermissionPart.ADMIN, + rights=PermissionRight.DELETE, + ): raise HTTPException( status_code=403, detail="Super users are not allowed to delete themselves" ) @@ -163,7 +170,7 @@ def read_user_by_id( user = session.get(User, user_id) if user == current_user: return user - if not current_user.is_superuser: + if not current_user.has_permission(module=PermissionModule.USER, part=PermissionPart.ADMIN, rights=PermissionRight.READ): raise HTTPException( status_code=403, detail="The user doesn't have enough privileges", @@ -173,7 +180,7 @@ def read_user_by_id( @router.patch( "/{user_id}", - dependencies=[Depends(get_current_active_superuser)], + dependencies=[Depends(get_current_system_admin)], response_model=UserPublic, ) def update_user( @@ -203,7 +210,7 @@ def update_user( return db_user -@router.delete("/{user_id}", dependencies=[Depends(get_current_active_superuser)]) +@router.delete("/{user_id}", dependencies=[Depends(get_current_system_admin)]) def delete_user( session: SessionDep, current_user: CurrentUser, user_id: uuid.UUID ) -> Message: diff --git a/backend/app/api/routes/utils.py b/backend/app/api/routes/utils.py index 404e9b9..5a511ff 100644 --- a/backend/app/api/routes/utils.py +++ b/backend/app/api/routes/utils.py @@ -1,7 +1,7 @@ from fastapi import APIRouter, Depends from pydantic.networks import EmailStr -from app.api.deps import get_current_active_superuser +from app.api.deps import get_current_system_admin from app.models.base import Message from app.utils import generate_test_email, send_email @@ -10,7 +10,7 @@ router = APIRouter(prefix="/utils", tags=["utils"]) @router.post( "/test-email/", - dependencies=[Depends(get_current_active_superuser)], + dependencies=[Depends(get_current_system_admin)], status_code=201, ) def test_email(email_to: EmailStr) -> Message: diff --git a/backend/app/core/db.py b/backend/app/core/db.py index 640cd3f..afd7807 100644 --- a/backend/app/core/db.py +++ b/backend/app/core/db.py @@ -1,8 +1,18 @@ +from sqlalchemy.sql import roles, roles from sqlmodel import Session, create_engine, select from app.core.config import settings -from app.models.user import User, UserCreate +from app.models.user import ( + User, + UserCreate, + Role, + Permission, + PermissionModule, + PermissionPart, + PermissionRight, + RolePermissionLink, +) engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI)) @@ -21,6 +31,64 @@ def init_db(session: Session) -> None: # This works because the models are already imported and registered from app.models BaseSQLModel.metadata.create_all(engine) + # region SuperUser --------------------------------------------------------- + + # Create system admin role + system_admin_role = session.exec(select(Role).where(Role.name == "Admin")).first() + if not system_admin_role: + system_admin_role_in = Role( + name="Admin", + is_active=True, + description="Super admins", + ) + system_admin_role = Role.create( + session=session, create_obj=system_admin_role_in + ) + + user_role = session.exec(select(Role).where(Role.name == "User")).first() + if not user_role: + user_role_in = Role( + name="User", + is_active=True, + description="Role with only healthcheck read rights", + ) + user_role = Role.create(session=session, create_obj=user_role_in) + + # init all possible permissions + existing_permissions = session.exec(select(Permission)).all() + + # Create missing permissions and link to system admin role + for module in PermissionModule: + for part in PermissionPart: + permission = next( + filter( + lambda p: p.module == module and p.part == part, + existing_permissions, + ), + None, + ) + if not permission: + permission_in = Permission( + module=module, + part=part, + is_active=True, + description=f"{module.name} - {part.name}", + ) + permission = Permission.create( + session=session, create_obj=permission_in + ) + + system_admin_role.add_permission( + permission, session=session, right=PermissionRight.ADMIN + ) + + if module == PermissionModule.SYSTEM and part == PermissionPart.HEALTHCHECK: + user_role.add_permission( + permission, session=session, right=PermissionRight.READ + ) + + session.commit() + user = session.exec( select(User).where(User.email == settings.FIRST_SUPERUSER) ).first() @@ -32,3 +100,6 @@ def init_db(session: Session) -> None: is_active=True, ) user = User.create(session=session, create_obj=user_in) + user.add_role(db_obj=system_admin_role, session=session) + + # endregion diff --git a/backend/app/models/base.py b/backend/app/models/base.py index 2830a02..ec524de 100644 --- a/backend/app/models/base.py +++ b/backend/app/models/base.py @@ -1,7 +1,16 @@ +from enum import IntFlag, Enum # Python 3.11 >= StrEnum +from enum import auto as auto_enum + from sqlmodel import SQLModel from uuid import UUID as RowId - +__all__ = [ + 'RowId', + 'DocumentedStrEnum', + 'DocumentedIntFlag', + 'auto_enum', + 'BaseSQLModel', +] # region SQLModel base class ################################################### @@ -12,6 +21,21 @@ class BaseSQLModel(SQLModel): # endregion + +# region enum # Fields ######################################################### + + +class DocumentedStrEnum(str, Enum): + pass + + +class DocumentedIntFlag(IntFlag): + pass + + +# endregion + + # region Generic message ####################################################### diff --git a/backend/app/models/mixin.py b/backend/app/models/mixin.py index 4dc2b3b..524a5f9 100644 --- a/backend/app/models/mixin.py +++ b/backend/app/models/mixin.py @@ -6,12 +6,16 @@ from sqlmodel import Field from .base import RowId as RowIdType +class Name(BaseModel): + name: str | None = Field(default=None, nullable=False, unique=True, max_length=255) + + class FullName(BaseModel): full_name: str | None = Field(default=None, nullable=True, max_length=255) class IsActive(BaseModel): - is_active: bool | None = Field(default=False, nullable=False) + is_active: bool | None = Field(default=True, nullable=False) class IsVerified(BaseModel): @@ -56,3 +60,7 @@ class RowId(BaseModel): class RowIdPublic(RowId): id: RowIdType + + +class Description(BaseModel): + description: str | None = Field(default=None, nullable=True, max_length=512) diff --git a/backend/app/models/user.py b/backend/app/models/user.py index dd3adc7..d29dd26 100644 --- a/backend/app/models/user.py +++ b/backend/app/models/user.py @@ -8,6 +8,10 @@ from app.core.config import settings from app.core.security import get_password_hash, verify_password from .base import ( + RowId, + DocumentedStrEnum, + DocumentedIntFlag, + auto_enum, BaseSQLModel, ) from . import mixin @@ -16,6 +20,47 @@ from . import mixin # region User ################################################################## +class PermissionModule(DocumentedStrEnum): + SYSTEM = auto_enum() + USER = auto_enum() + + +class PermissionPart(DocumentedStrEnum): + ADMIN = auto_enum() + HEALTHCHECK = auto_enum() + + +class PermissionRight(DocumentedIntFlag): + CREATE = auto_enum() + READ = auto_enum() + UPDATE = auto_enum() + DELETE = auto_enum() + + ADMIN = CREATE | READ | UPDATE | DELETE + + +# ############################################################################# +# link to User (many-to-many) +class UserRoleLink(BaseSQLModel, table=True): + user_id: RowId | None = Field( + default=None, + foreign_key="user.id", + primary_key=True, + nullable=False, + ondelete="CASCADE", + ) + role_id: RowId | None = Field( + default=None, + foreign_key="role.id", + primary_key=True, + nullable=False, + ondelete="CASCADE", + ) + + +# ############################################################################# + + # Shared properties class UserBase( mixin.UserName, @@ -34,7 +79,7 @@ class UserCreate(mixin.Password, UserBase): pass -class UserRegister(mixin.Password, BaseSQLModel): +class UserRegister(mixin.Password, mixin.FullName, BaseSQLModel): email: EmailStr = Field(max_length=255) @@ -60,6 +105,7 @@ class User(mixin.RowId, UserBase, table=True): # --- back_populates links ------------------------------------------------- # --- many-to-many links --------------------------------------------------- + roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink) # --- CRUD actions --------------------------------------------------------- @classmethod @@ -107,6 +153,64 @@ class User(mixin.RowId, UserBase, table=True): return None return db_obj + def add_role(self, *, name: str = None, id: RowId = None, db_obj: "Role" = None, session: Session) -> "User": + if db_obj: + pass + elif name: + db_obj = session.exec(select(Role).where(Role.name == name)).first() + elif id: + db_obj = session.exec(select(Role).where(Role.id == id)).first() + + to_add = next((add for add in self.roles if add == db_obj), None) + + if not to_add: + self.roles.append(db_obj) + session.commit() + + return self + + def remove_role(self, *, name: str = None, id: RowId = None, db_obj: "Role" = None, session: Session) -> "User": + if db_obj: + pass + elif name: + db_obj = session.exec(select(Role).where(Role.name == name)).first() + elif id: + db_obj = session.exec(select(Role).where(Role.id == id)).first() + + to_remove = next((remove for remove in self.roles if remove == db_obj), None) + if to_remove: + statement = select(UserRoleLink).where( + UserRoleLink.user_id == self.id, + UserRoleLink.role_id == db_obj.id + ) + link_to_remove = session.exec(statement).first() + + if link_to_remove: + session.delete(link_to_remove) + session.commit() + + return self + + def has_permission( + self, + module: PermissionModule, + part: PermissionPart, + rights: PermissionRight | None = None, + ) -> bool: + return any( + any( + ( + link.permission.module == module + and link.permission.part == part + and link.permission.is_active + and (not rights or (link.rights & rights) == rights) + ) + for link in role.permission_links + if role.is_active + ) + for role in self.roles + ) + # Properties to return via API, id is always required class UserPublic(mixin.RowIdPublic, UserBase): @@ -141,3 +245,116 @@ class NewPassword(BaseSQLModel): # endregion + + +# region Permissions ########################################################### + + +# link to Roles (many-to-many) +class RolePermissionLink(BaseSQLModel, table=True): + role_id: RowId | None = Field( + default=None, + foreign_key="role.id", + primary_key=True, + nullable=False, + ondelete="CASCADE", + ) + permission_id: RowId | None = Field( + default=None, + foreign_key="permission.id", + primary_key=True, + nullable=False, + ondelete="CASCADE", + ) + + rights: "PermissionRight | None" = Field(default=0, nullable=False) + + role: "Role" = Relationship(back_populates="permission_links") + permission: "Permission" = Relationship(back_populates="role_links") + + +# ############################################################################# + +# TODO: if we want to mange roles add all crud classes + + +class Role( + mixin.RowId, mixin.Name, mixin.IsActive, mixin.Description, BaseSQLModel, table=True +): + # --- database only items -------------------------------------------------- + + # --- many-to-many links --------------------------------------------------- + permission_links: list["RolePermissionLink"] = Relationship(back_populates="role") + users: list["User"] = Relationship(back_populates="roles", link_model=UserRoleLink) + + # --- CRUD actions --------------------------------------------------------- + @classmethod + def create(cls, *, session: Session, create_obj: "Role") -> "Role": + data_obj = create_obj.model_dump(exclude_unset=True) + db_obj = cls.model_validate(data_obj) + session.add(db_obj) + session.commit() + session.refresh(db_obj) + return db_obj + + def add_permission( + self, add: "Permission", *, session: Session, right: PermissionRight = None + ) -> "Role": + link = next( + (link for link in self.permission_links if link.permission == add), + None, + ) + if link: + link.rights = right + else: + self.permission_links.append( + RolePermissionLink( + role=self, + permission=add, + rights=right, + ) + ) + session.add(self.permission_links[-1]) + + session.commit() + + return self + + def remove_permission(self, remove: "Permission", *, session: Session) -> "Role": + link = next( + (link for link in self.permission_links if link.permission == remove), + None, + ) + if link: + session.delete(link) + session.commit() + + return self + + +# ############################################################################# + + +# All Permission will be generated during db init +class Permission( + mixin.RowId, mixin.IsActive, mixin.Description, BaseSQLModel, table=True +): + # --- database only items -------------------------------------------------- + module: PermissionModule = Field(nullable=False) + part: PermissionPart = Field(nullable=False) + + # --- many-to-many links --------------------------------------------------- + role_links: list["RolePermissionLink"] = Relationship(back_populates="permission") + + # --- CRUD actions --------------------------------------------------------- + @classmethod + def create(cls, *, session: Session, create_obj: "Permission") -> "Permission": + data_obj = create_obj.model_dump(exclude_unset=True) + db_obj = cls.model_validate(data_obj) + session.add(db_obj) + session.commit() + session.refresh(db_obj) + return db_obj + + +# endregion diff --git a/backend/app/tests/api/routes/test_private.py b/backend/app/tests/api/routes/test_private.py index 1e1f985..b5095c3 100644 --- a/backend/app/tests/api/routes/test_private.py +++ b/backend/app/tests/api/routes/test_private.py @@ -2,7 +2,7 @@ from fastapi.testclient import TestClient from sqlmodel import Session, select from app.core.config import settings -from app.models import User +from app.models.user import User def test_create_user(client: TestClient, db: Session) -> None: @@ -19,6 +19,8 @@ def test_create_user(client: TestClient, db: Session) -> None: data = r.json() + # TODO: Give user role + user = db.exec(select(User).where(User.id == data["id"])).first() assert user diff --git a/backend/app/tests/api/routes/test_users.py b/backend/app/tests/api/routes/test_users.py index ff3771d..cc06564 100644 --- a/backend/app/tests/api/routes/test_users.py +++ b/backend/app/tests/api/routes/test_users.py @@ -17,7 +17,7 @@ def test_get_users_superuser_me( current_user = r.json() assert current_user assert current_user["is_active"] is True - assert current_user["is_superuser"] + # assert current_user["is_superuser"] # TODO: Rewrite to split all roles assert current_user["email"] == settings.FIRST_SUPERUSER @@ -28,7 +28,7 @@ def test_get_users_normal_user_me( current_user = r.json() assert current_user assert current_user["is_active"] is True - assert current_user["is_superuser"] is False + # assert current_user["is_superuser"] is False # TODO: Rewrite to split all roles assert current_user["email"] == settings.EMAIL_TEST_USER diff --git a/backend/app/tests/conftest.py b/backend/app/tests/conftest.py index 90ab39a..0e38892 100644 --- a/backend/app/tests/conftest.py +++ b/backend/app/tests/conftest.py @@ -7,7 +7,7 @@ from sqlmodel import Session, delete from app.core.config import settings from app.core.db import engine, init_db from app.main import app -from app.models import Item, User +from app.models.user import User from app.tests.utils.user import authentication_token_from_email from app.tests.utils.utils import get_superuser_token_headers @@ -17,8 +17,6 @@ def db() -> Generator[Session, None, None]: with Session(engine) as session: init_db(session) yield session - statement = delete(Item) - session.execute(statement) statement = delete(User) session.execute(statement) session.commit() diff --git a/backend/app/tests/crud/test_user.py b/backend/app/tests/crud/test_user.py index 8f11d70..b02129d 100644 --- a/backend/app/tests/crud/test_user.py +++ b/backend/app/tests/crud/test_user.py @@ -2,7 +2,13 @@ from fastapi.encoders import jsonable_encoder from sqlmodel import Session from app.core.security import verify_password -from app.models.user import User, UserCreate, UserUpdate +from app.models.user import ( + User, + UserCreate, + UserUpdate, + PermissionModule, + PermissionPart, +) from app.tests.utils.utils import random_email, random_lower_string @@ -43,17 +49,21 @@ def test_check_if_user_is_active(db: Session) -> None: def test_check_if_user_is_active_inactive(db: Session) -> None: email = random_email() password = random_lower_string() - user_in = UserCreate(email=email, password=password, disabled=True) + user_in = UserCreate(email=email, password=password, is_active=False) user = User.create(session=db, create_obj=user_in) - assert user.is_active + assert user.is_active is False def test_check_if_user_is_superuser(db: Session) -> None: email = random_email() password = random_lower_string() - user_in = UserCreate(email=email, password=password, is_superuser=True) + user_in = UserCreate(email=email, password=password) user = User.create(session=db, create_obj=user_in) - assert user.is_superuser is True + user.add_role(name="Admin", session=db) + assert ( + user.has_permission(module=PermissionModule.SYSTEM, part=PermissionPart.ADMIN) + is True + ) def test_check_if_user_is_superuser_normal_user(db: Session) -> None: @@ -61,14 +71,19 @@ def test_check_if_user_is_superuser_normal_user(db: Session) -> None: password = random_lower_string() user_in = UserCreate(email=username, password=password) user = User.create(session=db, create_obj=user_in) - assert user.is_superuser is False + user.add_role(name="User", session=db) + assert ( + user.has_permission(module=PermissionModule.SYSTEM, part=PermissionPart.ADMIN) + is False + ) def test_get_user(db: Session) -> None: password = random_lower_string() username = random_email() - user_in = UserCreate(email=username, password=password, is_superuser=True) + user_in = UserCreate(email=username, password=password) user = User.create(session=db, create_obj=user_in) + user.add_role(name="Admin", session=db) user_2 = db.get(User, user.id) assert user_2 assert user.email == user_2.email @@ -78,10 +93,10 @@ def test_get_user(db: Session) -> None: def test_update_user(db: Session) -> None: password = random_lower_string() email = random_email() - user_in = UserCreate(email=email, password=password, is_superuser=True) + user_in = UserCreate(email=email, password=password) user = User.create(session=db, create_obj=user_in) new_password = random_lower_string() - user_in_update = UserUpdate(password=new_password, is_superuser=True) + user_in_update = UserUpdate(password=new_password) if user.id is not None: User.update(session=db, db_obj=user, in_obj=user_in_update) user_2 = db.get(User, user.id) diff --git a/backend/app/tests/utils/user.py b/backend/app/tests/utils/user.py index ee5b0ed..3fcb75a 100644 --- a/backend/app/tests/utils/user.py +++ b/backend/app/tests/utils/user.py @@ -1,8 +1,8 @@ from fastapi.testclient import TestClient -from sqlmodel import Session +from sqlmodel import Session, select from app.core.config import settings -from app.models.user import User, UserCreate, UserUpdate +from app.models.user import User, UserCreate, UserUpdate, Role from app.tests.utils.utils import random_email, random_lower_string