Building a Real-Time Traffic API with FastAPI, Redis Pub/Sub, and Load Testing Using Locust

In this article, I’ll walk you through building a real-time traffic API using FastAPI, Redis Pub/Sub, and WebSockets. We’ll explore how to push live vehicle location data to connected clients, leverage Redis for scalable event broadcasting, and use Locust to stress-test our system under load. Whether you're a backend engineer or just getting into real-time architectures, this post is for you.

Project Architecture

The system is composed of:

  • FastAPI to expose REST and WebSocket endpoints
  • Redis Pub/Sub to decouple event ingestion from broadcasting
  • WebSockets to stream real-time updates to connected clients
  • Locust to simulate high load and verify stability under pressure
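The flow for a single event: a producer POSTs to /events, the API publishes the payload on a Redis channel, a background listener picks it up, and a connection manager fans it out to every connected WebSocket client.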

Building the API with FastAPI

We define a POST endpoint, /events, that receives vehicle data and publishes it through Redis. It lives in app/api/routes.py:


from fastapi import APIRouter, Request
from slowapi import Limiter
from slowapi.util import get_remote_address

from app.models.schemas import VehicleEvent
from app.services.vehicle_service import handle_vehicle_event
import time

router = APIRouter()
# Rate-limit per client IP. Note that slowapi requires the decorated
# handler to accept a `request` argument, hence its presence below.
limiter = Limiter(key_func=get_remote_address)


@router.post("/events")
@limiter.limit("100/second")
async def post_vehicle_event(event: VehicleEvent, request: Request):
    # Time the publish path so clients can see the per-request overhead.
    start_time = time.perf_counter()
    await handle_vehicle_event(event)
    duration = time.perf_counter() - start_time
    return {"status": "received", "duration_ms": round(duration * 1000, 2)}

To prevent abuse, we added a simple rate limit of 100 requests per second per client IP using slowapi.
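Once the stack is up (see the Docker section below), a quick smoke test of the endpoint looks like this; the coordinates are just sample values:

curl -X POST http://localhost:8000/events \
  -H "Content-Type: application/json" \
  -d '{"id": "bus-1", "lat": 40.4168, "lon": -3.7038, "speed": 32.5}'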

WebSocket clients connect to /ws/vehicles to receive those updates in real time. The endpoint lives in app/api/websocket.py:

from fastapi import WebSocket, WebSocketDisconnect

from app.ws_manager import manager


# Registered in app/main.py via add_api_websocket_route, so no decorator here.
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            # We never act on client messages; this read keeps the connection
            # open and lets us detect when the client drops.
            await websocket.receive_text()
    except WebSocketDisconnect:
        await manager.disconnect(websocket)
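To watch the stream during development, any WebSocket client will do. Here is a minimal listening script as a sketch, assuming the third-party websockets package (pip install websockets); the file name ws_client.py is just a suggestion:

import asyncio

import websockets


async def listen():
    # Connect to the API and print every broadcast event as it arrives.
    async with websockets.connect("ws://localhost:8000/ws/vehicles") as ws:
        while True:
            print(await ws.recv())


asyncio.run(listen())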

Redis Pub/Sub Integration

Redis acts as a message broker: when a POST request comes in, the event is published to a Redis channel, and a background listener consumes events and broadcasts them to WebSocket clients. This lives in app/core/event_bus.py:

from redis.asyncio import Redis

from app.ws_manager import manager

REDIS_CHANNEL = "vehicle_events"
# "redis" is the service name from docker-compose, resolvable inside its network.
redis = Redis(host="redis", port=6379, decode_responses=True)


async def publish_event(event):
    # Serialize the validated Pydantic model and publish it on the channel.
    await redis.publish(REDIS_CHANNEL, event.json())


async def start_event_listener():
    # Subscribe once and fan every incoming message out to WebSocket clients.
    pubsub = redis.pubsub()
    await pubsub.subscribe(REDIS_CHANNEL)
    async for message in pubsub.listen():
        if message["type"] == "message":
            await manager.broadcast(message["data"])
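You can verify this wiring without going through the API at all: publish a message straight onto the channel with redis-cli, and any connected WebSocket client should receive it:

docker-compose exec redis redis-cli PUBLISH vehicle_events '{"id": "bus-1", "lat": 0, "lon": 0, "speed": 10}'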

Create a schema to validate the data

The schema validates the data structure sent as an event in app/models/schemas.py:

from pydantic import BaseModel


class VehicleEvent(BaseModel):
    id: str
    lat: float
    lon: float
    speed: float
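FastAPI applies this validation before our handler ever runs, but it is easy to see it in isolation. A minimal sketch:

from pydantic import ValidationError

from app.models.schemas import VehicleEvent

# A well-formed payload parses into a typed model.
event = VehicleEvent(id="bus-7", lat=40.4168, lon=-3.7038, speed=42.5)

# A mistyped field is rejected before it ever reaches Redis;
# FastAPI turns this failure into a 422 response automatically.
try:
    VehicleEvent(id="bus-7", lat="not-a-float", lon=-3.7038, speed=42.5)
except ValidationError as exc:
    print(exc)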

Create a service to publish events to Redis

Create the file app/services/vehicle_service.py (note the underscore, so it matches the import in app/api/routes.py):

from app.core.event_bus import publish_event
from app.models.schemas import VehicleEvent


async def handle_vehicle_event(event: VehicleEvent):
    await publish_event(event)
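The indirection looks trivial, but it keeps the route handler unaware of the transport: swapping Redis for another broker later would only touch the event bus and this service.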

Create the FastAPI entry point

Create the file app/main.py:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from slowapi.errors import RateLimitExceeded

from app.api.routes import router as api_router, limiter
from app.api.websocket import websocket_endpoint
from app.core.event_bus import start_event_listener
import asyncio


app = FastAPI()
app.state.limiter = limiter

# Answer with a plain JSON 429 whenever slowapi trips the rate limit.
app.add_exception_handler(
    RateLimitExceeded,
    lambda r, e: JSONResponse(status_code=429, content={"error": "Too Many Requests"}),
)

origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(api_router)
app.add_api_websocket_route("/ws/vehicles", websocket_endpoint)


@app.on_event("startup")
async def startup_event():
    asyncio.create_task(start_event_listener())
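One caveat worth knowing: asyncio.create_task returns a task that we discard here. In a production app you would typically keep a reference to it so the task isn't garbage-collected mid-run and so you can cancel it cleanly on shutdown.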

Create a connection manager for WebSockets

Create the file app/ws_manager.py:

from fastapi import WebSocket


class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    async def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        # Iterate over a copy so we can drop dead connections mid-loop.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                await self.disconnect(connection)


manager = ConnectionManager()

Create Dockerfile and Docker Compose files

Dockerfile:

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

docker-compose.yaml:

version: '3.9'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - .:/app
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    depends_on:
      - redis
  redis:
    image: redis:7
    ports:
      - "6379:6379"

Create Makefile

It gives us shortcuts for the basic commands we run from the shell (remember that Makefile recipe lines must be indented with a tab):

## Makefile
up:
	docker-compose up --build

down:
	docker-compose down

install:
	pip install -r requirements.txt

locust:
	locust -f locustfile.py

lint:
	docker-compose run --rm app black .

Load Testing with Locust

Locust allows us to simulate traffic and stress test our /events endpoint.
Here's the core of our locustfile.py:

from locust import HttpUser, task, between
import random


class VehicleEventUser(HttpUser):
    wait_time = between(0.01, 0.2)
    host = "http://localhost:8000"

    @task
    def post_event(self):
        payload = {
            "id": f"bus-{random.randint(1, 100)}",
            "lat": round(random.uniform(-90, 90), 6),
            "lon": round(random.uniform(-180, 180), 6),
            "speed": round(random.uniform(0, 120), 2),
        }
        self.client.post("/events", json=payload)
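A note on the numbers: between(0.01, 0.2) pauses each simulated user for 10–200 ms between requests, so 100 users produce on the order of a thousand requests per second. And since slowapi keys the limit on the client IP, every Locust user running from your machine shares a single 100/second bucket, which is why 429s appear almost immediately.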

To run the test:

make locust

Then navigate to http://localhost:8089 and enter:

  • Host: http://localhost:8000
  • Users: 100
  • Spawn rate: 10
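If you prefer to skip the web UI, Locust can also run headless with the same parameters:

locust -f locustfile.py --headless -u 100 -r 10 --host http://localhost:8000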

Locust reports include latency, failure rate, throughput, and percentiles. When the rate limit is reached, you’ll start seeing 429 responses.

We built a scalable real-time traffic API using FastAPI, Redis, and WebSockets. With Redis Pub/Sub, we decoupled ingestion and delivery. With Locust, we validated the system’s performance. This setup is a solid foundation for systems that need to scale horizontally while staying responsive under load.

You can find the full code of the project here.
