Back to Blog
Article

Building High-Performance Web APIs with FastAPI

By ForgeAPI Team

Performance is not a feature—it's a requirement. In this guide, we'll explore every technique to squeeze maximum performance out of your FastAPI applications.

Understanding FastAPI's Performance

FastAPI achieves exceptional performance through:

  • Starlette: One of the fastest Python ASGI frameworks
  • Uvicorn: Lightning-fast ASGI server built on uvloop
  • Pydantic V2: Rust-powered validation with 5-50x speedups
  • Native Async: Non-blocking I/O for maximum concurrency

Benchmark Context

Framework Comparison (requests/second):
┌───────────────────┬─────────┐
│ FastAPI + Uvicorn │ ~42,000 │
│ Flask + Gunicorn  │ ~8,000  │
│ Django            │ ~6,000  │
│ Express (Node)    │ ~38,000 │
│ Gin (Go)          │ ~50,000 │
└───────────────────┴─────────┘

Database Optimization

Connection Pooling

Never create new database connections per request:

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker
)
 
# Process-wide engine: every request borrows a connection from this pool
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=False,                # No SQL statement logging in production
    pool_pre_ping=True,        # Check a connection is alive before use
    pool_recycle=1800,         # Replace connections older than 30 min
    pool_timeout=30,           # Seconds to wait for a free connection
    pool_size=20,              # Connections kept in the base pool
    max_overflow=10,           # Extra connections allowed under load
)

# Factory producing sessions bound to the pooled engine
async_session_factory = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,    # Keep loaded attributes usable after commit
    class_=AsyncSession,
)
 
async def get_db():
    """Yield a session from ``async_session_factory``, closing it afterwards."""
    session_scope = async_session_factory()
    async with session_scope as db_session:
        yield db_session

Query Optimization

from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload
 
# BAD: N+1 query problem
@app.get("/users")
async def get_users_slow(db: AsyncSession):
    """Anti-pattern: return every user without eager-loading relationships."""
    query = select(User)
    rows = await db.execute(query)
    # Nothing related is preloaded here, so each later user.posts access
    # needs its own additional query.
    return rows.scalars().all()
 
# GOOD: Eager loading with selectinload
@app.get("/users")
async def get_users_fast(db: AsyncSession):
    """Return every user with ``posts`` and ``comments`` eagerly loaded."""
    # Related rows are fetched up front instead of once per user.
    eager_relations = (
        selectinload(User.posts),
        selectinload(User.comments),
    )
    query = select(User).options(*eager_relations)
    rows = await db.execute(query)
    return rows.scalars().all()
 
# GOOD: Joinedload for single related object
@app.get("/posts")
async def get_posts_with_author(db: AsyncSession):
    """Return at most 100 posts, each with its author loaded via a JOIN."""
    # joinedload pulls the author in the same SELECT as the post.
    query = select(Post).options(joinedload(Post.author)).limit(100)
    rows = await db.execute(query)
    deduplicated = rows.unique()
    return deduplicated.scalars().all()

Batch Operations

from sqlalchemy import insert
 
# BAD: Individual inserts
async def create_items_slow(items: List[ItemCreate], db: AsyncSession):
    """Anti-pattern: register one ORM object per payload, then commit once."""
    for payload in items:
        row = Item(**payload.model_dump())
        db.add(row)
    await db.commit()
 
# GOOD: Bulk insert
async def create_items_fast(items: List[ItemCreate], db: AsyncSession):
    """Insert all ``items`` with a single multi-row INSERT, then commit.

    Args:
        items: Payloads to insert; each is converted with ``model_dump()``.
        db: Session used for both the INSERT and the commit.
    """
    rows = [item.model_dump() for item in items]
    # Executing an INSERT with an empty parameter list is not guaranteed to
    # be a no-op, so only issue the statement when there are rows to write.
    if rows:
        await db.execute(insert(Item), rows)
    await db.commit()

Caching Strategies

In-Memory Caching with TTL

from functools import lru_cache
from cachetools import TTLCache
from asyncio import Lock
 
# Coroutine-safe async cache (asyncio.Lock coordinates tasks on one event
# loop; it does not make the cache safe to share between OS threads)
class AsyncCache:
    """In-memory TTL cache whose reads and writes are serialized by a lock."""

    # Private marker for "key absent", so a cached ``None`` still counts as
    # a hit inside get_or_set().
    _MISSING = object()

    def __init__(self, maxsize: int = 1000, ttl: int = 300):
        """Create a cache holding up to ``maxsize`` entries for ``ttl`` seconds."""
        self._cache = TTLCache(maxsize=maxsize, ttl=ttl)
        self._lock = Lock()

    async def get(self, key: str):
        """Return the value stored under ``key``, or ``None`` if there is none."""
        async with self._lock:
            return self._cache.get(key)

    async def set(self, key: str, value):
        """Store ``value`` under ``key``."""
        async with self._lock:
            self._cache[key] = value

    async def get_or_set(self, key: str, factory):
        """Return the cached value for ``key``, computing it on a miss.

        Args:
            key: Cache key to look up.
            factory: Zero-argument coroutine function awaited only on a miss;
                its result is stored and returned, even when it is ``None``.

        The lock is released while ``factory`` runs, so a slow factory does
        not block other keys; concurrent misses on the same key may each
        call ``factory``.
        """
        async with self._lock:
            value = self._cache.get(key, self._MISSING)
        if value is self._MISSING:
            value = await factory()
            await self.set(key, value)
        return value
 
cache = AsyncCache(ttl=60, maxsize=5000)

@app.get("/products/{product_id}")
async def get_product(product_id: int, db: AsyncSession):
    """Return the product with ``product_id``, consulting ``cache`` first."""

    async def load_from_db():
        # Handed to the cache as the factory for this key.
        query = select(Product).where(Product.id == product_id)
        rows = await db.execute(query)
        return rows.scalar_one_or_none()

    return await cache.get_or_set(f"product:{product_id}", load_from_db)

Redis Caching

import redis.asyncio as redis
import json
from typing import Optional, TypeVar, Callable
 
T = TypeVar("T")


class RedisCache:
    """Read-through cache that stores JSON-encoded values in Redis."""

    def __init__(self, url: str = "redis://localhost:6379"):
        """Connect to the Redis server at ``url`` with response decoding on."""
        self.redis = redis.from_url(url, decode_responses=True)

    async def cached(
        self,
        key: str,
        factory: Callable[[], T],
        ttl: int = 300
    ) -> T:
        """Return the value cached at ``key``, computing and storing it on a miss.

        On a hit the stored JSON is decoded and returned. On a miss
        ``factory`` is awaited, its result is stored as JSON with an expiry
        of ``ttl`` seconds, and the result is returned as produced.
        """
        hit = await self.redis.get(key)
        if not hit:
            fresh = await factory()
            payload = json.dumps(fresh)
            await self.redis.setex(key, ttl, payload)
            return fresh
        return json.loads(hit)

    async def invalidate(self, pattern: str):
        """Invalidate all keys matching pattern."""
        matching_keys = self.redis.scan_iter(pattern)
        async for name in matching_keys:
            await self.redis.delete(name)
 
redis_cache = RedisCache()

@app.post("/products")
async def create_product(product: ProductCreate, db: AsyncSession):
    """Persist a new product, then drop cached product and category keys."""
    fields = product.model_dump()
    new_product = Product(**fields)
    db.add(new_product)
    await db.commit()

    # Invalidate related caches
    for stale_pattern in ("products:*", "categories:*"):
        await redis_cache.invalidate(stale_pattern)

    return new_product

Async Patterns

Parallel Execution

import asyncio
from typing import List
 
@app.get("/dashboard")
async def get_dashboard(user_id: int, db: AsyncSession):
    """Fetch all dashboard data in parallel.

    Args:
        user_id: User whose dashboard is assembled.
        db: Request-scoped session. Kept for interface compatibility but not
            used for the concurrent queries, because one AsyncSession must
            not be driven by several tasks at the same time.

    Returns:
        Dict with the keys ``user``, ``orders``, ``stats`` and
        ``notifications``.
    """

    async def run_isolated(query):
        # Each concurrent query gets a private session (and therefore its own
        # pooled connection) from the module's async_session_factory.
        async with async_session_factory() as session:
            return await query(user_id, session)

    # Execute all queries concurrently; this holds up to four pooled
    # connections for the duration of the request.
    user, orders, stats, notifications = await asyncio.gather(
        run_isolated(get_user),
        run_isolated(get_recent_orders),
        run_isolated(get_user_stats),
        run_isolated(get_notifications),
    )

    return {
        "user": user,
        "orders": orders,
        "stats": stats,
        "notifications": notifications
    }

Async Generators for Streaming

from typing import AsyncGenerator
 
async def stream_large_dataset(db: AsyncSession) -> AsyncGenerator[dict, None]:
    """Stream large datasets without loading all into memory.

    Args:
        db: Session used to open the streaming result.

    Yields:
        One plain ``dict`` per result row, with the same keys and values as
        the row's ``_mapping``.
    """

    # Stream results from database
    result = await db.stream(select(LargeTable))

    async for row in result:
        # row._mapping is a mapping view rather than a dict; copy it so the
        # yielded value matches the annotation and is accepted by json.dumps.
        # NOTE(review): the values must themselves be JSON-serializable for
        # the /export endpoint — confirm against LargeTable's definition.
        yield dict(row._mapping)
 
@app.get("/export")
async def export_data(db: AsyncSession):
    """Stream the dataset to the client as a downloadable JSON array."""

    async def generate():
        # Emits "[", then the encoded items separated by ",", then "]".
        need_comma = False
        yield "["
        async for record in stream_large_dataset(db):
            if need_comma:
                yield ","
            yield json.dumps(record)
            need_comma = True
        yield "]"

    download_headers = {
        "Content-Disposition": "attachment; filename=export.json"
    }
    return StreamingResponse(
        generate(),
        headers=download_headers,
        media_type="application/json",
    )

Response Optimization

Compression

from fastapi.middleware.gzip import GZipMiddleware
 
# Register gzip compression for the whole app; minimum_size=500 is the
# response-size threshold (in bytes) for compressing a response
app.add_middleware(GZipMiddleware, minimum_size=500)
 
# Or use conditional compression
from starlette.middleware.base import BaseHTTPMiddleware
import gzip
 
class SmartCompressionMiddleware(BaseHTTPMiddleware):
    """Gzip JSON responses for clients that advertise gzip support.

    Responses pass through untouched when they are not JSON, when the
    request's Accept-Encoding does not mention ``gzip``, or when the response
    already carries a Content-Encoding header.
    """

    async def dispatch(self, request, call_next):
        """Run the downstream handler and return its response, gzipped if eligible."""
        response = await call_next(request)

        content_type = response.headers.get("content-type", "")
        accept_encoding = request.headers.get("accept-encoding", "")
        if (
            "application/json" not in content_type    # Only compress JSON
            or "gzip" not in accept_encoding
            # Never compress a body that is already encoded.
            or "content-encoding" in response.headers
        ):
            return response

        # Buffer the full body, joining once instead of repeated ``bytes +=``.
        chunks = [chunk async for chunk in response.body_iterator]
        compressed = gzip.compress(b"".join(chunks))

        # Drop the upstream length/encoding headers whatever their casing, so
        # the rebuilt response cannot carry two conflicting Content-Length
        # values (the original one and the compressed one).
        stale = {"content-length", "content-encoding"}
        headers = {
            name: value
            for name, value in response.headers.items()
            if name.lower() not in stale
        }
        headers["Content-Encoding"] = "gzip"
        headers["Content-Length"] = str(len(compressed))

        return Response(
            content=compressed,
            status_code=response.status_code,
            headers=headers,
            media_type=response.media_type
        )

Fast JSON Serialization

import orjson
from fastapi.responses import Response
 
class ORJSONResponse(Response):
    """JSON response whose body is encoded with orjson."""

    media_type = "application/json"

    def render(self, content) -> bytes:
        """Encode ``content`` using the NumPy and UTC-``Z`` orjson options."""
        flags = orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_UTC_Z
        return orjson.dumps(content, option=flags)
 
# Option 1: make ORJSONResponse the default for every route
app = FastAPI(default_response_class=ORJSONResponse)

# Option 2: opt in on a single endpoint
@app.get("/data", response_class=ORJSONResponse)
async def get_data():
    """Return a small sample payload rendered through ORJSONResponse."""
    payload = {"large": "dataset", "with": "many", "fields": True}
    return payload

Load Testing

Locust Configuration

# locustfile.py
from locust import HttpUser, task, between
 
class APIUser(HttpUser):
    """Simulated client that logs in once, then exercises the product API."""

    # Pause between 0.5 and 2 seconds between tasks
    wait_time = between(0.5, 2)

    def on_start(self):
        """Log in and attach the returned bearer token to later requests."""
        credentials = {
            "email": "test@example.com",
            "password": "testpass123"
        }
        login = self.client.post("/api/auth/login", json=credentials)
        self.token = login.json()["access_token"]
        self.client.headers["Authorization"] = f"Bearer {self.token}"

    @task(3)
    def get_products(self):
        """List products (weight 3)."""
        self.client.get("/api/products?limit=20")

    @task(2)
    def get_product_detail(self):
        """Fetch one product (weight 2)."""
        self.client.get("/api/products/1")

    @task(1)
    def search_products(self):
        """Run a product search (weight 1)."""
        self.client.get("/api/products/search?q=laptop")

Performance Metrics to Track

Metric       | Target      | Critical
P50 Latency  | < 50ms      | < 100ms
P99 Latency  | < 200ms     | < 500ms
Throughput   | > 1000 rps  | > 500 rps
Error Rate   | < 0.1%      | < 1%
CPU Usage    | < 70%       | < 90%

Production Configuration

Uvicorn Optimization

# config.py
import multiprocessing
 
# Worker count: two per CPU core, plus one
workers = 1 + 2 * multiprocessing.cpu_count()

# Settings for the Uvicorn server
uvicorn_config = dict(
    host="0.0.0.0",
    port=8000,
    workers=workers,
    loop="uvloop",           # Faster event loop
    http="httptools",        # Faster HTTP parser
    access_log=False,        # Disable in prod for speed
    timeout_keep_alive=5,    # Close idle connections
)

Gunicorn with Uvicorn Workers

# Serve app.main:app through Gunicorn with 4 workers of class
# uvicorn.workers.UvicornWorker, bound to 0.0.0.0:8000
gunicorn app.main:app \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    --max-requests 10000 \
    --max-requests-jitter 1000 \
    --timeout 30 \
    --graceful-timeout 30 \
    --keep-alive 5

Profiling and Debugging

Using py-spy

# Profile a running process: replace 12345 with the target PID; the
# recording is written to profile.svg
py-spy record -o profile.svg --pid 12345
 
# Profile on startup: launch Uvicorn under py-spy so recording covers the
# process from its start; output also goes to profile.svg
py-spy record -o profile.svg -- python -m uvicorn app.main:app

Built-in Profiling Middleware

import cProfile
import pstats
from io import StringIO
 
class ProfilingMiddleware(BaseHTTPMiddleware):
    """Profile a request with cProfile when it carries an ``X-Profile`` header.

    The top 20 entries, sorted by cumulative time, are printed to stdout.
    Requests without the header are passed straight through.

    NOTE(review): any client can trigger profiling by sending the header;
    restrict this middleware to trusted callers or non-production use.
    """

    async def dispatch(self, request, call_next):
        """Run the downstream handler, profiling it if ``X-Profile`` is set."""
        if "X-Profile" not in request.headers:
            return await call_next(request)

        profiler = cProfile.Profile()
        profiler.enable()
        try:
            response = await call_next(request)
        finally:
            # Always switch the profiler off, even when the handler raises;
            # otherwise it would stay enabled after the failed request.
            profiler.disable()

        stream = StringIO()
        stats = pstats.Stats(profiler, stream=stream)
        stats.sort_stats("cumulative")
        stats.print_stats(20)

        print(stream.getvalue())
        return response

Conclusion

Building high-performance APIs requires attention at every layer—from database queries to response serialization. The patterns in this guide will help you achieve sub-100ms response times and handle thousands of concurrent users.

Performance from day one. ForgeAPI includes all these optimizations pre-configured, letting you focus on building features instead of tuning infrastructure.