Building a Real-Time Traffic API with FastAPI, Redis Pub/Sub, and Load Testing Using Locust
In this article, I’ll walk you through building a real-time traffic API using FastAPI, Redis Pub/Sub, and WebSockets. We’ll explore how to push live vehicle location data to connected clients, leverage Redis for scalable event broadcasting, and use Locust to stress-test our system under load. Whether you're a backend engineer or just getting into real-time architectures, this post is for you.

Project Architecture
The system is composed of:
- FastAPI to expose REST and WebSocket endpoints
- Redis Pub/Sub to decouple event ingestion from broadcasting
- WebSockets to stream real-time updates to connected clients
- Locust to simulate high load and verify stability under pressure
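For orientation, here is the project layout we will build up over the following sections:

app/
├── main.py
├── ws_manager.py
├── api/
│   ├── routes.py
│   └── websocket.py
├── core/
│   └── event_bus.py
├── models/
│   └── schemas.py
└── services/
    └── vehicle_service.py
Dockerfile
docker-compose.yaml
Makefile
locustfile.py
requirements.txt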

Building the API with FastAPI
In app/api/routes.py, we define a POST endpoint /events that receives vehicle data and publishes it through Redis:
from fastapi import APIRouter, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.models.schemas import VehicleEvent
from app.services.vehicle_service import handle_vehicle_event
import time

router = APIRouter()
limiter = Limiter(key_func=get_remote_address)

@router.post("/events")
@limiter.limit("100/second")
async def post_vehicle_event(event: VehicleEvent, request: Request):
    start_time = time.perf_counter()
    await handle_vehicle_event(event)
    duration = time.perf_counter() - start_time
    return {"status": "received", "duration_ms": round(duration * 1000, 2)}
To prevent abuse, we include a simple rate limit of 100 requests per second using slowapi.
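Once the stack is running, you can sanity-check the endpoint with curl (the payload fields match the VehicleEvent schema defined below; the coordinates are just sample values):

curl -X POST http://localhost:8000/events \
  -H "Content-Type: application/json" \
  -d '{"id": "bus-1", "lat": 40.4168, "lon": -3.7038, "speed": 42.5}'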
WebSocket clients connect to /ws/vehicles to receive those updates in real time. In app/api/websocket.py:
from fastapi import WebSocket, WebSocketDisconnect
from app.ws_manager import manager

# The route itself is registered in app/main.py via add_api_websocket_route.
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            # Keep the connection open; clients only receive broadcasts.
            await websocket.receive_text()
    except WebSocketDisconnect:
        await manager.disconnect(websocket)
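To watch the stream from a terminal, here is a minimal client sketch, assuming the websockets package is installed:

import asyncio
import websockets

async def watch():
    # Connect to the API and print every broadcast as it arrives.
    async with websockets.connect("ws://localhost:8000/ws/vehicles") as ws:
        while True:
            print(await ws.recv())

asyncio.run(watch())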
Redis Pub/Sub Integration
Redis acts as a message broker: when a POST request is received, the event is pushed to a Redis channel, and a background listener consumes events and broadcasts them to WebSocket clients. In app/core/event_bus.py:
from redis.asyncio import Redis
from app.ws_manager import manager

REDIS_CHANNEL = "vehicle_events"
redis = Redis(host="redis", port=6379, decode_responses=True)

async def publish_event(event):
    await redis.publish(REDIS_CHANNEL, event.json())

async def start_event_listener():
    pubsub = redis.pubsub()
    await pubsub.subscribe(REDIS_CHANNEL)
    async for message in pubsub.listen():
        if message["type"] == "message":
            await manager.broadcast(message["data"])
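Because Pub/Sub decouples publishers from the broadcaster, any process can inject a test event; for example, straight from redis-cli inside the compose stack:

docker-compose exec redis redis-cli PUBLISH vehicle_events '{"id": "bus-1", "lat": 40.4168, "lon": -3.7038, "speed": 42.5}'

Every connected WebSocket client should receive the payload immediately.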
Create a schema to validate the data
The schema validates the structure of each incoming event. In app/models/schemas.py:
from pydantic import BaseModel

class VehicleEvent(BaseModel):
    id: str
    lat: float
    lon: float
    speed: float
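FastAPI returns a 422 for payloads that don't match this schema; you can reproduce the check directly with Pydantic, for example:

from pydantic import ValidationError

try:
    VehicleEvent(id="bus-1", lat="not-a-number", lon=-3.7038, speed=42.5)
except ValidationError as e:
    print(e)  # reports that lat cannot be parsed as a float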
Create a service to publish events in Redis
Create the file app/services/vehicle_service.py (note the underscore: hyphens aren't valid in Python module names, and routes.py imports app.services.vehicle_service):
from app.core.event_bus import publish_event

async def handle_vehicle_event(event):
    await publish_event(event)
Create the main FastAPI file
Create the file app/main.py:
from fastapi import FastAPI
from app.api.routes import router as api_router, limiter
from app.api.websocket import websocket_endpoint
from slowapi.errors import RateLimitExceeded
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from app.core.event_bus import start_event_listener
import asyncio

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(
    RateLimitExceeded,
    lambda r, e: JSONResponse(status_code=429, content={"error": "Too Many Requests"}),
)

origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(api_router)
app.add_api_websocket_route("/ws/vehicles", websocket_endpoint)

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(start_event_listener())
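The startup hook works, though recent FastAPI versions deprecate on_event in favor of a lifespan context manager. An equivalent sketch, if you prefer the newer style:

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the Redis listener when the app boots...
    task = asyncio.create_task(start_event_listener())
    yield
    # ...and cancel it on shutdown.
    task.cancel()

app = FastAPI(lifespan=lifespan)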
Create a connection manager for WebSockets
Create the file app/ws_manager.py:
class ConnectionManager:
    def __init__(self):
        self.active_connections: list = []

    async def connect(self, websocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    async def disconnect(self, websocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()
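One caveat: if a client drops without a clean close, send_text raises and interrupts the broadcast loop for everyone else. A more defensive broadcast (a sketch, as a drop-in replacement inside ConnectionManager) prunes dead connections instead:

    async def broadcast(self, message: str):
        stale = []
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:
                # The socket is gone; remember it and drop it afterwards.
                stale.append(connection)
        for connection in stale:
            self.active_connections.remove(connection)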
Create Dockerfile and Docker Compose files
Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose.yaml:
version: '3.9'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - .:/app
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    depends_on:
      - redis
  redis:
    image: redis:7
    ports:
      - "6379:6379"
Create Makefile
It gives us shortcuts for running the project's basic commands from the shell (note that Makefile recipes must be indented with tabs):
## Makefile
up:
	docker-compose up --build

down:
	docker-compose down

install:
	pip install -r requirements.txt

locust:
	locust -f locustfile.py

lint:
	docker-compose run --rm app black .
Load Testing with Locust
Locust allows us to simulate traffic and stress test our /events endpoint.
Here's the core of our locustfile.py:
from locust import HttpUser, task, between
import random

class VehicleEventUser(HttpUser):
    wait_time = between(0.01, 0.2)
    host = "http://localhost:8000"

    @task
    def post_event(self):
        payload = {
            "id": f"bus-{random.randint(1, 100)}",
            "lat": round(random.uniform(-90, 90), 6),
            "lon": round(random.uniform(-180, 180), 6),
            "speed": round(random.uniform(0, 120), 2),
        }
        self.client.post("/events", json=payload)
To run the test:
make locust
Then navigate to http://localhost:8089 and enter:
- Host: http://localhost:8000
- Users: 100
- Spawn rate: 10
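If you prefer to skip the web UI, the same settings can be passed on the command line for a headless run:

locust -f locustfile.py --headless -u 100 -r 10 --host http://localhost:8000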
Locust reports include latency, failure rate, throughput, and percentiles. When the rate limit is reached, you’ll start seeing 429 responses.

We built a scalable real-time traffic API using FastAPI, Redis, and WebSockets. With Redis Pub/Sub, we decoupled ingestion and delivery. With Locust, we validated the system’s performance. This setup is a solid foundation for systems that need to scale horizontally while staying responsive under load.
You can find the full code of the project here.