Inplement ApiKeys

This commit is contained in:
Sebastiaan
2025-06-05 19:36:35 +02:00
parent f8b15e3407
commit 2b865aa249
8 changed files with 312 additions and 14 deletions

View File

@@ -11,6 +11,7 @@ from app.core.config import settings
from app.core.security import get_password_hash from app.core.security import get_password_hash
from app.models.base import Message from app.models.base import Message
from app.models.user import User, NewPassword, Token, UserPublic from app.models.user import User, NewPassword, Token, UserPublic
from app.models.apikey import ApiKey
from app.utils import ( from app.utils import (
generate_password_reset_token, generate_password_reset_token,
generate_reset_password_email, generate_reset_password_email,
@@ -43,6 +44,27 @@ def login_access_token(
) )
@router.get("/login/api-key/{api_key}")
def login_apikey(
session: SessionDep,
api_key: str,
) -> Token:
"""
Plain apikey compatible login, get an access token for future requests
"""
user = ApiKey.authenticate(session=session, api_key=api_key)
if not user:
raise HTTPException(status_code=400, detail="Incorrect apikey")
elif not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return Token(
access_token=security.create_access_token(
user.id, expires_delta=access_token_expires
)
)
@router.post("/login/test-token", response_model=UserPublic) @router.post("/login/test-token", response_model=UserPublic)
def test_token(current_user: CurrentUser) -> Any: def test_token(current_user: CurrentUser) -> Any:
""" """

View File

@@ -11,7 +11,7 @@ from app.api.deps import (
) )
from app.core.config import settings from app.core.config import settings
from app.core.security import get_password_hash, verify_password from app.core.security import get_password_hash, verify_password
from app.models.base import Message from app.models.base import Message, RowId
from app.models.user import ( from app.models.user import (
UpdatePassword, UpdatePassword,
User, User,
@@ -25,6 +25,13 @@ from app.models.user import (
PermissionPart, PermissionPart,
PermissionRight, PermissionRight,
) )
from app.models.apikey import (
ApiKey,
ApiKeyCreate,
ApiKeyPublic,
ApiKeysPublic,
ApiKeyGenerate,
)
from app.utils import generate_new_account_email, send_email from app.utils import generate_new_account_email, send_email
router = APIRouter(prefix="/users", tags=["users"]) router = APIRouter(prefix="/users", tags=["users"])
@@ -118,6 +125,68 @@ def update_password_me(
return Message(message="Password updated successfully") return Message(message="Password updated successfully")
@router.get("/me/api-key", response_model=ApiKeysPublic)
def read_apikey_me(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve api keys from user
"""
count_statement = (
select(func.count())
.select_from(ApiKey)
.where(ApiKey.user_id == current_user.id)
)
count = session.exec(count_statement).one()
statement = select(User).offset(skip).limit(limit)
api_keys = session.exec(statement).all()
return ApiKeysPublic(data=api_keys, count=count)
@router.post("/me/api-key", response_model=ApiKeyPublic)
def create_apikey_met(
*, session: SessionDep, body: ApiKeyGenerate, current_user: CurrentUser
) -> Any:
"""
Generate a new api-key.
"""
data_obj = body.model_dump(exclude_unset=True)
extra_data = {
"user_id": current_user.id,
}
create_obj = ApiKeyCreate.model_validate(data_obj, update=extra_data)
api_key = ApiKey.create(session=session, create_obj=create_obj)
current_user.api_keys.append(api_key)
session.add(current_user)
session.commit()
return api_key
@router.delete("/me/api-key/{api_key}", response_model=ApiKeyPublic)
def delete_apikey_me(
*,
session: SessionDep,
current_user: CurrentUser,
api_key: RowId,
) -> Message:
"""
Delete a api-key.
"""
for api_key in current_user.api_keys:
if api_key.id == api_key:
session.delete(api_key)
session.commit()
return Message(message="Api key deleted successfully")
raise HTTPException(status_code=404, detail="API key not found")
@router.get("/me", response_model=UserPublic) @router.get("/me", response_model=UserPublic)
def read_user_me(current_user: CurrentUser) -> Any: def read_user_me(current_user: CurrentUser) -> Any:
""" """

View File

@@ -1,4 +1,3 @@
from sqlalchemy.sql import roles, roles
from sqlmodel import Session, create_engine, select from sqlmodel import Session, create_engine, select
from app.core.config import settings from app.core.config import settings
@@ -11,9 +10,10 @@ from app.models.user import (
PermissionModule, PermissionModule,
PermissionPart, PermissionPart,
PermissionRight, PermissionRight,
RolePermissionLink,
) )
from app.models.apikey import ApiKey
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI)) engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))

View File

@@ -0,0 +1,107 @@
import random
from typing import TYPE_CHECKING
from sqlmodel import Session, Field, Relationship, select
from .base import (
RowId,
BaseSQLModel,
)
from . import mixin
from .user import User
# region # API Keys for access ###################################################
# Shared properties
class ApiKeyBase(mixin.IsActive, mixin.Name, BaseSQLModel):
user_id: RowId | None = Field(
foreign_key="user.id", nullable=False, ondelete="CASCADE"
)
# Properties to receive via API on creation
class ApiKeyCreate(ApiKeyBase):
pass
class ApiKeyGenerate(mixin.IsActive, mixin.Name, BaseSQLModel):
pass
# Properties to receive via API on creation
class ApiKeyUpdate(ApiKeyBase):
pass
# Database model, database table inferred from class name
class ApiKey(mixin.RowId, ApiKeyBase, table=True):
# --- database only items --------------------------------------------------
api_key: str = Field(unique=True, nullable=False, max_length=64)
# --- back_populates links -------------------------------------------------
user: User | None = Relationship(back_populates="api_keys")
# --- CRUD actions ---------------------------------------------------------
@staticmethod
def generate(size=30, chars="ABCDEFGHJKLMNPQRSTUVWXYZ23456789"):
return "".join(random.choice(chars) for _ in range(size))
@classmethod
def create(cls, *, session: Session, create_obj: ApiKeyCreate) -> "ApiKey":
# TODO: User id
data_obj = create_obj.model_dump(exclude_unset=True)
# Generate new api key
extra_data = {
"api_key": ApiKey.generate(),
}
while cls.authenticate(session=session, api_key=extra_data["api_key"]):
extra_data["api_key"] = ApiKey.generate()
db_obj = cls.model_validate(data_obj, update=extra_data)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def update(
cls, *, session: Session, db_obj: "ApiKey", in_obj: ApiKeyUpdate
) -> "ApiKey":
data_obj = in_obj.model_dump(exclude_unset=True)
db_obj.sqlmodel_update(data_obj)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
@classmethod
def authenticate(cls, *, session: Session, api_key: str) -> "User | None":
statement = select(cls).where(cls.api_key == api_key and cls.is_active)
db_obj = session.exec(statement).first()
if not db_obj:
return None
if not db_obj.user:
return None
return db_obj.user
# Properties to return via API, id is always required
class ApiKeyCreatedPublic(mixin.RowIdPublic, ApiKeyBase):
api_key: str
# Properties to return via API, id is always required
class ApiKeyPublic(mixin.RowIdPublic, ApiKeyBase):
pass
class ApiKeysPublic(BaseSQLModel):
data: list[ApiKeyPublic]
count: int
# endregion

View File

@@ -4,12 +4,14 @@ from enum import auto as auto_enum
from sqlmodel import SQLModel from sqlmodel import SQLModel
from uuid import UUID as RowId from uuid import UUID as RowId
__all__ = [ __all__ = [
'RowId', "RowId",
'DocumentedStrEnum', "DocumentedStrEnum",
'DocumentedIntFlag', "DocumentedIntFlag",
'auto_enum', "auto_enum",
'BaseSQLModel', "BaseSQLModel",
"Message",
] ]
# region SQLModel base class ################################################### # region SQLModel base class ###################################################

View File

@@ -1,10 +1,8 @@
import random
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from pydantic import EmailStr from pydantic import EmailStr
from sqlmodel import Session, Field, Relationship, select from sqlmodel import Session, Field, Relationship, select
from app.core.config import settings
from app.core.security import get_password_hash, verify_password from app.core.security import get_password_hash, verify_password
from .base import ( from .base import (
@@ -16,8 +14,11 @@ from .base import (
) )
from . import mixin from . import mixin
if TYPE_CHECKING:
from .apikey import ApiKey
# region User ##################################################################
# region # User ################################################################
class PermissionModule(DocumentedStrEnum): class PermissionModule(DocumentedStrEnum):
@@ -39,7 +40,7 @@ class PermissionRight(DocumentedIntFlag):
ADMIN = CREATE | READ | UPDATE | DELETE ADMIN = CREATE | READ | UPDATE | DELETE
# ############################################################################# # ##############################################################################
# link to User (many-to-many) # link to User (many-to-many)
class UserRoleLink(BaseSQLModel, table=True): class UserRoleLink(BaseSQLModel, table=True):
user_id: RowId | None = Field( user_id: RowId | None = Field(
@@ -103,6 +104,7 @@ class User(mixin.RowId, UserBase, table=True):
hashed_password: str hashed_password: str
# --- back_populates links ------------------------------------------------- # --- back_populates links -------------------------------------------------
api_keys: list["ApiKey"] = Relationship(back_populates="user", cascade_delete=True)
# --- many-to-many links --------------------------------------------------- # --- many-to-many links ---------------------------------------------------
roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink) roles: list["Role"] = Relationship(back_populates="users", link_model=UserRoleLink)
@@ -215,7 +217,7 @@ class UsersPublic(BaseSQLModel):
# endregion # endregion
# region Password manager ###################################################### # region # Password manager ######################################################
# JSON payload containing access token # JSON payload containing access token
@@ -237,7 +239,7 @@ class NewPassword(BaseSQLModel):
# endregion # endregion
# region Permissions ########################################################### # region # Permissions ###########################################################
# link to Roles (many-to-many) # link to Roles (many-to-many)

View File

@@ -6,6 +6,7 @@ from sqlmodel import Session
from app.core.config import settings from app.core.config import settings
from app.core.security import verify_password from app.core.security import verify_password
from app.models.user import User, UserCreate from app.models.user import User, UserCreate
from app.models.apikey import ApiKey, ApiKeyCreate
from app.tests.utils.user import user_authentication_headers from app.tests.utils.user import user_authentication_headers
from app.tests.utils.utils import random_email, random_lower_string from app.tests.utils.utils import random_email, random_lower_string
from app.utils import generate_password_reset_token from app.utils import generate_password_reset_token
@@ -44,6 +45,75 @@ def test_use_access_token(
assert "email" in result assert "email" in result
def test_use_api_key(client: TestClient, db: Session) -> None:
user_db = User.get_by_email(session=db, email=settings.FIRST_SUPERUSER)
data = {
"user_id": user_db.id,
"is_active": True,
}
create_obj = ApiKeyCreate.model_validate(data)
api_key = ApiKey.create(session=db, create_obj=create_obj)
# TODO: Fix user_db.api_keys.append(api_key)
db.add(user_db)
db.commit()
r = client.get(f"{settings.API_V1_STR}/login/api-key/{api_key.api_key}")
tokens = r.json()
assert r.status_code == 200
assert "access_token" in tokens
assert tokens["access_token"]
def test_use_api_key_inactive(client: TestClient, db: Session) -> None:
user_db = User.get_by_email(session=db, email=settings.FIRST_SUPERUSER)
data = {
"user_id": user_db.id,
"is_active": False,
}
create_obj = ApiKeyCreate.model_validate(data)
api_key = ApiKey.create(session=db, create_obj=create_obj)
# TODO: Fix user_db.api_keys.append(api_key)
db.add(user_db)
db.commit()
r = client.get(f"{settings.API_V1_STR}/login/api-key/{api_key.api_key}")
tokens = r.json()
assert r.status_code == 400
assert "access_token" in tokens
assert tokens["access_token"]
def test_use_api_key_user_inactive(client: TestClient, db: Session) -> None:
user_db = User.get_by_email(session=db, email=settings.FIRST_SUPERUSER)
data = {
"user_id": user_db.id,
"is_active": True,
}
create_obj = ApiKeyCreate.model_validate(data)
api_key = ApiKey.create(session=db, create_obj=create_obj)
# TODO: Fix user_db.api_keys.append(api_key)
db.add(user_db)
db.commit()
# TODO: set user inactive
r = client.get(f"{settings.API_V1_STR}/login/api-key/{api_key.api_key}")
tokens = r.json()
assert r.status_code == 400
assert "access_token" in tokens
assert tokens["access_token"]
# Revert to the old password to keep consistency in test
# TODO: restore user active
def test_recovery_password( def test_recovery_password(
client: TestClient, normal_user_token_headers: dict[str, str] client: TestClient, normal_user_token_headers: dict[str, str]
) -> None: ) -> None:

View File

@@ -165,6 +165,7 @@ def test_retrieve_users(
assert "count" in all_users assert "count" in all_users
for item in all_users["data"]: for item in all_users["data"]:
assert "email" in item assert "email" in item
# TODO: To be sure there are no: assert "api_keys" not in item
def test_update_user_me( def test_update_user_me(
@@ -229,6 +230,31 @@ def test_update_password_me(
assert verify_password(settings.FIRST_SUPERUSER_PASSWORD, user_db.hashed_password) assert verify_password(settings.FIRST_SUPERUSER_PASSWORD, user_db.hashed_password)
def test_generate_api_key_me(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
data = {"name": "Test api"}
r = client.post(
f"{settings.API_V1_STR}/users/me/api-key",
headers=superuser_token_headers,
json=data,
)
assert r.status_code == 200
api_key = r.json()
assert "api_key" in api_key
assert api_key["name"] == data["name"]
assert api_key["is_active"]
# TODO: get api-keys
# TODO: disable api-key
# TODO: enable api-key
# TODO: delete api-key
def test_update_password_me_incorrect_password( def test_update_password_me_incorrect_password(
client: TestClient, superuser_token_headers: dict[str, str] client: TestClient, superuser_token_headers: dict[str, str]
) -> None: ) -> None: