為什麼用 FastAPI?
最近把幾個 Flask 專案遷移到 FastAPI,原因很簡單:
- 原生 async 支援:I/O 密集型任務效能大提升
- 自動 API 文件:Swagger UI 開箱即用
- 型別提示:配合 Pydantic,驗證和序列化都省了
- 效能好:接近 Node.js 和 Go 的水準
非同步基礎
async/await 的正確用法
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def get_item(item_id: int):
# 非同步資料庫查詢
item = await db.get_item(item_id)
return item
重點:只有在做 I/O 操作時才用 async def。如果函式內沒有 await,用普通 def 就好。
# 好:有 await,用 async def
@app.get("/async-endpoint")
async def async_endpoint():
data = await fetch_from_api()
return data
# 好:沒有 await,用普通 def
@app.get("/sync-endpoint")
def sync_endpoint():
return {"message": "Hello"}
# 不好:async def 但沒有 await(浪費資源)
@app.get("/bad-endpoint")
async def bad_endpoint():
return {"message": "Hello"} # 這裡應該用普通 def
Lifespan 管理
正確的啟動/關閉處理
注意:@app.on_event("startup") 和 @app.on_event("shutdown") 已經棄用,應該用 lifespan context manager。
from contextlib import asynccontextmanager
from fastapi import FastAPI
from asyncpg import create_pool
# 全域連線池
pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
# 啟動時執行
global pool
pool = await create_pool(
"postgresql://user:pass@localhost/db",
min_size=5,
max_size=20
)
print("資料庫連線池已建立")
yield # 應用程式運行中
# 關閉時執行
await pool.close()
print("資料庫連線池已關閉")
app = FastAPI(lifespan=lifespan)
為什麼 lifespan 更好?
- 更清楚的生命週期:一個地方管理啟動和關閉
- 資源保證釋放:用 context manager,確保 cleanup 會執行
- 支援依賴注入:可以把資源傳給依賴
依賴注入
基本用法
from fastapi import Depends
async def get_db():
async with pool.acquire() as conn:
yield conn
@app.get("/users/")
async def get_users(db = Depends(get_db)):
rows = await db.fetch("SELECT * FROM users")
return rows
進階:帶參數的依賴
def get_pagination(skip: int = 0, limit: int = 10):
return {"skip": skip, "limit": limit}
@app.get("/items/")
async def get_items(
db = Depends(get_db),
pagination = Depends(get_pagination)
):
query = f"SELECT * FROM items OFFSET {pagination['skip']} LIMIT {pagination['limit']}"
return await db.fetch(query)
類別作為依賴
class ItemService:
def __init__(self, db = Depends(get_db)):
self.db = db
async def get_all(self):
return await self.db.fetch("SELECT * FROM items")
async def get_by_id(self, item_id: int):
return await self.db.fetchrow(
"SELECT * FROM items WHERE id = $1",
item_id
)
@app.get("/items/")
async def get_items(service: ItemService = Depends()):
return await service.get_all()
連線池管理
資料庫連線池
from asyncpg import create_pool
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.pool = await create_pool(
DATABASE_URL,
min_size=5,
max_size=20,
command_timeout=60
)
yield
await app.state.pool.close()
async def get_db():
async with app.state.pool.acquire() as conn:
yield conn
HTTP 客戶端連線池
import httpx
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.http_client = httpx.AsyncClient(
timeout=30.0,
limits=httpx.Limits(max_connections=100)
)
yield
await app.state.http_client.aclose()
async def get_http_client():
return app.state.http_client
@app.get("/external-api")
async def call_external_api(client: httpx.AsyncClient = Depends(get_http_client)):
response = await client.get("https://api.example.com/data")
return response.json()
背景任務
不阻塞回應
from fastapi import BackgroundTasks
async def send_notification(user_id: int, message: str):
# 模擬發送通知(耗時操作)
await asyncio.sleep(5)
print(f"通知已發送給用戶 {user_id}: {message}")
@app.post("/orders/")
async def create_order(
order: OrderCreate,
background_tasks: BackgroundTasks
):
# 建立訂單(快速)
order_id = await db.create_order(order)
# 發送通知(放到背景)
background_tasks.add_task(
send_notification,
order.user_id,
f"您的訂單 {order_id} 已建立"
)
# 立即回傳
return {"order_id": order_id, "status": "created"}
什麼時候用背景任務?
- 發送郵件/通知
- 寫入日誌到外部系統
- 更新快取
- 非關鍵的資料處理
注意:如果任務很重要,考慮用 Celery 或 Redis Queue,避免任務遺失。
錯誤處理
全域例外處理
from fastapi import Request
from fastapi.responses import JSONResponse
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
return JSONResponse(
status_code=400,
content={"detail": str(exc)}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
# 記錄錯誤
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
效能小技巧
1. 使用 orjson 加速 JSON 序列化
from fastapi.responses import ORJSONResponse
app = FastAPI(default_response_class=ORJSONResponse)
2. 適當使用 Response Cache
from fastapi_cache import FastAPICache
from fastapi_cache.decorator import cache
@app.get("/expensive-query")
@cache(expire=60) # 快取 60 秒
async def expensive_query():
return await db.fetch_expensive_data()
3. 批量操作
# 不好:N 次查詢
for user_id in user_ids:
user = await db.get_user(user_id)
# 好:1 次查詢
users = await db.get_users_by_ids(user_ids)
總結
FastAPI 的非同步能力很強,但要用對:
- lifespan 管理資源,別用棄用的
on_event - 連線池一定要用,別每次都建新連線
- 依賴注入讓程式碼更乾淨、更好測試
- 背景任務用於非關鍵的耗時操作
掌握這些模式,就能寫出高效能、可維護的 API。