Basic vs. Advanced WebSocket Implementations
Below are two contrasting approaches for handling WebSocket connections in Python: a basic in-memory approach and a more advanced Redis-based approach for scalability.
1. Basic Implementation (Single-Instance, In-Memory)
Key Points
- Stores WebSocket connections in a Python set or dict.
- Fine for small-scale or prototype scenarios.
- Not resilient: if the server restarts, all connections are lost.
- Difficult to scale: you can’t easily share in-memory state across multiple server instances.
Code Example
| from sanic import Sanic, Request, Websocket
from sanic.response import json
from sanic.exceptions import WebsocketClosed
app = Sanic("WebSocketBasic")
connected_clients = set()
@app.websocket("/feed")
async def feed(request: Request, ws: Websocket):
connected_clients.add(ws)
print("WebSocket connection established")
try:
async for message in ws:
print(f"Received message: {message}")
# Broadcast to all *other* clients
for client in list(connected_clients):
if client != ws:
try:
await client.send(f"Broadcast: {message}")
except Exception as e:
print(f"Error sending to client: {e}")
    except WebsocketClosed:
print("WebSocket connection closed")
finally:
connected_clients.remove(ws)
print("Connection cleanup completed")
@app.route("/")
async def index(request):
return {"message": "Basic single-instance WebSocket server is running."}
if __name__ == "__main__":
# This setup is not designed for multiple processes or servers.
app.run(host="0.0.0.0", port=8000, workers=1)
|
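To try the endpoint, here is a minimal client sketch using the third-party websockets package (an assumption; any WebSocket client will do):
| import asyncio
import websockets  # pip install websockets
async def main():
    # Connect to the /feed endpoint of the server above
    async with websockets.connect("ws://localhost:8000/feed") as ws:
        await ws.send("hello")
        print(await ws.recv())  # blocks until another client triggers a broadcast
asyncio.run(main())
|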
2. Advanced Implementation (Multi-Instance, Redis Pub/Sub)
Key Points
- Each server instance stores only its local WebSocket connections.
- A Redis Pub/Sub channel is used to broadcast messages across instances.
- This allows horizontal scaling: multiple workers or servers can handle clients concurrently.
- If one server goes down, it only affects the clients connected to that instance.
Code Example
| import uuid
import asyncio
import aioredis  # aioredis 2.x API (the project has since merged into redis-py as redis.asyncio)
from sanic import Sanic, Request, Websocket
from sanic.response import json
from sanic.log import logger
from sanic.exceptions import WebsocketClosed
app = Sanic("DistributedWebSocketApp")
REDIS_URL = "redis://localhost:6379"
REDIS_CHANNEL = "ws_broadcast"
# Store local active WebSocket connections: { client_id: Websocket }
local_ws_connections = {}
@app.listener("before_server_start")
async def setup_redis(app, loop):
"""
    Create the Redis client and start a background task to listen for messages.
"""
    app.ctx.redis = aioredis.from_url(REDIS_URL)
app.ctx.pubsub = app.ctx.redis.pubsub()
await app.ctx.pubsub.subscribe(REDIS_CHANNEL)
async def redis_listener():
"""Listens on REDIS_CHANNEL for messages to broadcast locally."""
while True:
try:
message = await app.ctx.pubsub.get_message(
ignore_subscribe_messages=True,
timeout=1.0
)
if message:
# message['data'] is bytes; convert to string
data = message["data"].decode()
# Broadcast to all local connections
for ws in list(local_ws_connections.values()):
try:
await ws.send(f"Redis broadcast -> {data}")
except Exception:
pass
except asyncio.CancelledError:
break
except Exception as e:
app.logger.error(f"Redis listener error: {e}")
# Schedule Redis subscriber listening in the background
app.add_task(redis_listener())
@app.listener("after_server_stop")
async def close_redis(app, loop):
"""Cleanly close Redis connections."""
    await app.ctx.pubsub.unsubscribe(REDIS_CHANNEL)
    await app.ctx.pubsub.close()
    await app.ctx.redis.close()
@app.websocket("/ws")
async def handle_websocket(request: Request, ws: Websocket):
client_id = str(uuid.uuid4())
local_ws_connections[client_id] = ws
app.logger.info(f"Client connected: {client_id}")
try:
async for msg in ws:
app.logger.info(f"Received from {client_id}: {msg}")
# Publish incoming message to Redis, so all instances see it
await app.ctx.redis.publish(
REDIS_CHANNEL,
f"{client_id} says: {msg}"
)
    except WebsocketClosed:
        logger.info(f"WebSocket closed: {client_id}")
    except Exception as e:
        logger.error(f"Error in websocket: {e}")
finally:
# Remove from local connections
local_ws_connections.pop(client_id, None)
app.logger.info(f"Client disconnected: {client_id}")
@app.route("/")
async def index(request):
return {"message": "Advanced WebSocket server instance with Redis Pub/Sub."}
if __name__ == "__main__":
    # Launch multiple instances on different ports as needed,
    # all connecting to the same Redis instance, e.g. with the sanic CLI:
    #   sanic websocket_server:app --port=8001
    #   sanic websocket_server:app --port=8002
app.run(host="0.0.0.0", port=8001, workers=1)
|
Wrap-Up
- Basic:
  - Easiest to implement, but limited to a single process.
  - Fine for quick demos, small usage, or internal tools.
- Advanced (Redis Pub/Sub):
  - Allows you to scale out by running multiple server instances.
  - A common, production-friendly approach for real-time apps that need more than a single machine.
  - Fault-tolerant: a crash on one instance doesn’t kill the entire system.
Collection of System Design Examples
Below are various system design examples with code snippets. Each section is self-contained and demonstrates a distinct pattern or approach.
Example 1: Rate Limiter with Redis
Scenario: Limit API calls per user within a time frame.
| import time
import redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
RATE_LIMIT = 10 # max requests
TIME_WINDOW = 60 # seconds
def is_rate_limited(user_id):
key = f"rate_limit:{user_id}"
current = r.get(key)
if current is None:
pipe = r.pipeline()
pipe.set(key, 1, ex=TIME_WINDOW)
pipe.execute()
return False
elif int(current) < RATE_LIMIT:
r.incr(key)
return False
else:
return True
# Usage
user_id = "user123"
if is_rate_limited(user_id):
print("Rate limit exceeded. Try later.")
else:
print("Request allowed.")
|
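Note that the get-then-incr sequence above is not atomic: two concurrent requests can both read the same count and slip past the limit. A common fix is to increment first and set the expiry only on the first hit; a minimal sketch of that variant, reusing r, RATE_LIMIT, and TIME_WINDOW from above:
| def is_rate_limited_atomic(user_id):
    key = f"rate_limit:{user_id}"
    # INCR is atomic; the first increment creates the key with value 1
    current = r.incr(key)
    if current == 1:
        r.expire(key, TIME_WINDOW)
    return current > RATE_LIMIT
|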
Example 2: Asynchronous Task Processing with Celery
Scenario: Offload tasks to background workers.
celery_app.py:
| from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def long_running_task(x, y):
import time
time.sleep(5)
return x + y
|
producer.py:
| from celery_app import long_running_task
result = long_running_task.delay(10, 20)
print("Task sent. Waiting for result...")
print("Result:", result.get(timeout=10))
|
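The producer blocks in result.get() until a worker picks up the task, so start a worker in another terminal first (this assumes RabbitMQ is running for the pyamqp broker above):
| celery -A celery_app worker --loglevel=info
|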
Example 3: Circuit Breaker Pattern
Scenario: Prevent repeated calls to a failing service.
| import time
import requests
class CircuitBreaker:
def __init__(self, failure_threshold=3, recovery_timeout=30):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.open = False
def call(self, func, *args, **kwargs):
if self.open:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.open = False
else:
raise Exception("Circuit is open. Call blocked.")
try:
result = func(*args, **kwargs)
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.open = True
raise e
breaker = CircuitBreaker()
def unreliable_service():
response = requests.get("http://example.com/api")
response.raise_for_status()
return response.json()
try:
result = breaker.call(unreliable_service)
print("Service call succeeded:", result)
except Exception as ex:
print("Service call failed or circuit open:", ex)
|
Example 4: Distributed Caching with Redis
Scenario: Cache expensive database queries.
| import redis
import time
import hashlib
r = redis.StrictRedis(host='localhost', port=6379, db=0)
def get_data_from_db(query):
time.sleep(2)
return f"Results for {query}"
def cached_query(query, ttl=60):
key = f"cache:{hashlib.sha256(query.encode()).hexdigest()}"
result = r.get(key)
if result:
return result.decode()
result = get_data_from_db(query)
r.setex(key, ttl, result)
return result
query = "SELECT * FROM users WHERE id = 1"
print(cached_query(query))
|
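The same pattern generalizes to a decorator, so any expensive function can be cached transparently. A minimal sketch reusing the r client, hashlib, and time imports from above (redis_cached is an illustrative name, not a library API):
| import functools
def redis_cached(ttl=60):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args):
            # Key the cache on the function name plus its (stringified) arguments
            raw = f"{func.__name__}:{args}"
            key = f"cache:{hashlib.sha256(raw.encode()).hexdigest()}"
            hit = r.get(key)
            if hit:
                return hit.decode()
            result = func(*args)
            r.setex(key, ttl, result)
            return result
        return wrapper
    return decorator
@redis_cached(ttl=30)
def slow_lookup(user_id):
    time.sleep(2)
    return f"Profile for {user_id}"
print(slow_lookup("user123"))  # slow on the first call, cached afterwards
|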
Example 5: OAuth2 with Flask and Authlib
Scenario: OAuth2 Authorization Code Flow setup.
| from flask import Flask, request, jsonify
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.oauth2.rfc6749 import grants
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret'
# NOTE: a complete Authlib setup also supplies query_client and save_token
# callables; this skeleton only sketches the grant wiring.
authorization = AuthorizationServer(app)
clients = {}
tokens = {}
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
def save_authorization_code(self, code, request):
clients[code] = request.client_id
def query_authorization_code(self, code, client):
if clients.get(code) == client.client_id:
return code
def delete_authorization_code(self, authorization_code):
clients.pop(authorization_code, None)
def authenticate_user(self, authorization_code):
return {'user_id': '123'}
authorization.register_grant(AuthorizationCodeGrant)
@app.route('/oauth/authorize', methods=['GET', 'POST'])
def authorize():
if request.method == 'GET':
return '<form method="post"><button type="submit">Authorize</button></form>'
grant_user = {'user_id': '123'}
return authorization.create_authorization_response(grant_user=grant_user)
@app.route('/oauth/token', methods=['POST'])
def issue_token():
return authorization.create_token_response()
if __name__ == "__main__":
app.run(debug=True)
|
Example 6: Webhook Handler
Scenario: Receive and process incoming webhook events.
| from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def webhook():
data = request.json
print("Received webhook:", data)
return jsonify({"status": "received"}), 200
if __name__ == "__main__":
app.run(port=5000)
|
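Production webhook endpoints normally verify that a payload really came from the expected sender, most often via an HMAC signature header. A minimal sketch for the app above, assuming a hypothetical X-Signature header carrying a hex HMAC-SHA256 of the raw body (real header names and schemes vary by provider):
| import hmac
import hashlib
WEBHOOK_SECRET = b"shared-secret"  # agreed with the sender out of band
def verify_signature(raw_body: bytes, signature: str) -> bool:
    expected = hmac.new(WEBHOOK_SECRET, raw_body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking information through timing
    return hmac.compare_digest(expected, signature or "")
@app.route('/webhook-secure', methods=['POST'])
def webhook_secure():
    if not verify_signature(request.get_data(), request.headers.get('X-Signature')):
        return jsonify({"error": "invalid signature"}), 401
    return jsonify({"status": "received"}), 200
|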
Example 7: File Upload Service with Flask
Scenario: Accept file uploads and save them.
| from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
import os
app = Flask(__name__)
UPLOAD_FOLDER = './uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({"error": "No file provided"}), 400
file = request.files['file']
    # secure_filename guards against path traversal in user-supplied names
    filename = secure_filename(file.filename)
    file.save(os.path.join(UPLOAD_FOLDER, filename))
    return jsonify({"status": "success", "filename": filename})
if __name__ == "__main__":
app.run(port=5000)
|
Example 8: Chat Server with Flask-SocketIO
Scenario: Simple real-time chat using WebSockets.
| from flask import Flask
from flask_socketio import SocketIO, emit
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
@app.route('/')
def index():
return "WebSocket Chat Server Running"
@socketio.on('message')
def handle_message(msg):
print('Received message:', msg)
emit('message', msg, broadcast=True)
if __name__ == '__main__':
socketio.run(app, port=5000)
|
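A quick way to exercise the chat server from Python is the python-socketio client (pip install "python-socketio[client]"; an assumption here, since any Socket.IO client works):
| import socketio
sio = socketio.Client()
@sio.on('message')
def on_message(msg):
    print("Broadcast received:", msg)
sio.connect('http://localhost:5000')
sio.send('Hello, room!')  # send() emits a 'message' event
sio.sleep(2)              # give the broadcast time to arrive
sio.disconnect()
|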
Example 9: RESTful API with FastAPI
Scenario: Create a simple RESTful endpoint.
| from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def read_item(item_id: int, q: str | None = None):
return {"item_id": item_id, "q": q}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
|
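With the server above running, the endpoint can be exercised with curl:
| curl "http://localhost:8000/items/5?q=test"
# {"item_id":5,"q":"test"}
|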
Example 10: gRPC Communication in Python
Scenario: Set up a basic gRPC server and client.
example.proto:
| syntax = "proto3";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
|
Generate Python code:
| python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. example.proto
|
Server (server.py):
| from concurrent import futures
import grpc
import time
import example_pb2
import example_pb2_grpc
class Greeter(example_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return example_pb2.HelloReply(message='Hello, ' + request.name)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
example_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
server.stop(0)
|
Client (client.py):
| import grpc
import example_pb2
import example_pb2_grpc
channel = grpc.insecure_channel('localhost:50051')
stub = example_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(example_pb2.HelloRequest(name='World'))
print("Greeter client received: " + response.message)
|
This consolidated Markdown file presents various system design examples with code. Each snippet is self-contained and ready to adapt. Further topics can be added in the same format as needed.
Collection of System Design Examples (Continued)
Below are 10 more diverse system design examples with code snippets, compiled together.
Example 11: Event-Driven Architecture with Kafka
Scenario: Produce and consume messages using Apache Kafka.
Prerequisites: Install kafka-python (pip install kafka-python) and have a Kafka cluster running.
Producer Example
| from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(5):
producer.send('my-topic', b'Sample message %d' % i)
producer.flush()
|
Consumer Example
| from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
for message in consumer:
print(f"Received: {message.value.decode()}")
|
Example 12: Simple Event Sourcing Mechanism
Scenario: Record changes as events and rebuild state by replaying them.
| # Event Store (in-memory for demo)
event_store = []
def record_event(event_type, data):
event = {"type": event_type, "data": data}
event_store.append(event)
def replay_events():
state = {}
for event in event_store:
if event["type"] == "user_created":
state[event["data"]["id"]] = event["data"]
elif event["type"] == "user_updated":
state[event["data"]["id"]].update(event["data"])
return state
# Usage
record_event("user_created", {"id": 1, "name": "Alice"})
record_event("user_updated", {"id": 1, "email": "alice@example.com"})
print(replay_events())
|
Example 13: Simple Notification System with WebSockets
Scenario: Notify connected clients in real-time using Flask-SocketIO.
| from flask import Flask, request
from flask_socketio import SocketIO, emit
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
@app.route('/')
def index():
return "Notification Server Running"
@app.route('/notify', methods=['POST'])
def notify():
message = "Notification message" # Typically extracted from request data
socketio.emit('notification', {'data': message})
return "Notification sent!"
if __name__ == '__main__':
socketio.run(app, port=5000)
|
Example 14: Simple URL Shortener
Scenario: Create short URLs mapping to original URLs.
| from flask import Flask, request, redirect, jsonify
import hashlib
app = Flask(__name__)
url_mapping = {}
def shorten_url(original_url):
    # First 6 hex chars of SHA-256: fine for a demo, though collisions
    # become possible at scale and would need handling
    short_hash = hashlib.sha256(original_url.encode()).hexdigest()[:6]
url_mapping[short_hash] = original_url
return short_hash
@app.route('/shorten', methods=['POST'])
def create_short_url():
data = request.json
original_url = data.get('url')
short_url = shorten_url(original_url)
return jsonify({"short_url": request.host_url + short_url})
@app.route('/<short_url>')
def redirect_url(short_url):
original_url = url_mapping.get(short_url)
if original_url:
return redirect(original_url)
return "URL not found", 404
if __name__ == "__main__":
app.run(port=5000)
|
Example 15: Data Warehouse ETL Pipeline (Simplified)
Scenario: Extract, Transform, and Load data from a source to a target.
| import csv
import sqlite3
# Extract: read CSV file
def extract_data(csv_file):
with open(csv_file, newline='') as f:
reader = csv.DictReader(f)
return list(reader)
# Transform: simple transformation (e.g., converting strings to integers)
def transform_data(data):
for row in data:
row['age'] = int(row['age'])
return data
# Load: insert data into SQLite database
def load_data(data, db_file):
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
cursor.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER, name TEXT, age INTEGER)')
for row in data:
cursor.execute('INSERT INTO users VALUES (?, ?, ?)', (row['id'], row['name'], row['age']))
conn.commit()
conn.close()
# Running the ETL process
data = extract_data('users.csv')
data = transform_data(data)
load_data(data, 'users.db')
|
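The pipeline assumes a users.csv with id, name, and age columns, for example:
| id,name,age
1,Alice,30
2,Bob,25
|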
Example 16: Distributed Lock with Redis
Scenario: Use Redis to implement a simple distributed lock.
| import redis
import time
r = redis.StrictRedis(host='localhost', port=6379, db=0)
def acquire_lock(lock_name, timeout=10):
    # timeout: the lock auto-expires after this many seconds
    lock = r.lock(lock_name, timeout=timeout)
    # Without blocking_timeout, acquire() would block indefinitely and the
    # None branch below would be unreachable
    acquired = lock.acquire(blocking=True, blocking_timeout=5)
    return lock if acquired else None
def release_lock(lock):
lock.release()
# Usage
lock = acquire_lock('my_lock')
if lock:
try:
print("Lock acquired, processing critical section.")
# Critical section code
time.sleep(2)
finally:
release_lock(lock)
print("Lock released.")
else:
print("Failed to acquire lock.")
|
Example 17: Simple File Storage Service (S3-like)
Scenario: Upload and download files using Flask.
| from flask import Flask, request, send_from_directory, jsonify
from werkzeug.utils import secure_filename
import os
app = Flask(__name__)
UPLOAD_FOLDER = './files'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
@app.route('/upload', methods=['POST'])
def upload():
    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400
    file = request.files['file']
    # Sanitize the user-supplied name to prevent path traversal
    filename = secure_filename(file.filename)
    file.save(os.path.join(UPLOAD_FOLDER, filename))
    return jsonify({"message": "File uploaded", "filename": filename})
@app.route('/download/<filename>', methods=['GET'])
def download(filename):
return send_from_directory(UPLOAD_FOLDER, filename)
if __name__ == "__main__":
app.run(port=5000)
|
Example 18: Implementing gRPC Streaming (Server-Side)
Scenario: Stream responses from a gRPC server.
streaming.proto:
| syntax = "proto3";
service Streamer {
rpc StreamData(Empty) returns (stream DataChunk) {}
}
message Empty {}
message DataChunk {
string content = 1;
}
|
Generate Python code:
| python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. streaming.proto
|
Server (stream_server.py):
| import time
from concurrent import futures
import grpc
import streaming_pb2
import streaming_pb2_grpc
class StreamerServicer(streaming_pb2_grpc.StreamerServicer):
def StreamData(self, request, context):
for i in range(5):
yield streaming_pb2.DataChunk(content=f"Chunk {i}")
time.sleep(1)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
streaming_pb2_grpc.add_StreamerServicer_to_server(StreamerServicer(), server)
server.add_insecure_port('[::]:50052')
server.start()
server.wait_for_termination()
|
Client (stream_client.py):
| import grpc
import streaming_pb2
import streaming_pb2_grpc
channel = grpc.insecure_channel('localhost:50052')
stub = streaming_pb2_grpc.StreamerStub(channel)
for chunk in stub.StreamData(streaming_pb2.Empty()):
print("Received:", chunk.content)
|
Example 19: Implementing Feature Flags
Scenario: Toggle features on/off dynamically.
| # Simple in-memory feature flag system
feature_flags = {
"new_dashboard": False,
"beta_feature": True
}
def is_feature_enabled(feature_name):
return feature_flags.get(feature_name, False)
# Usage
if is_feature_enabled("new_dashboard"):
print("Render new dashboard")
else:
print("Render old dashboard")
|
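In-memory flags only work per process. In a multi-instance deployment the flags usually live in shared storage so they can be flipped at runtime; a minimal sketch backed by a Redis hash (assuming a local Redis server):
| import redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
FLAGS_KEY = "feature_flags"
def set_flag(name, enabled):
    r.hset(FLAGS_KEY, name, int(enabled))
def is_enabled(name, default=False):
    value = r.hget(FLAGS_KEY, name)
    return default if value is None else value == b"1"
set_flag("new_dashboard", True)
print(is_enabled("new_dashboard"))  # True on every instance sharing this Redis
|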
Example 20: Implementing a Distributed Lock Service
Scenario: A more robust single-instance lock using the SET NX PX plus safe-release pattern that underpins the Redlock algorithm (full Redlock coordinates several independent Redis nodes).
| import redis
import uuid
import time
class Redlock:
def __init__(self, redis_client, lock_key, ttl=10000):
self.redis = redis_client
self.lock_key = lock_key
self.ttl = ttl
self.lock_value = str(uuid.uuid4())
def acquire(self):
result = self.redis.set(self.lock_key, self.lock_value, nx=True, px=self.ttl)
return result
def release(self):
# Lua script for safe delete
script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
self.redis.eval(script, 1, self.lock_key, self.lock_value)
r = redis.StrictRedis(host='localhost', port=6379, db=0)
lock = Redlock(r, "resource_lock")
if lock.acquire():
try:
print("Lock acquired, processing resource.")
# Critical resource processing
time.sleep(2)
finally:
lock.release()
print("Lock released.")
else:
print("Could not acquire lock.")
|
Example 21: Cache Eviction Strategy Implementation
Scenario: Implement an LRU (Least Recently Used) cache in Python.
| from collections import OrderedDict
class LRUCache:
    def __init__(self, capacity: int):
        self.cache = OrderedDict()
        self.capacity = capacity
    def get(self, key):
        if key not in self.cache:
            return -1
        # Move key to end to mark as recently used
        self.cache.move_to_end(key)
        return self.cache[key]
    def put(self, key, value):
        if key in self.cache:
            # Update existing key and mark as recently used
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            # Pop the first (least recently used) item
            self.cache.popitem(last=False)
# Usage
cache = LRUCache(2)
cache.put(1, 'A')
cache.put(2, 'B')
print(cache.get(1))  # 'A'
cache.put(3, 'C')    # Evicts key 2
print(cache.get(2))  # -1 (not found)
|
Example 22: Microservice Communication via gRPC with Load Balancing
Scenario: Implement client-side load balancing among multiple service instances using gRPC.
Setup: Assume multiple Greeter services running on different ports.
| import grpc
import random
import example_pb2
import example_pb2_grpc
# List of available server addresses
server_addresses = ['localhost:50051', 'localhost:50052', 'localhost:50053']
def get_stub():
# Randomly pick a server for each request (simple load balancing)
channel = grpc.insecure_channel(random.choice(server_addresses))
return example_pb2_grpc.GreeterStub(channel)
stub = get_stub()
response = stub.SayHello(example_pb2.HelloRequest(name='LoadBalancedClient'))
print("Response:", response.message)
|
Example 23: Actor Model with Pykka
Scenario: Use the actor model to manage concurrent state.
Prerequisites: Install Pykka (pip install pykka).
| import pykka
class CounterActor(pykka.ThreadingActor):
def __init__(self):
super().__init__()
self.count = 0
def on_receive(self, message):
if message.get('cmd') == 'increment':
self.count += 1
return self.count
elif message.get('cmd') == 'get':
return self.count
# Start an actor
counter = CounterActor.start()
# Interact with the actor
# ask() blocks and returns the actor's reply by default
result = counter.ask({'cmd': 'increment'})
print("Count after increment:", result)
current = counter.ask({'cmd': 'get'})
print("Current count:", current)
counter.stop()
|
Example 24: Service Discovery with Consul
Scenario: Register and discover services using Consul's HTTP API.
Prerequisites: Consul agent running locally.
| import requests
import time
CONSUL_ADDRESS = 'http://localhost:8500'
def register_service(name, service_id, address, port):
url = f"{CONSUL_ADDRESS}/v1/agent/service/register"
payload = {
"Name": name,
"ID": service_id,
"Address": address,
"Port": port,
"Check": {
"HTTP": f"http://{address}:{port}/health",
"Interval": "10s"
}
}
response = requests.put(url, json=payload)
print("Service registration:", response.status_code)
def discover_service(name):
url = f"{CONSUL_ADDRESS}/v1/catalog/service/{name}"
response = requests.get(url)
services = response.json()
return services
# Register a sample service
register_service("my-service", "my-service-1", "127.0.0.1", 5000)
time.sleep(2) # Wait for registration
# Discover the registered service
services = discover_service("my-service")
print("Discovered services:", services)
|
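The catalog response is a list of instances; a caller typically picks one and builds the target URL from its ServiceAddress and ServicePort fields:
| if services:
    instance = services[0]  # naive selection; real clients balance and health-check
    base_url = f"http://{instance['ServiceAddress']}:{instance['ServicePort']}"
    print("Calling service at:", base_url)
|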
Example 25: Distributed Task Queue with Redis and RQ
Scenario: Use Redis Queue (RQ) for simple background task processing.
Prerequisites: Install rq (pip install rq) and run a Redis server.
tasks.py:
| import time
def background_task(x, y):
time.sleep(5) # Simulate long computation
return x + y
|
enqueue_task.py:
| import time
from redis import Redis
from rq import Queue
from tasks import background_task
# Connect to Redis server
redis_conn = Redis()
q = Queue(connection=redis_conn)
# Enqueue a task
job = q.enqueue(background_task, 10, 20)
print(f"Enqueued job: {job.id}")
# RQ jobs have no blocking wait; poll until a worker has finished it
while not job.is_finished:
    time.sleep(1)
print("Task result:", job.result)
|
To process tasks, run an RQ worker in another terminal:
| rq worker
|