Performance is not a feature—it's a requirement. In this guide, we'll explore every technique to squeeze maximum performance out of your FastAPI applications.
Understanding FastAPI's Performance
FastAPI achieves exceptional performance through:
- Starlette: One of the fastest Python ASGI frameworks
- Uvicorn: Lightning-fast ASGI server built on uvloop
- Pydantic V2: Rust-powered validation with 5-50x speedups
- Native Async: Non-blocking I/O for maximum concurrency
Benchmark Context
Framework Comparison (requests/second):
┌─────────────────┬─────────┐
│ FastAPI + Uvicorn │ ~42,000 │
│ Flask + Gunicorn │ ~8,000 │
│ Django │ ~6,000 │
│ Express (Node) │ ~38,000 │
│ Gin (Go) │ ~50,000 │
└─────────────────┴─────────┘
Database Optimization
Connection Pooling
Never create new database connections per request:
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker
)
# Shared async engine: one tuned connection pool for the whole application.
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=False,          # no SQL logging in production
    pool_pre_ping=True,  # verify connection health before use
    pool_recycle=1800,   # recycle connections every 30 minutes
    pool_timeout=30,     # wait time for a connection
    max_overflow=10,     # extra connections allowed under load
    pool_size=20,        # base pool size
)

# Factory producing AsyncSession objects bound to the engine above.
async_session_factory = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,  # avoid lazy loading issues after commit
)
async def get_db():
    """Yield one session created by ``async_session_factory``.

    The session is opened with ``async with``, so its context manager exits
    once the consumer has finished with the generator.
    """
    async with async_session_factory() as db_session:
        yield db_session


# Query Optimization
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload
# BAD: N+1 query problem
@app.get("/users")
async def get_users_slow(db: AsyncSession):
    """Return every ``User`` without any eager-loading options.

    Kept as the counter-example: ``user.posts`` is not fetched by this
    query, so each later access to it needs a load of its own.
    """
    query = select(User)
    rows = await db.execute(query)
    return rows.scalars().all()
# GOOD: Eager loading with selectinload
@app.get("/users")
async def get_users_fast(db: AsyncSession):
    """Return every ``User`` with ``posts`` and ``comments`` loaded up front."""
    eager_users = select(User).options(
        selectinload(User.posts),     # posts loaded eagerly, not per user
        selectinload(User.comments),  # same for comments
    )
    rows = await db.execute(eager_users)
    return rows.scalars().all()
# GOOD: Joinedload for single related object
@app.get("/posts")
async def get_posts_with_author(db: AsyncSession):
    """Return at most 100 ``Post`` rows with ``author`` loaded via JOIN."""
    # JOIN the author in the same statement instead of a separate query.
    query = select(Post).options(joinedload(Post.author)).limit(100)
    rows = await db.execute(query)
    deduplicated = rows.unique()
    return deduplicated.scalars().all()


# Batch Operations
from sqlalchemy import insert
# BAD: Individual inserts
async def create_items_slow(items: List[ItemCreate], db: AsyncSession):
    """Add one ``Item`` per payload to the session, then commit once.

    Kept as the counter-example to ``create_items_fast``.
    """
    for payload in items:
        fields = payload.model_dump()
        db.add(Item(**fields))
    await db.commit()
# GOOD: Bulk insert
async def create_items_fast(items: List[ItemCreate], db: AsyncSession):
    """Insert all ``items`` through a single ``execute`` call, then commit.

    Args:
        items: Payloads to persist; each is converted with ``model_dump()``.
        db: Session used for the insert and the commit.

    An empty ``items`` list is a no-op: nothing is executed or committed.
    """
    if not items:
        # NOTE(review): an empty parameter list is not an "insert nothing"
        # request to ``execute`` (it is handled like a statement without
        # parameters), so skip the round trip entirely.
        return
    rows = [item.model_dump() for item in items]
    await db.execute(insert(Item), rows)
    await db.commit()


# Caching Strategies
In-Memory Caching with TTL
from functools import lru_cache
from cachetools import TTLCache
from asyncio import Lock
# Coroutine-safe async cache (asyncio locks do not protect against threads)
class AsyncCache:
    """In-memory cache for coroutines, backed by a ``TTLCache``.

    ``None`` is the "missing" marker: ``get`` returns ``None`` for an absent
    key, and ``get_or_set`` never stores a ``None`` produced by the factory.
    """

    def __init__(self, maxsize: int = 1000, ttl: int = 300):
        self._cache = TTLCache(maxsize=maxsize, ttl=ttl)
        # Guards every read/write of the underlying cache.
        self._lock = Lock()
        # One lock per key that get_or_set is currently computing.
        self._key_locks = {}

    async def get(self, key: str):
        """Return the value stored under ``key``, or ``None`` if absent."""
        async with self._lock:
            return self._cache.get(key)

    async def set(self, key: str, value):
        """Store ``value`` under ``key``."""
        async with self._lock:
            self._cache[key] = value

    async def get_or_set(self, key: str, factory):
        """Return the cached value for ``key``, computing it on a miss.

        Args:
            key: Cache key.
            factory: Zero-argument async callable producing the value.

        Concurrent callers that miss on the same key wait for a single
        ``factory()`` call instead of each running their own. Exceptions
        raised by ``factory`` propagate to the caller and nothing is cached.
        """
        value = await self.get(key)
        if value is not None:
            return value

        key_lock = self._key_locks.setdefault(key, Lock())
        try:
            async with key_lock:
                # Re-check: another caller may have filled the entry while
                # this one was waiting for the per-key lock.
                value = await self.get(key)
                if value is None:
                    value = await factory()
                    await self.set(key, value)
        finally:
            # Drop the per-key lock once it is idle so the registry does not
            # grow with every key ever requested. Waiters already hold their
            # own reference and will find the value in the cache.
            if self._key_locks.get(key) is key_lock and not key_lock.locked():
                del self._key_locks[key]
        return value
cache = AsyncCache(maxsize=5000, ttl=60)


@app.get("/products/{product_id}")
async def get_product(product_id: int, db: AsyncSession):
    """Return the product with ``product_id``, consulting ``cache`` first."""

    async def load_from_db():
        # Handed to the cache as the factory for this key.
        query = select(Product).where(Product.id == product_id)
        rows = await db.execute(query)
        return rows.scalar_one_or_none()

    return await cache.get_or_set(f"product:{product_id}", load_from_db)


# Redis Caching
import json
from typing import Awaitable, Callable, Optional, TypeVar

import redis.asyncio as redis
T = TypeVar('T')
class RedisCache:
    """JSON cache on top of an asyncio Redis client.

    Values are serialized with ``json.dumps`` on write and parsed with
    ``json.loads`` on read, so only JSON-serializable values can be cached.
    """

    # Maximum number of keys sent in one DELETE command by ``invalidate``.
    _DELETE_BATCH_SIZE = 500

    def __init__(self, url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(url, decode_responses=True)

    async def cached(
        self,
        key: str,
        factory: Callable[[], Awaitable[T]],
        ttl: int = 300,
    ) -> T:
        """Return the value cached under ``key``, computing it on a miss.

        Args:
            key: Redis key to read and, on a miss, write.
            factory: Zero-argument async callable producing the value.
            ttl: Expiry passed to ``setex`` when the value is stored.

        Returns:
            The parsed cached value on a hit, otherwise the fresh value.
        """
        # Try cache first
        hit = await self.redis.get(key)
        if hit:
            return json.loads(hit)

        # Fetch and cache
        value = await factory()
        await self.redis.setex(key, ttl, json.dumps(value))
        return value

    async def invalidate(self, pattern: str):
        """Invalidate all keys matching pattern.

        Keys are deleted in batches so a large match set costs a handful of
        round trips rather than one per key.
        """
        batch = []
        async for key in self.redis.scan_iter(pattern):
            batch.append(key)
            if len(batch) >= self._DELETE_BATCH_SIZE:
                await self.redis.delete(*batch)
                batch = []
        if batch:
            await self.redis.delete(*batch)
redis_cache = RedisCache()


@app.post("/products")
async def create_product(product: ProductCreate, db: AsyncSession):
    """Persist a new ``Product`` and invalidate the related cache keys."""
    fields = product.model_dump()
    new_product = Product(**fields)
    db.add(new_product)
    await db.commit()

    # Invalidate related caches
    for stale_pattern in ("products:*", "categories:*"):
        await redis_cache.invalidate(stale_pattern)
    return new_product


# Async Patterns
Parallel Execution
import asyncio
from typing import List
@app.get("/dashboard")
async def get_dashboard(user_id: int, db: AsyncSession):
    """Fetch all dashboard data in parallel.

    Args:
        user_id: User whose dashboard is being built.
        db: Request session; used for the user lookup only.

    Returns:
        Dict with the keys ``user``, ``orders``, ``stats``, ``notifications``.

    A single ``AsyncSession`` must not be used by several coroutines at the
    same time, so only one query runs on ``db``; every other concurrent
    query gets a session of its own from ``async_session_factory``.
    """

    async def in_own_session(query):
        # Objects returned from here are detached once the session closes,
        # so the query helpers must load everything the response needs.
        async with async_session_factory() as session:
            return await query(user_id, session)

    # Execute all queries concurrently
    user, orders, stats, notifications = await asyncio.gather(
        get_user(user_id, db),
        in_own_session(get_recent_orders),
        in_own_session(get_user_stats),
        in_own_session(get_notifications),
    )
    return {
        "user": user,
        "orders": orders,
        "stats": stats,
        "notifications": notifications,
    }


# Async Generators for Streaming
from typing import AsyncGenerator
async def stream_large_dataset(db: AsyncSession) -> AsyncGenerator[dict, None]:
    """Stream large datasets without loading all into memory.

    Args:
        db: Session whose ``stream`` method runs the query.

    Yields:
        One plain ``dict`` per result row.
    """
    # Stream results from database
    result = await db.stream(select(LargeTable))
    async for row in result:
        # Copy the row mapping into a real dict: that matches the declared
        # yield type and is something ``json.dumps`` can serialize.
        yield dict(row._mapping)
@app.get("/export")
async def export_data(db: AsyncSession):
    """Stream the dataset as one JSON array in a downloadable response."""

    async def generate():
        # Emit "[", the items separated by ",", then "]".
        yield "["
        needs_comma = False
        async for item in stream_large_dataset(db):
            if needs_comma:
                yield ","
            yield json.dumps(item)
            needs_comma = True
        yield "]"

    download_headers = {"Content-Disposition": "attachment; filename=export.json"}
    return StreamingResponse(
        generate(),
        headers=download_headers,
        media_type="application/json",
    )


# Response Optimization
Compression
from fastapi.middleware.gzip import GZipMiddleware
# Register gzip compression for the whole app; `minimum_size=500` is the
# size threshold in bytes, so only responses larger than that are compressed.
app.add_middleware(GZipMiddleware, minimum_size=500)
# Or use conditional compression
from starlette.middleware.base import BaseHTTPMiddleware
import gzip
class SmartCompressionMiddleware(BaseHTTPMiddleware):
    """Gzip JSON responses for clients that accept gzip.

    A response is passed through untouched unless it is JSON, the request's
    ``Accept-Encoding`` mentions gzip, and the body is not already encoded.
    The whole body is buffered in memory before it is compressed.
    """

    async def dispatch(self, request, call_next):
        response = await call_next(request)

        content_type = response.headers.get("content-type", "")
        accept_encoding = request.headers.get("accept-encoding", "")
        if (
            "application/json" not in content_type  # only compress JSON
            or "gzip" not in accept_encoding
            or "content-encoding" in response.headers  # never encode twice
        ):
            return response

        # Collect the chunks and join once instead of growing a bytes
        # object chunk by chunk.
        chunks = [chunk async for chunk in response.body_iterator]
        compressed = gzip.compress(b"".join(chunks))

        # Response computes Content-Length from the compressed body itself.
        compressed_response = Response(
            content=compressed,
            status_code=response.status_code,
        )
        # Copy headers one by one so repeated headers (e.g. Set-Cookie) are
        # kept, and skip the stale length of the uncompressed body.
        for name, value in response.headers.items():
            if name.lower() in ("content-length", "content-encoding"):
                continue
            compressed_response.headers.append(name, value)
        compressed_response.headers["Content-Encoding"] = "gzip"
        # Tell caches the body depends on the request's Accept-Encoding.
        compressed_response.headers.add_vary_header("Accept-Encoding")
        return compressed_response


# Fast JSON Serialization
import orjson
from fastapi.responses import Response
class ORJSONResponse(Response):
    """JSON response whose body is produced by ``orjson.dumps``."""

    media_type = "application/json"

    def render(self, content) -> bytes:
        """Serialize ``content`` to JSON bytes with the options below."""
        flags = orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_UTC_Z
        return orjson.dumps(content, option=flags)
# Use globally
app = FastAPI(default_response_class=ORJSONResponse)


# Or per-endpoint
@app.get("/data", response_class=ORJSONResponse)
async def get_data():
    """Return a small fixed payload rendered through ``ORJSONResponse``."""
    payload = {"large": "dataset", "with": "many", "fields": True}
    return payload


# Load Testing
Locust Configuration
# locustfile.py
from locust import HttpUser, task, between
class APIUser(HttpUser):
    """Simulated API client: logs in on start, then runs weighted reads."""

    wait_time = between(0.5, 2)

    def on_start(self):
        # Login to get auth token
        credentials = {
            "email": "test@example.com",
            "password": "testpass123",
        }
        login = self.client.post("/api/auth/login", json=credentials)
        self.token = login.json()["access_token"]
        self.client.headers["Authorization"] = f"Bearer {self.token}"

    @task(3)
    def get_products(self):
        """Product listing (weight 3)."""
        self.client.get("/api/products?limit=20")

    @task(2)
    def get_product_detail(self):
        """Single product detail (weight 2)."""
        self.client.get("/api/products/1")

    @task(1)
    def search_products(self):
        """Product search (weight 1)."""
        self.client.get("/api/products/search?q=laptop")


# Performance Metrics to Track
| Metric | Target | Critical |
|---|---|---|
| P50 Latency | < 50ms | < 100ms |
| P99 Latency | < 200ms | < 500ms |
| Throughput | > 1000 rps | > 500 rps |
| Error Rate | < 0.1% | < 1% |
| CPU Usage | < 70% | < 90% |
Production Configuration
Uvicorn Optimization
# config.py
import multiprocessing
# Calculate optimal workers: two per CPU plus one
workers = 2 * multiprocessing.cpu_count() + 1

# Uvicorn config
uvicorn_config = dict(
    host="0.0.0.0",
    port=8000,
    workers=workers,
    loop="uvloop",         # faster event loop
    http="httptools",      # faster HTTP parser
    access_log=False,      # disabled in prod for speed
    timeout_keep_alive=5,  # close idle connections
)


# Gunicorn with Uvicorn Workers
# Run app.main:app under Gunicorn with four Uvicorn worker processes.
gunicorn app.main:app \
    --workers=4 \
    --worker-class=uvicorn.workers.UvicornWorker \
    --bind=0.0.0.0:8000 \
    --max-requests=10000 \
    --max-requests-jitter=1000 \
    --timeout=30 \
    --graceful-timeout=30 \
    --keep-alive=5

# Profiling and Debugging
Using py-spy
# Profile a running process
py-spy record --output profile.svg --pid 12345

# Profile on startup
py-spy record --output profile.svg -- python -m uvicorn app.main:app

# Built-in Profiling Middleware
import cProfile
import pstats
from io import StringIO
class ProfilingMiddleware(BaseHTTPMiddleware):
    """Print a cProfile report for requests that carry an ``X-Profile`` header.

    Requests without the header go straight to the next handler. For
    profiled requests the 20 entries with the highest cumulative time are
    printed to stdout after the response has been produced.
    """

    async def dispatch(self, request, call_next):
        if "X-Profile" not in request.headers:
            return await call_next(request)

        profiler = cProfile.Profile()
        profiler.enable()
        try:
            response = await call_next(request)
        finally:
            # Always switch the profiler off again, even when the downstream
            # handler raises; otherwise it would stay enabled.
            profiler.disable()

        stream = StringIO()
        stats = pstats.Stats(profiler, stream=stream)
        stats.sort_stats("cumulative")
        stats.print_stats(20)
        print(stream.getvalue())
        return response


# Conclusion
Building high-performance APIs requires attention at every layer—from database queries to response serialization. The patterns in this guide will help you achieve sub-100ms response times and handle thousands of concurrent users.
Performance from day one. ForgeAPI includes all these optimizations pre-configured, letting you focus on building features instead of tuning infrastructure.