- 이번 포스팅에선 FastAPI 에서 Database 를 사용하는 방법을 정리할 것입니다.
https://github.com/RaillyLinker/Python_FastApi_Template
설명에 사용할 코드는,
위 레포지토리 안의 module_sample_sql_alchemy 폴더에 모듈 단위로 떼어내 정리하였으므로 참고하면 되며,
본 게시글에서는 코드상 중요한 부분을 설명하겠습니다.
- ORM 이란,
Object-Relational Mapping의 줄임말입니다.
ORM은 데이터베이스의 테이블을 프로그래밍 언어(예: Python, Java 등)의 '객체'처럼 다루게 해주는 기술로,
SQL 쿼리(SELECT, INSERT, UPDATE 등)를 직접 작성하는 대신,
클래스(예: User, Product)와
객체(예: user = User(name="Alice"))
를 통해 데이터베이스의 테이블과 레코드를 다루는 방법입니다.
- FastAPI 에서 데이터베이스를 다루는 라이브러리로는 SQLAlchemy 를 사용할 것입니다.
이유는, 현 시점 가장 많이 사용하는 방식이기도 하고, 현대적인 ORM 구조를 가지고 있기 때문입니다.
SQLAlchemy 는 라이므러리이기에 개발자마다 사용 방식이 다를 수 있는데,
저의 경우는 Springboot 의 JPA 를 다룰 때 사용하는 방식을 참고하여 코드 구조를 정리하였으므로 참고하세요.
- 본 프로젝트에서는 라이브러리 비동기 처리를 고려하였습니다.
fast api 에서 동기 방식으로 api 를 처리하면,
@app.get("/sync")
def sync_example():
time.sleep(3) # 3초 동안 블로킹 (멈춤)
return {"message": "동기 처리가 끝났습니다."}
위와 같습니다.
비동기 방식은,
@app.get("/async")
async def async_example():
await asyncio.sleep(3) # 3초 동안 '비동기' 대기 (서버는 자유)
return {"message": "비동기 처리가 끝났습니다."}
이처럼 def 앞에 async 를 붙입니다.
이렇게 fast api 의 api 함수를 비동기로 만들게 되면 장점으로는,
한 작업을 처리하는 동안 다른 요청을 처리할 수 있기에 이러한 병렬 처리로 성능이 향상됩니다.
하지만 api 가 비동기 방식이더라도, 해당 api 가 사용하는 데이터베이스 라이브러리를 동기 방식으로 두게 되면,
동일 db 를 사용하는 두개 이상의 비동기 api 가 해당 db 사용권을 두고 블록 당하게 되므로 비동기 방식의 장점이 떨어지게 될 것입니다.
이러한 이유로 인해 DB 사용시 블록되지 않는 비동기 처리 방식이 필요하며,
본 게시글에서는 SQLAlchemy 를 비동기로 사용하는 방식을 정리할 것입니다.
[CRUD 구현]
- Database 에서 가장 기초가 되는 CRUD 를 SQLAlchemy 를 사용하여 구현하는 법을 정리하겠습니다.
1. 라이브러리 설치
pip install SQLAlchemy
위와 같이 라이브러리를 설치합니다.
추가적으로, 만약 mysql 을 사용한다면, asyncmy 이러한 비동기 처리용 별도 라이브러리를 사용해야 하는데,
이에 대해선 각 데이터베이스별 라이브러리를 검색하여 추가하거나,
IDE, 실행시 에러 등을 파악하여 라이브러리를 준비하면 됩니다.
2. 프로젝트 구조
제 프로젝트 구조는 위와 같습니다.
설정 파일이 모여있는 configurations 안에는 sql_alchemy 라는 서브 폴더가 있고, 이 안에 db 접속 객체(DAO) 가 존재합니다.
SQLAlchemy 관련 코드들은 sql_alchemy_objects 폴더 안에 있으며,
서브폴더로는 데이터베이스 종류명의 폴더가 있습니다.
이는, 데이터베이스 한곳만을 고려한 것이 아니라, 여러 데이터베이스에 접속이 가능하도록 데이터베이스 별로 테이블 정보, 요청 정보를 나눠둔 개념입니다.
데이터베이스 폴더 안에는,
데이터베이스 테이블 객체를 의미하는 entity 객체가 모여있는 entities,
데이터베이스에 요청을 보내는 코드가 모여있는(sql 의 모음이라 보면 됩니다.) repositories,
entity 와는 다른 형식으로(join 등을 사용하여 나온 결과물) 쿼리문 상으로 출력되는 값을 매핑하기 위한 value_objects 가 존재합니다.
설정부터 시작해서 하나씩 알아보겠습니다.
3. db 접속 설정
configurations 의 sql_alchemy 폴더 안에, db1_main_config.py 를 만들었습니다.
이는 앞서 설명했듯, 여러 db 에 대한 접속을 가정한 것 중에, 한 db 에 대한 접속 정보를 설정하는 설정 파일로,
127.0.0.1:3306
이런 위치에 있는 db 를 db1_main 이라는 이름으로 설정한 의미입니다.
만약,
127.0.0.1:3307
이런 위치의 db 역시 추가하려면, db2_sub 라는 이름을 사용할 수 있겠죠.
db 설정 파일의 코드는 아래와 같습니다.
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from contextlib import asynccontextmanager
from functools import wraps
from zoneinfo import ZoneInfo
# DB 유저 인증 정보
_user_name = "root"
_password = "todo1234!"
# DB 타임존 설정
db_timezone = ZoneInfo("Asia/Seoul")
# 비동기 MySQL URL
_database_url = (
f"mysql+asyncmy://{_user_name}:{_password}@127.0.0.1:3306/first_schema?"
f"charset=utf8mb4"
)
# 엔진
_async_engine = (
create_async_engine(
_database_url,
echo=False, # DB 로그
# 커넥션 풀에서 커넥션을 꺼내기 전에, SELECT 1 같은 간단한 쿼리를 보내 유효성 검사를 함.
# 커넥션이 죽어 있다면 → 버리고 새로 연결.
# 결과적으로, "OperationalError: MySQL server has gone away", "connection closed" 같은 에러를 방지할 수 있음.
pool_pre_ping=True,
pool_size=20, # 커넥션 풀 크기
max_overflow=10, # 커넥션 풀을 넘어서 생성할 수 있는 커넥션 수
pool_timeout=30, # 커넥션 풀에서 대기할 최대 시간
)
)
# 세션 메이커
_async_session_maker = (
async_sessionmaker(
bind=_async_engine,
expire_on_commit=False,
autoflush=False
)
)
# (DB 세션을 반환하는 함수)
@asynccontextmanager
async def get_async_db() -> AsyncSession:
async with _async_session_maker() as session:
yield session
# (Transactional 데코레이터)
# Transaction 을 적용 하려는 함수 위에,
# @sql_alchemy_transactional(view_only=True) 이렇게 붙이고, 해당 함수에는,
# db: AsyncSession 이것을 인자값으로 받도록 처리
def sql_alchemy_transactional(view_only: bool = False):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
async with get_async_db() as db:
if view_only:
result = await func(*args, db=db, **kwargs)
return result
else:
try:
result = await func(*args, db=db, **kwargs)
await db.commit()
return result
except Exception as e:
await db.rollback()
raise e
return wrapper
return decorator
맨 위의 유저 인증 정보는,
말 그대로 데이터베이스 접속을 위해 사용할 유저 정보를 입력하면 되며,
DB 타임존 설정은, 해당 DB 의 타임존 설정을 입력하면 됩니다.(타임존에 대해선 아래에서 설명)
_database_url 에는 db 접속 위치를 입력하면 되는데,
저의 경우는 mysql 에 대한 비동기 접속을 위해 mysql+asyncmy 이것을 붙였는데, 혹여 다른 데이터베이스를 사용한다면 검색하여 변경하면 됩니다.
그 외의 코드는 sql_alchemy_transactional 이 함수를 제외하고는 신경쓰실 필요가 없습니다.
비동기 세션을 생성하는 방식에 대한 코드인데, 커넥션 풀 등의 설정을 조금씩 변경하는 것 외에는 DB 로직 작성시 사용하지 않는 부분입니다.
sql_alchemy_transactional 의 부분은,
데코레이터입니다.
이 코드 역시 안의 내용은 파악하실 필요는 없고 그냥 함수로서 사용만 하면 되는데,
간단히 설명하자면, DB 를 사용하는 함수 위에
@sql_alchemy_transactional()
async def post_insert_data_sample(
request: Request,
response: Response,
request_body: model.PostInsertDataSampleInputVo,
db: AsyncSession
):
위와 같이 @sql_alchemy_transactional() 이렇게 데코레이터를 붙이면,
해당 함수의 db: AsyncSession 파라미터에 데코레이터가 변수를 주입해주고, 함수 실행 전후의 처리를 해주는 것입니다.
즉, 여기서는 데코레이터가 붙은 함수에서 get_async_db() as db: 이렇게 해서 AsyncSession 객체를 사용할 수 있도록 변수를 입력해주는 역할,
함수가 종료될 때 AsyncSession 이 자동으로 close 되게 해주는 역할,
view_only 가 false 일때, 만약 함수 내에서 Exception 이 발생하면 rollback 을 해주고, 아니라면 commit 을 해주는 역할을 담당합니다.
즉, 이 데코레이터만 붙이면,
async with get_async_db() as db:
if view_only:
# todo
else:
try:
# todo
await db.commit()
return result
except Exception as e:
await db.rollback()
raise e
위와 같이 todo 부분을 감싸는 외곽 코드를 번거롭게 작성할 필요가 없어지는 것입니다.
4. 데이터베이스 테이블 설정
실습을 위해 데이터베이스 테이블을 하나 만들어보겠습니다.
sql_alchemy_objects/db1_main/entities 안에,
template_test_data.py 파일을 만들었습니다.
파일 경로의 정보는,
db1_main 이라는 데이터베이스에서, template 라는 스키마 안에 test_data 라는 테이블을 만든다는 의미입니다.
from sqlalchemy import Column, Integer, String, BigInteger, DateTime
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Db1TemplateTestData(Base):
__tablename__ = 'test_data'
__table_args__ = {
'schema': 'template',
'comment': '테스트 정보 테이블(논리적 삭제 적용)'
}
uid = (
Column(
"uid",
BigInteger,
primary_key=True,
autoincrement=True,
comment="행 고유값"
)
)
row_create_date = (
Column(
"row_create_date",
DateTime(),
nullable=False,
comment="행 생성일"
)
)
row_update_date = (
Column(
"row_update_date",
DateTime(),
nullable=False,
comment="행 수정일"
)
)
row_delete_date_str = (
Column(
"row_delete_date_str",
String(50),
nullable=False,
comment="행 삭제일(yyyy_MM_dd_T_HH_mm_ss_SSS_z, 삭제되지 않았다면 /)"
)
)
content = (
Column(
"content",
String(255),
nullable=False,
comment="테스트 본문"
)
)
random_num = (
Column(
"random_num",
Integer,
nullable=False,
comment="테스트 랜덤 번호"
)
)
test_datetime = (
Column(
"test_datetime",
DateTime(),
nullable=False,
comment="테스트용 일시 데이터"
)
)
구조는 위와 같습니다.
__tablename__ 으로 테이블명을 정하고,
__table_args__ 안의 설정값으로 스키마와 주석 등의 정보를 정할 수 있습니다.
각 컬럼은 Column 으로 설정이 가능하며,
Column 의 첫번째 인자값으로 데이터베이스 컬럼명을 설정할 수 있습니다.(변수명이 test 라고 해도, Column 설정으로 "sample" 이라 설정하면 데이터베이스 테이블 내의 컬럼명은 sample)
그외에 pk 설정이나 데이터 타입 설정, 주석 설정 등의 설정을 정할 수 있고, 보시는 바와 같이 객체지향적으로 테이블 정보를 정리하여 파악하기도 쉽습니다.
5. 레포지토리 생성
다음은 데이터베이스 테이블에 대한 레포지토리를 설정하겠습니다.
repositories 폴더 안에,
template_test_data_repository.py 라는 이름의 파일을 만들어줍니다.
앞서 만든 entity 에 종속된 레포지토리라는 뜻으로,
from sqlalchemy import select, text, DateTime, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.inspection import inspect
import tzlocal
import module_sample_sql_alchemy.utils.sql_alchemy_util as sql_alchemy_util
import module_sample_sql_alchemy.sql_alchemy_objects.db1_main.value_objects.template_test_data_vo as value_objects
from module_sample_sql_alchemy.sql_alchemy_objects.db1_main.entities.template_test_data import Db1TemplateTestData
from module_sample_sql_alchemy.configurations.sql_alchemy.db1_main_config import db_timezone
from typing import List, Sequence, Optional, Tuple
from datetime import datetime
from sqlalchemy import func
# [SqlAlchemy 레포지토리]
# 데이터 변경 함수 사용시 commit, rollback 처리를 해주세요.
# 데이터 save(동일 pk 가 존재 하면 update, 없다면 insert)
async def save(db: AsyncSession, entity: Db1TemplateTestData) -> Db1TemplateTestData:
# datetime 필드 자동 탐지 및 타임존 변환
mapper = inspect(entity.__class__)
for column in mapper.columns:
# print(f"Inspecting column: {column.name}({column.type})") # 디버깅용: 어떤 컬럼을 확인 중인지 출력
if isinstance(column.type, DateTime): # Datetime 타입 변수만 탐지
attr_name = column.name # 컬럼명 ex : row_create_date
value = getattr(entity, attr_name) # 입력된 값 ex : 2025-04-26 10:14:13.335610
# print(f">>>> attr_name : {attr_name}, value : {value}")
if value is not None:
if value.tzinfo is None: # 타임존 설정 안한 경우
local_tz = tzlocal.get_localzone() # 로컬 타임존 예: Asia/Seoul
value = value.replace(tzinfo=local_tz)
# DB 타임존 설정으로 타임존 변경
value = value.astimezone(db_timezone)
# 객체 내 변수 값 수정
setattr(entity, attr_name, value)
db.add(entity)
await db.flush()
await db.refresh(entity)
# Entity 안의 datetime 에 타임존 정보 입력
sql_alchemy_util.apply_timezone_to_datetime_fields(entity, db_timezone)
return entity
# 모든 데이터 삭제
async def delete_all(db: AsyncSession):
stmt = select(Db1TemplateTestData)
result = await db.execute(stmt)
entity_list = result.scalars().all()
for entity in entity_list:
await db.delete(entity)
# 데이터 pk 로 delete
async def delete_by_id(db: AsyncSession, pk: int):
stmt = select(Db1TemplateTestData).where(Db1TemplateTestData.uid == pk)
result = await db.execute(stmt)
entity = result.scalar_one_or_none()
if entity:
await db.delete(entity)
# 모든 데이터 검색
async def find_all(db: AsyncSession) -> Sequence[Db1TemplateTestData]:
stmt = select(Db1TemplateTestData)
result = await db.execute(stmt)
entity_list = result.scalars().all()
# Entity 안의 datetime 에 타임존 정보 입력
sql_alchemy_util.apply_timezone_to_datetime_fields_in_list(entity_list, db_timezone)
return entity_list
# 데이터 pk 로 검색(0 or 1 result)
async def find_by_id(db: AsyncSession, pk: int) -> Optional[Db1TemplateTestData]:
stmt = select(Db1TemplateTestData).where(Db1TemplateTestData.uid == pk)
result = await db.execute(stmt)
entity = result.scalar_one_or_none()
# Entity 안의 datetime 에 타임존 정보 입력
sql_alchemy_util.apply_timezone_to_datetime_fields(entity, db_timezone)
return entity
# ---- (커스텀 쿼리 함수 추가 공간) ----
기본적으로 위와 같이 만들어주면 됩니다.
보시는바와 같이 본 파일 안에는 해당 테이블에 대한 명령어(join 명령의 경우 부모가 되는 테이블을 기준으로 합니다.)를 모아둔 파일로, 위에서 나열한 함수들은 springboot jpa 에서 자동으로 제공해주는 기본 명령어를 구현한 것입니다.
save 는 upsert 의 역할로, 입력하고자 하는 entity 객체를 입력하면, 데이터를 입력하고, entity 객체 안에 pk 값이 충족되어 있다면 해당 pk 에 해당하는 row 의 정보를 수정하는 역할을 합니다.
위에서 코드가 길어진 이유는, datetime 의 변수의 타임존 일치를 위한 로직입니다.
앞서 db_config 에서 설정한 타임존 설정이 여기서 사용되며,
간단하게 설명하자면,
만약 현재 서버에서 datetime.now 를 해서 현재 시간을 db 에 입력할 때, 현재 서버는 KST 기준이고, 데이터베이스 서버는 UTC 기준이라면 타임존에 따른 시차 문제가 생기겠죠?
이를 방지하기 위하여, 데이터 입력시에는 save 에서와 같이 입력할 entity 안의 모든 datetime 타입의 데이터에 시차 적용을 해주고,
데이터 조회시에도 sql_alchemy_util.apply_timezone_to_datetime_fields 로 시차를 적용해줘야 합니다.
apply_timezone_to_datetime_fields 코드는,
from sqlalchemy.inspection import inspect
from sqlalchemy.types import DateTime
from typing import Sequence
# (Entity 안의 모든 datetime 에 타임존 정보 입력)
def apply_timezone_to_datetime_fields(entity, timezone):
if entity is None:
return
mapper = inspect(entity.__class__)
for column in mapper.columns:
if isinstance(column.type, DateTime):
attr_name = column.name
value = getattr(entity, attr_name)
if value is not None:
value = value.replace(tzinfo=timezone)
setattr(entity, attr_name, value)
# (Entity 리스트 안의 모든 datetime 에 타임존 정보 입력)
def apply_timezone_to_datetime_fields_in_list(entities: Sequence, timezone):
if not entities:
return
for entity in entities:
apply_timezone_to_datetime_fields(entity, timezone)
위와 같습니다.
나머지 delete 함수나, 가장 많이 작성하게 될 find 함수의 경우는 ORM 방식대로,
# (데이터 카운팅)
async def count_by_row_delete_date_str(
db: AsyncSession,
row_delete_date_str: str
) -> int:
stmt = select(func.count()).select_from(Db1TemplateTestData).where(
Db1TemplateTestData.row_delete_date_str == row_delete_date_str
)
result = await db.execute(stmt)
count = result.scalar_one() # scalar_one()은 결과에서 단일 값(카운트)을 반환
return count
이렇게 함수를 조합하여 쿼리를 대신할 수 있고,
# (데이터 카운팅)
async def count_from_template_test_data_by_not_deleted(
db: AsyncSession,
row_delete_date_str: str
) -> int:
# Native SQL 쿼리
query = text("""
SELECT COUNT(*)
FROM template.test_data AS test_data
WHERE test_data.row_delete_date_str = :row_delete_date_str
""")
# 쿼리 실행
result = await db.execute(query, {"row_delete_date_str": row_delete_date_str})
# 카운트 값 추출
count = result.scalar_one() # scalar_one()으로 결과에서 카운트 값을 추출
return count
이렇게 Native Query 를 직접 작성해 사용하는 방법도 있습니다.
6. value_objects
이곳에서는 entity 로는 받아올 수 없는 데이터를 매핑하기 위한 컬럼을 작성합니다.
예를들어 join 이라던지 쿼리상 계산을 통해 만들어진 데이터 등을 받아올 때에 사용할 수 있습니다.
예를들어,
# (입력값 거리 측정 쿼리)
async def find_all_by_not_deleted_with_random_distance(
db: AsyncSession,
num: int
) -> List[value_objects.FindAllFromTemplateTestDataByNotDeletedWithRandomNumDistanceOutputVo]:
query = text("""
SELECT
test_data.uid AS uid,
test_data.row_create_date AS rowCreateDate,
test_data.row_update_date AS rowUpdateDate,
test_data.content AS content,
test_data.random_num AS randomNum,
test_data.test_datetime AS testDatetime,
ABS(test_data.random_num-:num) AS distance
FROM
template.test_data AS test_data
WHERE
test_data.row_delete_date_str = '/'
ORDER BY
distance
""")
result = await db.execute(query, {"num": num})
rows = result.mappings().all() # key-value dict로 가져옴
output = [
value_objects.FindAllFromTemplateTestDataByNotDeletedWithRandomNumDistanceOutputVo(
uid=row["uid"],
row_create_date=row["rowCreateDate"].astimezone(db_timezone),
row_update_date=row["rowUpdateDate"].astimezone(db_timezone),
content=row["content"],
random_num=row["randomNum"],
test_datetime=row["testDatetime"].astimezone(db_timezone),
distance=row["distance"]
)
for row in rows
]
return output
위와 같이 distance 라는 이름의 기존 entity 에는 없는 값을 매핑받아 가져오려면,
# (입력값 거리 측정 쿼리)
class FindAllFromTemplateTestDataByNotDeletedWithRandomNumDistanceOutputVo(BaseModel):
uid: int = (
Field(
...,
description="행 고유값"
)
)
row_create_date: datetime = (
Field(
...,
description="행 생성일"
)
)
row_update_date: datetime = (
Field(
...,
description="행 수정일"
)
)
content: str = (
Field(
...,
description="테스트 본문"
)
)
random_num: int = (
Field(
...,
description="테스트 랜덤 번호"
)
)
test_datetime: datetime = (
Field(
...,
description="테스트용 일시 데이터"
)
)
distance: int = (
Field(
...,
description="입력값과 랜덤 번호 간 거리"
)
)
위와 같이 매핑용 클래스를 별도로 value_object 안에 넣어두고 매핑에 사용해야 합니다.
7. DB 사용 예시
이제 SQLAlchemy 를 통한 DB 사용 준비물은 갖춰진 상태입니다.
실질적으로 비동기 API 에서 데이터베이스를 사용하는 예시를 보겠습니다.
컨트롤러는,
from fastapi import APIRouter, Response, Request, Body, Query, Path
import module_sample_sql_alchemy.models.sql_alchemy_test_model as model
import module_sample_sql_alchemy.services.sql_alchemy_test_service as service
# [그룹 컨트롤러]
# Router 설정
router = APIRouter(
prefix="/sql-alchemy-test", # 전체 경로 앞에 붙는 prefix
tags=["SqlAlchemy 테스트 컨트롤러"] # Swagger 문서 그룹 이름
)
# ----------------------------------------------------------------------------------------------------------------------
# <API 선언 공간>
@router.post(
"/row",
response_model=model.PostInsertDataSampleOutputVo,
summary="DB Row 입력 테스트 API",
description="테스트 테이블에 Row 를 입력합니다.",
responses={
200: {"description": "OK"}
}
)
async def post_insert_data_sample(
request: Request,
response: Response,
request_body: model.PostInsertDataSampleInputVo =
Body(
...,
description="Body 파라미터"
)
):
return await service.post_insert_data_sample(request, response, request_body)
위와 같습니다.
API 모델은,
from pydantic import BaseModel, Field
from typing import List
# [그룹 모델]
# (DB Row 입력 테스트 API)
class PostInsertDataSampleInputVo(BaseModel):
class Config:
validate_by_name = True
content: str = (
Field(
...,
alias="content",
description="글 본문",
examples=["테스트 텍스트입니다."]
)
)
date_string: str = (
Field(
...,
alias="dateString",
description="원하는 날짜(yyyy_MM_dd_'T'_HH_mm_ss_SSS_z)",
examples=["2024_05_02_T_15_14_49_552_KST"]
)
)
class PostInsertDataSampleOutputVo(BaseModel):
class Config:
validate_by_name = True
uid: int = (
Field(
...,
alias="uid",
description="글 고유번호",
examples=[1]
)
)
create_date: str = (
Field(
...,
alias="createDate",
description="글 작성일(yyyy_MM_dd_'T'_HH_mm_ss_SSS_z)",
examples=["2024_05_02_T_15_14_49_552_KST"]
)
)
update_date: str = (
Field(
...,
alias="updateDate",
description="글 수정일(yyyy_MM_dd_'T'_HH_mm_ss_SSS_z)",
examples=["2024_05_02_T_15_14_49_552_KST"]
)
)
delete_date: str = (
Field(
...,
alias="deleteDate",
description="글 삭제일(yyyy_MM_dd_'T'_HH_mm_ss_SSS_z, Null 이면 /)",
examples=["/"]
)
)
content: str = (
Field(
...,
alias="content",
description="글 본문",
examples=["테스트 텍스트입니다."]
)
)
random_num: int = (
Field(
...,
alias="randomNum",
description="자동 생성 숫자",
examples=[1]
)
)
test_datetime: str = (
Field(
...,
alias="testDatetime",
description="테스트용 일시 데이터(yyyy_MM_dd_'T'_HH_mm_ss_SSS_z)",
examples=["2024_05_02_T_15_14_49_552_KST"]
)
)
이러합니다.
보시다시피 content 와 dateString 값을 받아서 데이터베이스에 입력하는 간단한 샘플입니다.
비동기 api 로 이를 받아서 처리하는 service 코드는,
import random
from datetime import datetime
from fastapi import Response, Request
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
import module_sample_sql_alchemy.models.sql_alchemy_test_model as model
import module_sample_sql_alchemy.sql_alchemy_objects.db1_main.repositories.template_test_data_repository \
as template_test_data_repository
import module_sample_sql_alchemy.utils.custom_util as custom_util
import tzlocal
from module_sample_sql_alchemy.configurations.sql_alchemy.db1_main_config import sql_alchemy_transactional
from module_sample_sql_alchemy.sql_alchemy_objects.db1_main.entities.template_test_data import Db1TemplateTestData
from typing import List
# [그룹 서비스]
# (DB Row 입력 테스트 API)
@sql_alchemy_transactional()
async def post_insert_data_sample(
request: Request,
response: Response,
request_body: model.PostInsertDataSampleInputVo,
db: AsyncSession
):
# yyyy_MM_dd_'T'_HH_mm_ss_SSS_z 형식 string -> datetime
datetime_obj = custom_util.parse_custom_datetime(request_body.date_string, "yyyy_MM_dd_'T'_HH_mm_ss_SSS_z")
# 데이터 저장
now_datetime = datetime.now()
new_entity = await template_test_data_repository.save(
db,
Db1TemplateTestData(
row_create_date=now_datetime,
row_update_date=now_datetime,
row_delete_date_str="/",
content=request_body.content,
random_num=random.randint(0, 99999999),
test_datetime=datetime_obj
)
)
return JSONResponse(
status_code=200,
content=model.PostInsertDataSampleOutputVo(
uid=new_entity.uid,
create_date=
new_entity.row_create_date.strftime('%Y_%m_%d_T_%H_%M_%S') +
f"_{new_entity.row_create_date.microsecond // 1000:03d}"
f"_{new_entity.row_create_date.tzname()}",
update_date=
new_entity.row_update_date.strftime('%Y_%m_%d_T_%H_%M_%S') +
f"_{new_entity.row_update_date.microsecond // 1000:03d}"
f"_{new_entity.row_update_date.tzname()}",
delete_date=new_entity.row_delete_date_str,
content=new_entity.content,
random_num=new_entity.random_num,
test_datetime=
new_entity.test_datetime.strftime('%Y_%m_%d_T_%H_%M_%S') +
f"_{new_entity.test_datetime.microsecond // 1000:03d}"
f"_{new_entity.test_datetime.tzname()}",
).model_dump()
)
위와 같이 간단하게 구성되어 있습니다.
@sql_alchemy_transactional()
를 붙인 async 함수에는, 데코레이터가 반환하는 db 인자값을 가지고, 레포지토리가 제공하는 save 함수 안에 db 라는 이름의 AsyncSession 객체와, 입력하려는 Entity 객체를 넣어준 것입니다.
위와 같이 처리되면, save 함수는 pk 인 uid 가 채워진 new_entity 를 반환하게 되죠.
만약 new_entity 와 같이 pk 가 채워진 객체를 content 와 같은 내용을 수정 후 다시 save 에 넣는다면, 이번엔 insert 가 아닌 update 가 될 것입니다.
entity, repository 만 준비된다면 이렇듯 사용법이 간단한데,
sql_alchemy_transactional 의 장점은 이뿐만이 아닙니다.
이를 사용하는 함수 내에서 에러가 일어나면 자동으로 rollback 을 수행하여 데이터 불일치를 막을 수도 있습니다.
예를들어 앞서 save 가 2번 실행되서 데이터가 2개 입력된 상황이라 가정할 때,
바로 뒷줄의 코드에서 에러가 일어났다고 가정합시다.
전체 코드가 실행되지도 않았는데 에러가 났으니 정상적인 상태가 아닌 데이터가 2개 입력된 것이죠.
만약 에러가 발생하면 기존에 등록한 2개의 데이터 입력도 취소하고 rollback 을 시켜야 하며,
위 데코레이터를 사용하면 자동으로 이러한 처리가 됩니다.
만약 테스트 하고 싶다면,
위 코드에서 save 코드 아래쪽에,
raise Exception()
을 입력하여 실행시켰을 때 값이 저장되지 않았다는 것을 확인하면 됩니다.
8. read_only
sql_alchemy_transactional 의 read_only 설정을 보겠습니다.
이를 적용하지 않으면 기본값은 False 입니다.
즉, 수정, 삭제, 입력시에는 read_only 를 False 로 해야 합니다.
위 config 코드를 보신다면 파악했겠지만, read_only 가 False 라면 try catch 문으로, commit 과 rollback 을 처리합니다.
True 라면 commit 도 rollback 도 없죠.
고로 데이터 변경이 없는 함수에서는 read_only 설정을 쓰는 것이 쓸데없는 로직을 수행하지 않으므로, 보다 성능에 좋습니다.
이외의 예시는 위 깃허브 프로젝트 안에 수록되어 있으므로 참고하세요.
9. 페이징
실습의 마지막으로 SQLAlchemy 의 페이징 방식에 대해서 설명하고 마치겠습니다.
웹 개발 경험이 있으시다면 DB 의 페이징 처리 방식은 알고 계실 것입니다.
고로 알고 계신 방식대로 위에서 설명한 방법을 사용하여 처리하기만 하면 됩니다.
굳이 이러한 설명을 붙인 이유는,
Springboot JPA 와 같은 라이브러리는, 페이징만을 위한 특별한 방법론이 존재하지만,
현 시점 SQLAlchemy 에는 제공되지 않기 때문이며, 저 역시 JPA 의 Pagination 과 같은 방식이 있지 않을까 찾아보았기 때문입니다.
SQLAlchemy Repository 에서의 페이징 처리는 아래와 같습니다.
# (네이티브 페이지네이션 샘플)
async def find_page_all_from_template_test_data_by_not_deleted_with_random_num_distance(
db: AsyncSession,
page: int,
page_elements_count: int,
num: int
) -> Tuple[List[value_objects.FindPageAllFromTemplateTestDataByNotDeletedWithRandomNumDistanceOutputVo], int]:
offset = (page - 1) * page_elements_count
# 본문 조회 (distance 기준 정렬)
result = await db.execute(
text("""
SELECT
test_data.uid AS uid,
test_data.row_create_date AS row_create_date,
test_data.row_update_date AS row_update_date,
test_data.content AS content,
test_data.random_num AS random_num,
test_data.test_datetime AS test_datetime,
ABS(test_data.random_num - :num) AS distance
FROM
template.test_data AS test_data
WHERE
test_data.row_delete_date_str = '/'
ORDER BY
distance
LIMIT :limit OFFSET :offset
"""),
{"num": num, "limit": page_elements_count, "offset": offset}
)
rows = result.mappings().all()
entities = [
value_objects.FindPageAllFromTemplateTestDataByNotDeletedWithRandomNumDistanceOutputVo(
uid=row["uid"],
row_create_date=row["row_create_date"],
row_update_date=row["row_update_date"],
content=row["content"],
random_num=row["random_num"],
test_datetime=row["test_datetime"],
distance=row["distance"]
)
for row in rows
]
# 총 개수 조회
count_result = await db.execute(
text("""
SELECT
COUNT(*)
FROM
template.test_data AS test_data
WHERE
test_data.row_delete_date_str = '/'
""")
)
total_elements = count_result.scalar_one()
return entities, total_elements
page 와 page_elements_count 라는, 페이징 처리에 필요한 정보를 받아들여서 처리하는 코드로,
일반적인 DB 사용 방식과 동일하기에 위와 같은 구조로 작성하는 것이 효율적일 것이라는 것만 소개하겠습니다.
- 이상입니다.
이외에 unique, foreign key 등의 제약 처리나, blob, binary 와 같은 컬럼 타입별 매핑 방식 등의 내용이 남아있으며, 계속해서 개선된 코드를 올리는 중입니다.
해당 부분은 위에 공유드린 깃허브 레포지토리를 참고하세요.
'Programming > BackEnd' 카테고리의 다른 글
Springboot JPA Entity 변수 매핑 정리 (Java, Kotlin) (0) | 2025.05.08 |
---|---|
FastAPI 비동기 처리 주의사항 (0) | 2025.04.28 |
서버 부하 테스트 - Locust 사용 (FastAPI, Springboot 비디오 스트리밍 성능 비교), FastAPI Media Streaming 코드 수록 (0) | 2025.04.23 |
Python FastAPI 개발 방법 정리 (0) | 2025.04.20 |
분산 소켓 서버 설명 및 구현(Springboot, SockJS, STOMP, Kafka, Redis, Javascript) (0) | 2025.03.28 |