♻ Move project source files to top level from src, update Sentry dependency (#630)

Co-authored-by: Sebastián Ramírez <tiangolo@gmail.com>
This commit is contained in:
Esteban Maya
2024-03-07 11:35:33 -05:00
committed by GitHub
parent ae83b89113
commit 8558cf00a2
248 changed files with 4 additions and 6 deletions

8
backend/.dockerignore Normal file
View File

@@ -0,0 +1,8 @@
# Python
__pycache__
app.egg-info
*.pyc
.mypy_cache
.coverage
htmlcov
.venv

9
backend/.gitignore vendored Normal file
View File

@@ -0,0 +1,9 @@
__pycache__
app.egg-info
*.pyc
.mypy_cache
.coverage
htmlcov
poetry.lock
.cache
.venv

71
backend/alembic.ini Executable file
View File

@@ -0,0 +1,71 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = app/alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
#truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
backend/app/__init__.py Normal file
View File

1
backend/app/alembic/README Executable file
View File

@@ -0,0 +1 @@
Generic single-database configuration.

87
backend/app/alembic/env.py Executable file
View File

@@ -0,0 +1,87 @@
import os
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# target_metadata = None
from app.models import SQLModel # noqa
target_metadata = SQLModel.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def get_url():
user = os.getenv("POSTGRES_USER", "postgres")
password = os.getenv("POSTGRES_PASSWORD", "")
server = os.getenv("POSTGRES_SERVER", "db")
db = os.getenv("POSTGRES_DB", "app")
return f"postgresql+psycopg://{user}:{password}@{server}/{db}"
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = get_url()
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True, compare_type=True
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
configuration = config.get_section(config.config_ini_section)
configuration["sqlalchemy.url"] = get_url()
connectable = engine_from_config(
configuration,
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata, compare_type=True
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,25 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

View File

@@ -0,0 +1,54 @@
"""Initialize models
Revision ID: e2412789c190
Revises:
Create Date: 2023-11-24 22:55:43.195942
"""
import sqlalchemy as sa
import sqlmodel.sql.sqltypes
from alembic import op
# revision identifiers, used by Alembic.
revision = "e2412789c190"
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"user",
sa.Column("email", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("is_active", sa.Boolean(), nullable=False),
sa.Column("is_superuser", sa.Boolean(), nullable=False),
sa.Column("full_name", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"hashed_password", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_user_email"), "user", ["email"], unique=True)
op.create_table(
"item",
sa.Column("description", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("title", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("owner_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["owner_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("id"),
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("item")
op.drop_index(op.f("ix_user_email"), table_name="user")
op.drop_table("user")
# ### end Alembic commands ###

View File

56
backend/app/api/deps.py Normal file
View File

@@ -0,0 +1,56 @@
from collections.abc import Generator
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlmodel import Session
from app.core import security
from app.core.config import settings
from app.core.db import engine
from app.models import TokenPayload, User
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)
def get_db() -> Generator:
with Session(engine) as session:
yield session
SessionDep = Annotated[Session, Depends(get_db)]
TokenDep = Annotated[str, Depends(reusable_oauth2)]
def get_current_user(session: SessionDep, token: TokenDep) -> User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
)
token_data = TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = session.get(User, token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user
CurrentUser = Annotated[User, Depends(get_current_user)]
def get_current_active_superuser(current_user: CurrentUser) -> User:
if not current_user.is_superuser:
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return current_user

9
backend/app/api/main.py Normal file
View File

@@ -0,0 +1,9 @@
from fastapi import APIRouter
from app.api.routes import items, login, users, utils
api_router = APIRouter()
api_router.include_router(login.router, tags=["login"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(utils.router, prefix="/utils", tags=["utils"])
api_router.include_router(items.router, prefix="/items", tags=["items"])

View File

View File

@@ -0,0 +1,97 @@
from typing import Any
from fastapi import APIRouter, HTTPException
from sqlmodel import func, select
from app.api.deps import CurrentUser, SessionDep
from app.models import Item, ItemCreate, ItemOut, ItemsOut, ItemUpdate, Message
router = APIRouter()
@router.get("/", response_model=ItemsOut)
def read_items(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve items.
"""
statment = select(func.count()).select_from(Item)
count = session.exec(statment).one()
if current_user.is_superuser:
statement = select(Item).offset(skip).limit(limit)
items = session.exec(statement).all()
else:
statement = (
select(Item)
.where(Item.owner_id == current_user.id)
.offset(skip)
.limit(limit)
)
items = session.exec(statement).all()
return ItemsOut(data=items, count=count)
@router.get("/{id}", response_model=ItemOut)
def read_item(session: SessionDep, current_user: CurrentUser, id: int) -> Any:
"""
Get item by ID.
"""
item = session.get(Item, id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not current_user.is_superuser and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
return item
@router.post("/", response_model=ItemOut)
def create_item(
*, session: SessionDep, current_user: CurrentUser, item_in: ItemCreate
) -> Any:
"""
Create new item.
"""
item = Item.model_validate(item_in, update={"owner_id": current_user.id})
session.add(item)
session.commit()
session.refresh(item)
return item
@router.put("/{id}", response_model=ItemOut)
def update_item(
*, session: SessionDep, current_user: CurrentUser, id: int, item_in: ItemUpdate
) -> Any:
"""
Update an item.
"""
item = session.get(Item, id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not current_user.is_superuser and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
update_dict = item_in.model_dump(exclude_unset=True)
item.sqlmodel_update(update_dict)
session.add(item)
session.commit()
session.refresh(item)
return item
@router.delete("/{id}")
def delete_item(session: SessionDep, current_user: CurrentUser, id: int) -> Message:
"""
Delete an item.
"""
item = session.get(Item, id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not current_user.is_superuser and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
session.delete(item)
session.commit()
return Message(message="Item deleted successfully")

View File

@@ -0,0 +1,91 @@
from datetime import timedelta
from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from app import crud
from app.api.deps import CurrentUser, SessionDep
from app.core import security
from app.core.config import settings
from app.core.security import get_password_hash
from app.models import Message, NewPassword, Token, UserOut
from app.utils import (
generate_password_reset_token,
send_reset_password_email,
verify_password_reset_token,
)
router = APIRouter()
@router.post("/login/access-token")
def login_access_token(
session: SessionDep, form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.authenticate(
session=session, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
elif not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return Token(
access_token=security.create_access_token(
user.id, expires_delta=access_token_expires
)
)
@router.post("/login/test-token", response_model=UserOut)
def test_token(current_user: CurrentUser) -> Any:
"""
Test access token
"""
return current_user
@router.post("/password-recovery/{email}")
def recover_password(email: str, session: SessionDep) -> Message:
"""
Password Recovery
"""
user = crud.get_user_by_email(session=session, email=email)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system.",
)
password_reset_token = generate_password_reset_token(email=email)
send_reset_password_email(
email_to=user.email, email=email, token=password_reset_token
)
return Message(message="Password recovery email sent")
@router.post("/reset-password/")
def reset_password(session: SessionDep, body: NewPassword) -> Message:
"""
Reset password
"""
email = verify_password_reset_token(token=body.token)
if not email:
raise HTTPException(status_code=400, detail="Invalid token")
user = crud.get_user_by_email(session=session, email=email)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system.",
)
elif not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
hashed_password = get_password_hash(password=body.new_password)
user.hashed_password = hashed_password
session.add(user)
session.commit()
return Message(message="Password updated successfully")

View File

@@ -0,0 +1,198 @@
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import delete, func, select
from app import crud
from app.api.deps import (
CurrentUser,
SessionDep,
get_current_active_superuser,
)
from app.core.config import settings
from app.core.security import get_password_hash, verify_password
from app.models import (
Item,
Message,
UpdatePassword,
User,
UserCreate,
UserCreateOpen,
UserOut,
UsersOut,
UserUpdate,
UserUpdateMe,
)
from app.utils import send_new_account_email
router = APIRouter()
@router.get(
"/", dependencies=[Depends(get_current_active_superuser)], response_model=UsersOut
)
def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any:
"""
Retrieve users.
"""
statment = select(func.count()).select_from(User)
count = session.exec(statment).one()
statement = select(User).offset(skip).limit(limit)
users = session.exec(statement).all()
return UsersOut(data=users, count=count)
@router.post(
"/", dependencies=[Depends(get_current_active_superuser)], response_model=UserOut
)
def create_user(*, session: SessionDep, user_in: UserCreate) -> Any:
"""
Create new user.
"""
user = crud.get_user_by_email(session=session, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system.",
)
user = crud.create_user(session=session, user_create=user_in)
if settings.EMAILS_ENABLED and user_in.email:
send_new_account_email(
email_to=user_in.email, username=user_in.email, password=user_in.password
)
return user
@router.patch("/me", response_model=UserOut)
def update_user_me(
*, session: SessionDep, user_in: UserUpdateMe, current_user: CurrentUser
) -> Any:
"""
Update own user.
"""
user_data = user_in.model_dump(exclude_unset=True)
current_user.sqlmodel_update(user_data)
session.add(current_user)
session.commit()
session.refresh(current_user)
return current_user
@router.patch("/me/password", response_model=Message)
def update_password_me(
*, session: SessionDep, body: UpdatePassword, current_user: CurrentUser
) -> Any:
"""
Update own password.
"""
if not verify_password(body.current_password, current_user.hashed_password):
raise HTTPException(status_code=400, detail="Incorrect password")
if body.current_password == body.new_password:
raise HTTPException(
status_code=400, detail="New password cannot be the same as the current one"
)
hashed_password = get_password_hash(body.new_password)
current_user.hashed_password = hashed_password
session.add(current_user)
session.commit()
return Message(message="Password updated successfully")
@router.get("/me", response_model=UserOut)
def read_user_me(session: SessionDep, current_user: CurrentUser) -> Any:
"""
Get current user.
"""
return current_user
@router.post("/open", response_model=UserOut)
def create_user_open(session: SessionDep, user_in: UserCreateOpen) -> Any:
"""
Create new user without the need to be logged in.
"""
if not settings.USERS_OPEN_REGISTRATION:
raise HTTPException(
status_code=403,
detail="Open user registration is forbidden on this server",
)
user = crud.get_user_by_email(session=session, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system",
)
user_create = UserCreate.from_orm(user_in)
user = crud.create_user(session=session, user_create=user_create)
return user
@router.get("/{user_id}", response_model=UserOut)
def read_user_by_id(
user_id: int, session: SessionDep, current_user: CurrentUser
) -> Any:
"""
Get a specific user by id.
"""
user = session.get(User, user_id)
if user == current_user:
return user
if not current_user.is_superuser:
raise HTTPException(
# TODO: Review status code
status_code=400,
detail="The user doesn't have enough privileges",
)
return user
@router.patch(
"/{user_id}",
dependencies=[Depends(get_current_active_superuser)],
response_model=UserOut,
)
def update_user(
*,
session: SessionDep,
user_id: int,
user_in: UserUpdate,
) -> Any:
"""
Update a user.
"""
db_user = crud.update_user(session=session, user_id=user_id, user_in=user_in)
if db_user is None:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system",
)
return db_user
@router.delete("/{user_id}")
def delete_user(
session: SessionDep, current_user: CurrentUser, user_id: int
) -> Message:
"""
Delete a user.
"""
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if (user == current_user and not current_user.is_superuser) or (user != current_user and current_user.is_superuser):
statement = delete(Item).where(Item.owner_id == user_id)
session.exec(statement)
session.delete(user)
session.commit()
return Message(message="User deleted successfully")
elif user == current_user and current_user.is_superuser:
raise HTTPException(
status_code=400, detail="Super users are not allowed to delete themselves"
)

View File

@@ -0,0 +1,35 @@
from fastapi import APIRouter, Depends
from pydantic.networks import EmailStr
from app.api.deps import get_current_active_superuser
from app.core.celery_app import celery_app
from app.models import Message
from app.utils import send_test_email
router = APIRouter()
@router.post(
"/test-celery/",
dependencies=[Depends(get_current_active_superuser)],
status_code=201,
)
def test_celery(body: Message) -> Message:
"""
Test Celery worker.
"""
celery_app.send_task("app.worker.test_celery", args=[body.message])
return Message(message="Word received")
@router.post(
"/test-email/",
dependencies=[Depends(get_current_active_superuser)],
status_code=201,
)
def test_email(email_to: EmailStr) -> Message:
"""
Test emails.
"""
send_test_email(email_to=email_to)
return Message(message="Test email sent")

View File

@@ -0,0 +1,38 @@
import logging
from sqlmodel import Session, select
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed
from app.core.db import engine
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
max_tries = 60 * 5 # 5 minutes
wait_seconds = 1
@retry(
stop=stop_after_attempt(max_tries),
wait=wait_fixed(wait_seconds),
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
try:
with Session(engine) as session:
# Try to create session to check if DB is awake
session.exec(select(1))
except Exception as e:
logger.error(e)
raise e
def main() -> None:
logger.info("Initializing service")
init()
logger.info("Service finished initializing")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,38 @@
import logging
from sqlmodel import Session, select
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed
from app.core.db import engine
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
max_tries = 60 * 5 # 5 minutes
wait_seconds = 1
@retry(
stop=stop_after_attempt(max_tries),
wait=wait_fixed(wait_seconds),
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
try:
# Try to create session to check if DB is awake
with Session(engine) as session:
session.exec(select(1))
except Exception as e:
logger.error(e)
raise e
def main() -> None:
logger.info("Initializing service")
init()
logger.info("Service finished initializing")
if __name__ == "__main__":
main()

View File

View File

@@ -0,0 +1,5 @@
from celery import Celery
celery_app = Celery("worker", broker="amqp://guest@queue//")
celery_app.conf.task_routes = {"app.worker.test_celery": "main-queue"}

View File

@@ -0,0 +1,98 @@
import secrets
from typing import Any
from pydantic import (
AnyHttpUrl,
HttpUrl,
PostgresDsn,
ValidationInfo,
field_validator,
)
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
SECRET_KEY: str = secrets.token_urlsafe(32)
# 60 minutes * 24 hours * 8 days = 8 days
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8
SERVER_HOST: AnyHttpUrl
# BACKEND_CORS_ORIGINS is a JSON-formatted list of origins
# e.g: '["http://localhost", "http://localhost:4200", "http://localhost:3000", \
# "http://localhost:8080", "http://local.dockertoolbox.tiangolo.com"]'
BACKEND_CORS_ORIGINS: list[AnyHttpUrl] | str = []
@field_validator("BACKEND_CORS_ORIGINS", mode="before")
@classmethod
def assemble_cors_origins(cls, v: str | list[str]) -> list[str] | str:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
elif isinstance(v, list | str):
return v
raise ValueError(v)
PROJECT_NAME: str
SENTRY_DSN: HttpUrl | None = None
@field_validator("SENTRY_DSN", mode="before")
@classmethod
def sentry_dsn_can_be_blank(cls, v: str) -> str | None:
if not v:
return None
return v
POSTGRES_SERVER: str
POSTGRES_USER: str
POSTGRES_PASSWORD: str
POSTGRES_DB: str
SQLALCHEMY_DATABASE_URI: PostgresDsn | None = None
@field_validator("SQLALCHEMY_DATABASE_URI", mode="before")
def assemble_db_connection(cls, v: str | None, info: ValidationInfo) -> Any:
if isinstance(v, str):
return v
return PostgresDsn.build(
scheme="postgresql+psycopg",
username=info.data.get("POSTGRES_USER"),
password=info.data.get("POSTGRES_PASSWORD"),
host=info.data.get("POSTGRES_SERVER"),
path=f"{info.data.get('POSTGRES_DB') or ''}",
)
SMTP_TLS: bool = True
SMTP_PORT: int | None = None
SMTP_HOST: str | None = None
SMTP_USER: str | None = None
SMTP_PASSWORD: str | None = None
# TODO: update type to EmailStr when sqlmodel supports it
EMAILS_FROM_EMAIL: str | None = None
EMAILS_FROM_NAME: str | None = None
@field_validator("EMAILS_FROM_NAME")
def get_project_name(cls, v: str | None, info: ValidationInfo) -> str:
if not v:
return info.data["PROJECT_NAME"]
return v
EMAIL_RESET_TOKEN_EXPIRE_HOURS: int = 48
EMAIL_TEMPLATES_DIR: str = "/app/app/email-templates/build"
EMAILS_ENABLED: bool = False
@field_validator("EMAILS_ENABLED", mode="before")
def get_emails_enabled(cls, v: bool, info: ValidationInfo) -> bool:
return bool(
info.data.get("SMTP_HOST")
and info.data.get("SMTP_PORT")
and info.data.get("EMAILS_FROM_EMAIL")
)
# TODO: update type to EmailStr when sqlmodel supports it
EMAIL_TEST_USER: str = "test@example.com"
# TODO: update type to EmailStr when sqlmodel supports it
FIRST_SUPERUSER: str
FIRST_SUPERUSER_PASSWORD: str
USERS_OPEN_REGISTRATION: bool = False
model_config = SettingsConfigDict(case_sensitive=True)
settings = Settings()

34
backend/app/core/db.py Normal file
View File

@@ -0,0 +1,34 @@
from sqlmodel import Session, create_engine, select
from app import crud
from app.core.config import settings
from app.models import User, UserCreate
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
# make sure all SQLModel models are imported (app.models) before initializing DB
# otherwise, SQLModel might fail to initialize relationships properly
# for more details: https://github.com/tiangolo/full-stack-fastapi-postgresql/issues/28
def init_db(session: Session) -> None:
# Tables should be created with Alembic migrations
# But if you don't want to use migrations, create
# the tables un-commenting the next lines
# from sqlmodel import SQLModel
# from app.core.engine import engine
# This works because the models are already imported and registered from app.models
# SQLModel.metadata.create_all(engine)
user = session.exec(
select(User).where(User.email == settings.FIRST_SUPERUSER)
).first()
if not user:
user_in = UserCreate(
email=settings.FIRST_SUPERUSER,
password=settings.FIRST_SUPERUSER_PASSWORD,
is_superuser=True,
)
user = crud.create_user(session=session, user_create=user_in)

View File

@@ -0,0 +1,32 @@
from datetime import datetime, timedelta
from typing import Any
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def create_access_token(subject: str | Any, expires_delta: timedelta = None) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

55
backend/app/crud.py Normal file
View File

@@ -0,0 +1,55 @@
from typing import Any
from sqlmodel import Session, select
from app.core.security import get_password_hash, verify_password
from app.models import Item, ItemCreate, User, UserCreate, UserUpdate
def create_user(*, session: Session, user_create: UserCreate) -> User:
db_obj = User.model_validate(
user_create, update={"hashed_password": get_password_hash(user_create.password)}
)
session.add(db_obj)
session.commit()
session.refresh(db_obj)
return db_obj
def update_user(*, session: Session, user_id: int, user_in: UserUpdate) -> Any:
db_user = session.get(User, user_id)
if not db_user:
return None
user_data = user_in.model_dump(exclude_unset=True)
extra_data = {}
if "password" in user_data:
password = user_data["password"]
hashed_password = get_password_hash(password)
extra_data["hashed_password"] = hashed_password
db_user.sqlmodel_update(user_data, update=extra_data)
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
def get_user_by_email(*, session: Session, email: str) -> User | None:
statement = select(User).where(User.email == email)
session_user = session.exec(statement).first()
return session_user
def authenticate(*, session: Session, email: str, password: str) -> User | None:
db_user = get_user_by_email(session=session, email=email)
if not db_user:
return None
if not verify_password(password, db_user.hashed_password):
return None
return db_user
def create_item(*, session: Session, item_in: ItemCreate, owner_id: int) -> Item:
db_item = Item.model_validate(item_in, update={"owner_id": owner_id})
session.add(db_item)
session.commit()
session.refresh(db_item)
return db_item

View File

@@ -0,0 +1,26 @@
<!doctype html><html xmlns="http://www.w3.org/1999/xhtml" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office"><head><title></title><!--[if !mso]><!-- --><meta http-equiv="X-UA-Compatible" content="IE=edge"><!--<![endif]--><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1"><style type="text/css">#outlook a { padding:0; }
.ReadMsgBody { width:100%; }
.ExternalClass { width:100%; }
.ExternalClass * { line-height:100%; }
body { margin:0;padding:0;-webkit-text-size-adjust:100%;-ms-text-size-adjust:100%; }
table, td { border-collapse:collapse;mso-table-lspace:0pt;mso-table-rspace:0pt; }
img { border:0;height:auto;line-height:100%; outline:none;text-decoration:none;-ms-interpolation-mode:bicubic; }
p { display:block;margin:13px 0; }</style><!--[if !mso]><!--><style type="text/css">@media only screen and (max-width:480px) {
@-ms-viewport { width:320px; }
@viewport { width:320px; }
}</style><!--<![endif]--><!--[if mso]>
<xml>
<o:OfficeDocumentSettings>
<o:AllowPNG/>
<o:PixelsPerInch>96</o:PixelsPerInch>
</o:OfficeDocumentSettings>
</xml>
<![endif]--><!--[if lte mso 11]>
<style type="text/css">
.outlook-group-fix { width:100% !important; }
</style>
<![endif]--><!--[if !mso]><!--><link href="https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700" rel="stylesheet" type="text/css"><style type="text/css">@import url(https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700);</style><!--<![endif]--><style type="text/css">@media only screen and (min-width:480px) {
.mj-column-per-100 { width:100% !important; max-width: 100%; }
}</style><style type="text/css"></style></head><body style="background-color:#ffffff;"><div style="background-color:#ffffff;"><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" class="" style="width:600px;" width="600" ><tr><td style="line-height:0px;font-size:0px;mso-line-height-rule:exactly;"><![endif]--><div style="Margin:0px auto;max-width:600px;"><table align="center" border="0" cellpadding="0" cellspacing="0" role="presentation" style="width:100%;"><tbody><tr><td style="direction:ltr;font-size:0px;padding:20px 0;text-align:center;vertical-align:top;"><!--[if mso | IE]><table role="presentation" border="0" cellpadding="0" cellspacing="0"><tr><td class="" style="vertical-align:top;width:600px;" ><![endif]--><div class="mj-column-per-100 outlook-group-fix" style="font-size:13px;text-align:left;direction:ltr;display:inline-block;vertical-align:top;width:100%;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="vertical-align:top;" width="100%"><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:helvetica;font-size:20px;line-height:1;text-align:left;color:#555555;">{{ project_name }} - New Account</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">You have a new account:</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Username: {{ username }}</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Password: {{ password }}</div></td></tr><tr><td align="center" vertical-align="middle" style="font-size:0px;padding:50px 0px;word-break:break-word;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="border-collapse:separate;line-height:100%;"><tr><td align="center" bgcolor="#414141" role="presentation" style="border:none;border-radius:3px;cursor:auto;padding:10px 25px;background:#414141;" valign="middle"><a href="{{ link }}" style="background:#414141;color:#ffffff;font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:13px;font-weight:normal;line-height:120%;Margin:0;text-decoration:none;text-transform:none;" target="_blank">Go to Dashboard</a></td></tr></table></td></tr><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 2px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 2px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr></table></div><!--[if mso | IE]></td></tr></table><![endif]--></td></tr></tbody></table></div><!--[if mso | IE]></td></tr></table><![endif]--></div></body></html>

View File

@@ -0,0 +1,26 @@
<!doctype html><html xmlns="http://www.w3.org/1999/xhtml" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office"><head><title></title><!--[if !mso]><!-- --><meta http-equiv="X-UA-Compatible" content="IE=edge"><!--<![endif]--><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1"><style type="text/css">#outlook a { padding:0; }
.ReadMsgBody { width:100%; }
.ExternalClass { width:100%; }
.ExternalClass * { line-height:100%; }
body { margin:0;padding:0;-webkit-text-size-adjust:100%;-ms-text-size-adjust:100%; }
table, td { border-collapse:collapse;mso-table-lspace:0pt;mso-table-rspace:0pt; }
img { border:0;height:auto;line-height:100%; outline:none;text-decoration:none;-ms-interpolation-mode:bicubic; }
p { display:block;margin:13px 0; }</style><!--[if !mso]><!--><style type="text/css">@media only screen and (max-width:480px) {
@-ms-viewport { width:320px; }
@viewport { width:320px; }
}</style><!--<![endif]--><!--[if mso]>
<xml>
<o:OfficeDocumentSettings>
<o:AllowPNG/>
<o:PixelsPerInch>96</o:PixelsPerInch>
</o:OfficeDocumentSettings>
</xml>
<![endif]--><!--[if lte mso 11]>
<style type="text/css">
.outlook-group-fix { width:100% !important; }
</style>
<![endif]--><!--[if !mso]><!--><link href="https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700" rel="stylesheet" type="text/css"><style type="text/css">@import url(https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700);</style><!--<![endif]--><style type="text/css">@media only screen and (min-width:480px) {
.mj-column-per-100 { width:100% !important; max-width: 100%; }
}</style><style type="text/css"></style></head><body style="background-color:#ffffff;"><div style="background-color:#ffffff;"><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" class="" style="width:600px;" width="600" ><tr><td style="line-height:0px;font-size:0px;mso-line-height-rule:exactly;"><![endif]--><div style="Margin:0px auto;max-width:600px;"><table align="center" border="0" cellpadding="0" cellspacing="0" role="presentation" style="width:100%;"><tbody><tr><td style="direction:ltr;font-size:0px;padding:20px 0;text-align:center;vertical-align:top;"><!--[if mso | IE]><table role="presentation" border="0" cellpadding="0" cellspacing="0"><tr><td class="" style="vertical-align:top;width:600px;" ><![endif]--><div class="mj-column-per-100 outlook-group-fix" style="font-size:13px;text-align:left;direction:ltr;display:inline-block;vertical-align:top;width:100%;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="vertical-align:top;" width="100%"><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:helvetica;font-size:20px;line-height:1;text-align:left;color:#555555;">{{ project_name }} - Password Recovery</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">We received a request to recover the password for user {{ username }} with email {{ email }}</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Reset your password by clicking the button below:</div></td></tr><tr><td align="center" vertical-align="middle" style="font-size:0px;padding:50px 0px;word-break:break-word;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="border-collapse:separate;line-height:100%;"><tr><td align="center" bgcolor="#414141" role="presentation" style="border:none;border-radius:3px;cursor:auto;padding:10px 25px;background:#414141;" valign="middle"><a href="{{ link }}" style="background:#414141;color:#ffffff;font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:13px;font-weight:normal;line-height:120%;Margin:0;text-decoration:none;text-transform:none;" target="_blank">Reset Password</a></td></tr></table></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Or open the following link:</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;"><a href="{{ link }}">{{ link }}</a></div></td></tr><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 2px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 2px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:14px;line-height:1;text-align:left;color:#555555;">The reset password link / button will expire in {{ valid_hours }} hours.</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:14px;line-height:1;text-align:left;color:#555555;">If you didn't request a password recovery you can disregard this email.</div></td></tr></table></div><!--[if mso | IE]></td></tr></table><![endif]--></td></tr></tbody></table></div><!--[if mso | IE]></td></tr></table><![endif]--></div></body></html>

View File

@@ -0,0 +1,25 @@
<!doctype html><html xmlns="http://www.w3.org/1999/xhtml" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office"><head><title></title><!--[if !mso]><!-- --><meta http-equiv="X-UA-Compatible" content="IE=edge"><!--<![endif]--><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1"><style type="text/css">#outlook a { padding:0; }
.ReadMsgBody { width:100%; }
.ExternalClass { width:100%; }
.ExternalClass * { line-height:100%; }
body { margin:0;padding:0;-webkit-text-size-adjust:100%;-ms-text-size-adjust:100%; }
table, td { border-collapse:collapse;mso-table-lspace:0pt;mso-table-rspace:0pt; }
img { border:0;height:auto;line-height:100%; outline:none;text-decoration:none;-ms-interpolation-mode:bicubic; }
p { display:block;margin:13px 0; }</style><!--[if !mso]><!--><style type="text/css">@media only screen and (max-width:480px) {
@-ms-viewport { width:320px; }
@viewport { width:320px; }
}</style><!--<![endif]--><!--[if mso]>
<xml>
<o:OfficeDocumentSettings>
<o:AllowPNG/>
<o:PixelsPerInch>96</o:PixelsPerInch>
</o:OfficeDocumentSettings>
</xml>
<![endif]--><!--[if lte mso 11]>
<style type="text/css">
.outlook-group-fix { width:100% !important; }
</style>
<![endif]--><!--[if !mso]><!--><link href="https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700" rel="stylesheet" type="text/css"><style type="text/css">@import url(https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700);</style><!--<![endif]--><style type="text/css">@media only screen and (min-width:480px) {
.mj-column-per-100 { width:100% !important; max-width: 100%; }
}</style><style type="text/css"></style></head><body style="background-color:#ffffff;"><div style="background-color:#ffffff;"><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" class="" style="width:600px;" width="600" ><tr><td style="line-height:0px;font-size:0px;mso-line-height-rule:exactly;"><![endif]--><div style="Margin:0px auto;max-width:600px;"><table align="center" border="0" cellpadding="0" cellspacing="0" role="presentation" style="width:100%;"><tbody><tr><td style="direction:ltr;font-size:0px;padding:20px 0;text-align:center;vertical-align:top;"><!--[if mso | IE]><table role="presentation" border="0" cellpadding="0" cellspacing="0"><tr><td class="" style="vertical-align:top;width:600px;" ><![endif]--><div class="mj-column-per-100 outlook-group-fix" style="font-size:13px;text-align:left;direction:ltr;display:inline-block;vertical-align:top;width:100%;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="vertical-align:top;" width="100%"><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:helvetica;font-size:20px;line-height:1;text-align:left;color:#555555;">{{ project_name }}</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Test email for: {{ email }}</div></td></tr></table></div><!--[if mso | IE]></td></tr></table><![endif]--></td></tr></tbody></table></div><!--[if mso | IE]></td></tr></table><![endif]--></div></body></html>

View File

@@ -0,0 +1,15 @@
<mjml>
<mj-body background-color="#fff">
<mj-section>
<mj-column>
<mj-divider border-color="#555"></mj-divider>
<mj-text font-size="20px" color="#555" font-family="helvetica">{{ project_name }} - New Account</mj-text>
<mj-text font-size="16px" color="#555">You have a new account:</mj-text>
<mj-text font-size="16px" color="#555">Username: {{ username }}</mj-text>
<mj-text font-size="16px" color="#555">Password: {{ password }}</mj-text>
<mj-button padding="50px 0px" href="{{ link }}">Go to Dashboard</mj-button>
<mj-divider border-color="#555" border-width="2px" />
</mj-column>
</mj-section>
</mj-body>
</mjml>

View File

@@ -0,0 +1,19 @@
<mjml>
<mj-body background-color="#fff">
<mj-section>
<mj-column>
<mj-divider border-color="#555"></mj-divider>
<mj-text font-size="20px" color="#555" font-family="helvetica">{{ project_name }} - Password Recovery</mj-text>
<mj-text font-size="16px" color="#555">We received a request to recover the password for user {{ username }}
with email {{ email }}</mj-text>
<mj-text font-size="16px" color="#555">Reset your password by clicking the button below:</mj-text>
<mj-button padding="50px 0px" href="{{ link }}">Reset Password</mj-button>
<mj-text font-size="16px" color="#555">Or open the following link:</mj-text>
<mj-text font-size="16px" color="#555"><a href="{{ link }}">{{ link }}</a></mj-text>
<mj-divider border-color="#555" border-width="2px" />
<mj-text font-size="14px" color="#555">The reset password link / button will expire in {{ valid_hours }} hours.</mj-text>
<mj-text font-size="14px" color="#555">If you didn't request a password recovery you can disregard this email.</mj-text>
</mj-column>
</mj-section>
</mj-body>
</mjml>

View File

@@ -0,0 +1,11 @@
<mjml>
<mj-body background-color="#fff">
<mj-section>
<mj-column>
<mj-divider border-color="#555"></mj-divider>
<mj-text font-size="20px" color="#555" font-family="helvetica">{{ project_name }}</mj-text>
<mj-text font-size="16px" color="#555">Test email for: {{ email }}</mj-text>
</mj-column>
</mj-section>
</mj-body>
</mjml>

View File

@@ -0,0 +1,23 @@
import logging
from sqlmodel import Session
from app.core.db import engine, init_db
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def init() -> None:
with Session(engine) as session:
init_db(session)
def main() -> None:
logger.info("Creating initial data")
init()
logger.info("Initial data created")
if __name__ == "__main__":
main()

31
backend/app/main.py Normal file
View File

@@ -0,0 +1,31 @@
from fastapi import FastAPI
from fastapi.routing import APIRoute
from starlette.middleware.cors import CORSMiddleware
from app.api.main import api_router
from app.core.config import settings
def custom_generate_unique_id(route: APIRoute):
return f"{route.tags[0]}-{route.name}"
app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
generate_unique_id_function=custom_generate_unique_id,
)
# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=[
str(origin).strip("/") for origin in settings.BACKEND_CORS_ORIGINS
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router, prefix=settings.API_V1_STR)

113
backend/app/models.py Normal file
View File

@@ -0,0 +1,113 @@
from sqlmodel import Field, Relationship, SQLModel
# Shared properties
# TODO replace email str with EmailStr when sqlmodel supports it
class UserBase(SQLModel):
email: str = Field(unique=True, index=True)
is_active: bool = True
is_superuser: bool = False
full_name: str | None = None
# Properties to receive via API on creation
class UserCreate(UserBase):
password: str
# TODO replace email str with EmailStr when sqlmodel supports it
class UserCreateOpen(SQLModel):
email: str
password: str
full_name: str | None = None
# Properties to receive via API on update, all are optional
# TODO replace email str with EmailStr when sqlmodel supports it
class UserUpdate(UserBase):
email: str | None = None
password: str | None = None
# TODO replace email str with EmailStr when sqlmodel supports it
class UserUpdateMe(SQLModel):
full_name: str | None = None
email: str | None = None
class UpdatePassword(SQLModel):
current_password: str
new_password: str
# Database model, database table inferred from class name
class User(UserBase, table=True):
id: int | None = Field(default=None, primary_key=True)
hashed_password: str
items: list["Item"] = Relationship(back_populates="owner")
# Properties to return via API, id is always required
class UserOut(UserBase):
id: int
class UsersOut(SQLModel):
data: list[UserOut]
count: int
# Shared properties
class ItemBase(SQLModel):
title: str
description: str | None = None
# Properties to receive on item creation
class ItemCreate(ItemBase):
title: str
# Properties to receive on item update
class ItemUpdate(ItemBase):
title: str | None = None
# Database model, database table inferred from class name
class Item(ItemBase, table=True):
id: int | None = Field(default=None, primary_key=True)
title: str
owner_id: int | None = Field(default=None, foreign_key="user.id", nullable=False)
owner: User | None = Relationship(back_populates="items")
# Properties to return via API, id is always required
class ItemOut(ItemBase):
id: int
owner_id: int
class ItemsOut(SQLModel):
data: list[ItemOut]
count: int
# Generic message
class Message(SQLModel):
message: str
# JSON payload containing access token
class Token(SQLModel):
access_token: str
token_type: str = "bearer"
# Contents of JWT token
class TokenPayload(SQLModel):
sub: int | None = None
class NewPassword(SQLModel):
token: str
new_password: str

View File

@@ -0,0 +1,4 @@
from .item import Item, ItemCreate, ItemInDB, ItemUpdate
from .msg import Msg
from .token import Token, TokenPayload
from .user import User, UserCreate, UserInDB, UserUpdate

View File

@@ -0,0 +1,35 @@
from pydantic import BaseModel, ConfigDict
# Shared properties
class ItemBase(BaseModel):
title: str | None = None
description: str | None = None
# Properties to receive on item creation
class ItemCreate(ItemBase):
title: str
# Properties to receive on item update
class ItemUpdate(ItemBase):
pass
# Properties shared by models stored in DB
class ItemInDBBase(ItemBase):
id: int
title: str
owner_id: int
model_config = ConfigDict(from_attributes=True)
# Properties to return to client
class Item(ItemInDBBase):
pass
# Properties properties stored in DB
class ItemInDB(ItemInDBBase):
pass

View File

@@ -0,0 +1,5 @@
from pydantic import BaseModel
class Msg(BaseModel):
msg: str

View File

@@ -0,0 +1,10 @@
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: int | None = None

View File

@@ -0,0 +1,35 @@
from pydantic import BaseModel, ConfigDict, EmailStr
# Shared properties
class UserBase(BaseModel):
email: EmailStr | None = None
is_active: bool | None = True
is_superuser: bool = False
full_name: str | None = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: str | None = None
class UserInDBBase(UserBase):
id: int | None = None
model_config = ConfigDict(from_attributes=True)
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

View File

View File

View File

View File

@@ -0,0 +1,16 @@
from fastapi.testclient import TestClient
from app.core.config import settings
def test_celery_worker_test(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
data = {"message": "test"}
r = client.post(
f"{settings.API_V1_STR}/utils/test-celery/",
json=data,
headers=superuser_token_headers,
)
response = r.json()
assert response["message"] == "Word received"

View File

@@ -0,0 +1,38 @@
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from app.tests.utils.item import create_random_item
def test_create_item(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
data = {"title": "Foo", "description": "Fighters"}
response = client.post(
f"{settings.API_V1_STR}/items/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["title"] == data["title"]
assert content["description"] == data["description"]
assert "id" in content
assert "owner_id" in content
def test_read_item(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
item = create_random_item(db)
response = client.get(
f"{settings.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["title"] == item.title
assert content["description"] == item.description
assert content["id"] == item.id
assert content["owner_id"] == item.owner_id

View File

@@ -0,0 +1,27 @@
from fastapi.testclient import TestClient
from app.core.config import settings
def test_get_access_token(client: TestClient) -> None:
login_data = {
"username": settings.FIRST_SUPERUSER,
"password": settings.FIRST_SUPERUSER_PASSWORD,
}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
tokens = r.json()
assert r.status_code == 200
assert "access_token" in tokens
assert tokens["access_token"]
def test_use_access_token(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
r = client.post(
f"{settings.API_V1_STR}/login/test-token",
headers=superuser_token_headers,
)
result = r.json()
assert r.status_code == 200
assert "email" in result

View File

@@ -0,0 +1,121 @@
from fastapi.testclient import TestClient
from sqlmodel import Session
from app import crud
from app.core.config import settings
from app.models import UserCreate
from app.tests.utils.utils import random_email, random_lower_string
def test_get_users_superuser_me(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
r = client.get(f"{settings.API_V1_STR}/users/me", headers=superuser_token_headers)
current_user = r.json()
assert current_user
assert current_user["is_active"] is True
assert current_user["is_superuser"]
assert current_user["email"] == settings.FIRST_SUPERUSER
def test_get_users_normal_user_me(
client: TestClient, normal_user_token_headers: dict[str, str]
) -> None:
r = client.get(f"{settings.API_V1_STR}/users/me", headers=normal_user_token_headers)
current_user = r.json()
assert current_user
assert current_user["is_active"] is True
assert current_user["is_superuser"] is False
assert current_user["email"] == settings.EMAIL_TEST_USER
def test_create_user_new_email(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
username = random_email()
password = random_lower_string()
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/",
headers=superuser_token_headers,
json=data,
)
assert 200 <= r.status_code < 300
created_user = r.json()
user = crud.get_user_by_email(session=db, email=username)
assert user
assert user.email == created_user["email"]
def test_get_existing_user(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
user_id = user.id
r = client.get(
f"{settings.API_V1_STR}/users/{user_id}",
headers=superuser_token_headers,
)
assert 200 <= r.status_code < 300
api_user = r.json()
existing_user = crud.get_user_by_email(session=db, email=username)
assert existing_user
assert existing_user.email == api_user["email"]
def test_create_user_existing_username(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
username = random_email()
# username = email
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
crud.create_user(session=db, user_create=user_in)
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/",
headers=superuser_token_headers,
json=data,
)
created_user = r.json()
assert r.status_code == 400
assert "_id" not in created_user
def test_create_user_by_normal_user(
client: TestClient, normal_user_token_headers: dict[str, str]
) -> None:
username = random_email()
password = random_lower_string()
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/",
headers=normal_user_token_headers,
json=data,
)
assert r.status_code == 400
def test_retrieve_users(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
crud.create_user(session=db, user_create=user_in)
username2 = random_email()
password2 = random_lower_string()
user_in2 = UserCreate(email=username2, password=password2)
crud.create_user(session=db, user_create=user_in2)
r = client.get(f"{settings.API_V1_STR}/users/", headers=superuser_token_headers)
all_users = r.json()
assert len(all_users["data"]) > 1
assert "count" in all_users
for item in all_users["data"]:
assert "email" in item

View File

@@ -0,0 +1,42 @@
from collections.abc import Generator
import pytest
from fastapi.testclient import TestClient
from sqlmodel import Session, delete
from app.core.config import settings
from app.core.db import engine, init_db
from app.main import app
from app.models import Item, User
from app.tests.utils.user import authentication_token_from_email
from app.tests.utils.utils import get_superuser_token_headers
@pytest.fixture(scope="session", autouse=True)
def db() -> Generator:
with Session(engine) as session:
init_db(session)
yield session
statement = delete(Item)
session.execute(statement)
statement = delete(User)
session.execute(statement)
session.commit()
@pytest.fixture(scope="module")
def client() -> Generator:
with TestClient(app) as c:
yield c
@pytest.fixture(scope="module")
def superuser_token_headers(client: TestClient) -> dict[str, str]:
return get_superuser_token_headers(client)
@pytest.fixture(scope="module")
def normal_user_token_headers(client: TestClient, db: Session) -> dict[str, str]:
return authentication_token_from_email(
client=client, email=settings.EMAIL_TEST_USER, db=db
)

View File

View File

@@ -0,0 +1,91 @@
from fastapi.encoders import jsonable_encoder
from sqlmodel import Session
from app import crud
from app.core.security import verify_password
from app.models import User, UserCreate, UserUpdate
from app.tests.utils.utils import random_email, random_lower_string
def test_create_user(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password)
user = crud.create_user(session=db, user_create=user_in)
assert user.email == email
assert hasattr(user, "hashed_password")
def test_authenticate_user(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password)
user = crud.create_user(session=db, user_create=user_in)
authenticated_user = crud.authenticate(session=db, email=email, password=password)
assert authenticated_user
assert user.email == authenticated_user.email
def test_not_authenticate_user(db: Session) -> None:
email = random_email()
password = random_lower_string()
user = crud.authenticate(session=db, email=email, password=password)
assert user is None
def test_check_if_user_is_active(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password)
user = crud.create_user(session=db, user_create=user_in)
assert user.is_active is True
def test_check_if_user_is_active_inactive(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password, disabled=True)
user = crud.create_user(session=db, user_create=user_in)
assert user.is_active
def test_check_if_user_is_superuser(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password, is_superuser=True)
user = crud.create_user(session=db, user_create=user_in)
assert user.is_superuser is True
def test_check_if_user_is_superuser_normal_user(db: Session) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
assert user.is_superuser is False
def test_get_user(db: Session) -> None:
password = random_lower_string()
username = random_email()
user_in = UserCreate(email=username, password=password, is_superuser=True)
user = crud.create_user(session=db, user_create=user_in)
user_2 = db.get(User, user.id)
assert user_2
assert user.email == user_2.email
assert jsonable_encoder(user) == jsonable_encoder(user_2)
def test_update_user(db: Session) -> None:
password = random_lower_string()
email = random_email()
user_in = UserCreate(email=email, password=password, is_superuser=True)
user = crud.create_user(session=db, user_create=user_in)
new_password = random_lower_string()
user_in_update = UserUpdate(password=new_password, is_superuser=True)
if user.id is not None:
crud.update_user(session=db, user_id=user.id, user_in=user_in_update)
user_2 = db.get(User, user.id)
assert user_2
assert user.email == user_2.email
assert verify_password(new_password, user_2.hashed_password)

View File

View File

@@ -0,0 +1,16 @@
from sqlmodel import Session
from app import crud
from app.models import Item, ItemCreate
from app.tests.utils.user import create_random_user
from app.tests.utils.utils import random_lower_string
def create_random_item(db: Session) -> Item:
user = create_random_user(db)
owner_id = user.id
assert owner_id is not None
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
return crud.create_item(session=db, item_in=item_in, owner_id=owner_id)

View File

@@ -0,0 +1,47 @@
from fastapi.testclient import TestClient
from sqlmodel import Session
from app import crud
from app.core.config import settings
from app.models import User, UserCreate, UserUpdate
from app.tests.utils.utils import random_email, random_lower_string
def user_authentication_headers(
*, client: TestClient, email: str, password: str
) -> dict[str, str]:
data = {"username": email, "password": password}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=data)
response = r.json()
auth_token = response["access_token"]
headers = {"Authorization": f"Bearer {auth_token}"}
return headers
def create_random_user(db: Session) -> User:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password)
user = crud.create_user(session=db, user_create=user_in)
return user
def authentication_token_from_email(
*, client: TestClient, email: str, db: Session
) -> dict[str, str]:
"""
Return a valid token for the user with given email.
If the user doesn't exist it is created first.
"""
password = random_lower_string()
user = crud.get_user_by_email(session=db, email=email)
if not user:
user_in_create = UserCreate(email=email, password=password)
user = crud.create_user(session=db, user_create=user_in_create)
else:
user_in_update = UserUpdate(password=password)
user = crud.update_user(session=db, user_id=user.id, user_in=user_in_update)
return user_authentication_headers(client=client, email=email, password=password)

View File

@@ -0,0 +1,26 @@
import random
import string
from fastapi.testclient import TestClient
from app.core.config import settings
def random_lower_string() -> str:
return "".join(random.choices(string.ascii_lowercase, k=32))
def random_email() -> str:
return f"{random_lower_string()}@{random_lower_string()}.com"
def get_superuser_token_headers(client: TestClient) -> dict[str, str]:
login_data = {
"username": settings.FIRST_SUPERUSER,
"password": settings.FIRST_SUPERUSER_PASSWORD,
}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
tokens = r.json()
a_token = tokens["access_token"]
headers = {"Authorization": f"Bearer {a_token}"}
return headers

View File

@@ -0,0 +1,38 @@
import logging
from sqlmodel import Session, select
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed
from app.core.db import engine
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
max_tries = 60 * 5 # 5 minutes
wait_seconds = 1
@retry(
stop=stop_after_attempt(max_tries),
wait=wait_fixed(wait_seconds),
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
try:
# Try to create session to check if DB is awake
with Session(engine) as session:
session.exec(select(1))
except Exception as e:
logger.error(e)
raise e
def main() -> None:
logger.info("Initializing service")
init()
logger.info("Service finished initializing")
if __name__ == "__main__":
main()

109
backend/app/utils.py Normal file
View File

@@ -0,0 +1,109 @@
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import emails
from emails.template import JinjaTemplate
from jose import jwt
from app.core.config import settings
def send_email(
email_to: str,
subject_template: str = "",
html_template: str = "",
environment: dict[str, Any] | None = None,
) -> None:
current_environment = environment or {}
assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
message = emails.Message(
subject=JinjaTemplate(subject_template),
html=JinjaTemplate(html_template),
mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
)
smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
if settings.SMTP_TLS:
smtp_options["tls"] = True
if settings.SMTP_USER:
smtp_options["user"] = settings.SMTP_USER
if settings.SMTP_PASSWORD:
smtp_options["password"] = settings.SMTP_PASSWORD
response = message.send(to=email_to, render=current_environment, smtp=smtp_options)
logging.info(f"send email result: {response}")
def send_test_email(email_to: str) -> None:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Test email"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "test_email.html") as f:
template_str = f.read()
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={"project_name": settings.PROJECT_NAME, "email": email_to},
)
def send_reset_password_email(email_to: str, email: str, token: str) -> None:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Password recovery for user {email}"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
template_str = f.read()
server_host = settings.SERVER_HOST
link = f"{server_host}/reset-password?token={token}"
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={
"project_name": settings.PROJECT_NAME,
"username": email,
"email": email_to,
"valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
"link": link,
},
)
def send_new_account_email(email_to: str, username: str, password: str) -> None:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - New account for user {username}"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "new_account.html") as f:
template_str = f.read()
link = settings.SERVER_HOST
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={
"project_name": settings.PROJECT_NAME,
"username": username,
"password": password,
"email": email_to,
"link": link,
},
)
def generate_password_reset_token(email: str) -> str:
delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
now = datetime.utcnow()
expires = now + delta
exp = expires.timestamp()
encoded_jwt = jwt.encode(
{"exp": exp, "nbf": now, "sub": email},
settings.SECRET_KEY,
algorithm="HS256",
)
return encoded_jwt
def verify_password_reset_token(token: str) -> str | None:
try:
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
return decoded_token["sub"]
except jwt.JWTError:
return None

11
backend/app/worker.py Normal file
View File

@@ -0,0 +1,11 @@
import sentry_sdk
from app.core.celery_app import celery_app
from app.core.config import settings
sentry_sdk.init(dsn=settings.SENTRY_DSN)
@celery_app.task(acks_late=True)
def test_celery(word: str) -> str:
return f"test task return {word}"

View File

@@ -0,0 +1,26 @@
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.10
WORKDIR /app/
# Install Poetry
RUN curl -sSL https://install.python-poetry.org | POETRY_HOME=/opt/poetry python && \
cd /usr/local/bin && \
ln -s /opt/poetry/bin/poetry && \
poetry config virtualenvs.create false
# Copy poetry.lock* in case it doesn't exist in the repo
COPY ./pyproject.toml ./poetry.lock* /app/
# Allow installing dev dependencies to run tests
ARG INSTALL_DEV=false
RUN bash -c "if [ $INSTALL_DEV == 'true' ] ; then poetry install --no-root ; else poetry install --no-root --only main ; fi"
ENV PYTHONPATH=/app
COPY ./alembic.ini /app/
COPY ./prestart.sh /app/
COPY ./tests-start.sh /app/
COPY ./app /app/app

View File

@@ -0,0 +1,30 @@
FROM python:3.10
WORKDIR /app/
# Install Poetry
RUN curl -sSL https://install.python-poetry.org | POETRY_HOME=/opt/poetry python && \
cd /usr/local/bin && \
ln -s /opt/poetry/bin/poetry && \
poetry config virtualenvs.create false
# Copy poetry.lock* in case it doesn't exist in the repo
COPY ./pyproject.toml ./poetry.lock* /app/
# Allow installing dev dependencies to run tests
ARG INSTALL_DEV=false
RUN bash -c "if [ $INSTALL_DEV == 'true' ] ; then poetry install --no-root ; else poetry install --no-root --only main ; fi"
ENV C_FORCE_ROOT=1
ENV PYTHONPATH=/app
COPY ./alembic.ini /app/
COPY ./worker-start.sh /worker-start.sh
COPY ./app /app/app
RUN chmod +x /worker-start.sh
CMD ["bash", "/worker-start.sh"]

10
backend/prestart.sh Normal file
View File

@@ -0,0 +1,10 @@
#! /usr/bin/env bash
# Let the DB start
python /app/app/backend_pre_start.py
# Run migrations
alembic upgrade head
# Create initial data in DB
python /app/app/initial_data.py

72
backend/pyproject.toml Normal file
View File

@@ -0,0 +1,72 @@
[tool.poetry]
name = "app"
version = "0.1.0"
description = ""
authors = ["Admin <admin@example.com>"]
[tool.poetry.dependencies]
python = "^3.10"
uvicorn = "^0.24.0.post1"
fastapi = "^0.104.1"
python-multipart = "^0.0.6"
email-validator = "^2.1.0.post1"
celery = "^5.3.5"
passlib = {extras = ["bcrypt"], version = "^1.7.4"}
tenacity = "^8.2.3"
pydantic = ">2.0"
emails = "^0.6"
gunicorn = "^21.2.0"
jinja2 = "^3.1.2"
alembic = "^1.12.1"
python-jose = {extras = ["cryptography"], version = "^3.3.0"}
httpx = "^0.25.1"
psycopg = {extras = ["binary"], version = "^3.1.13"}
sqlmodel = "^0.0.16"
# Pin bcrypt until passlib supports the latest
bcrypt = "4.0.1"
pydantic-settings = "^2.2.1"
sentry-sdk = {extras = ["fastapi"], version = "^1.40.6"}
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-cov = "^4.1.0"
mypy = "^1.8.0"
ruff = "^0.2.2"
pre-commit = "^3.6.2"
[tool.isort]
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
line_length = 88
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"
[tool.mypy]
strict = true
[tool.ruff]
target-version = "py310"
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
]
ignore = [
"E501", # line too long, handled by black
"B008", # do not perform function calls in argument defaults
"W191", # indentation contains tabs
"B904", # Allow raising exceptions without from e, for HTTPException
]
[tool.ruff.lint.pyupgrade]
# Preserve types, even if a file imports `from __future__ import annotations`.
keep-runtime-typing = true

View File

@@ -0,0 +1,6 @@
#!/bin/sh -e
set -x
# Sort imports one per line, so autoflake can remove unused imports
isort --recursive --force-single-line-imports --apply app
sh ./scripts/format.sh

6
backend/scripts/format.sh Executable file
View File

@@ -0,0 +1,6 @@
#!/bin/sh -e
set -x
autoflake --remove-all-unused-imports --recursive --remove-unused-variables --in-place app --exclude=__init__.py
black app
isort --recursive --apply app

8
backend/scripts/lint.sh Normal file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
set -x
mypy app
black app --check
isort --recursive --check-only app
flake8

View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
set -e
set -x
bash scripts/test.sh --cov-report=html "${@}"

6
backend/scripts/test.sh Executable file
View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
set -e
set -x
pytest --cov=app --cov-report=term-missing app/tests "${@}"

6
backend/tests-start.sh Normal file
View File

@@ -0,0 +1,6 @@
#! /usr/bin/env bash
set -e
python /app/app/tests_pre_start.py
bash ./scripts/test.sh "$@"

6
backend/worker-start.sh Normal file
View File

@@ -0,0 +1,6 @@
#! /usr/bin/env bash
set -e
python /app/app/celeryworker_pre_start.py
celery -A app.worker worker -l info -Q main-queue -c 1