Socket.IO Architecture
The Tracker GraphQL API uses Socket.IO for real-time communication, enabling live updates and event-driven interactions.
Overview
Connection Management
Socket.IO Server Setup
from socketio import AsyncServer
from .auth import validate_token
# Minimal setup: the client manager is configured from a single Redis URL.
# NOTE(review): RedisManager is not imported in the lines shown above; for an
# AsyncServer, python-socketio documents AsyncRedisManager — confirm which
# manager class is actually in scope here.
sio = AsyncServer(
    client_manager=RedisManager(REDIS_URL),
    async_mode='asgi',
    cors_allowed_origins='*',
)
# Advanced setup: explicit host/port, username/password and optional TLS.
sio = AsyncServer(
    async_mode='asgi',
    cors_allowed_origins='*',
    client_manager=RedisManager(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=REDIS_DB,
        username=REDIS_USERNAME,
        password=REDIS_PASSWORD,
        ssl=REDIS_SSL,
        # Certificate verification is only requested when TLS is enabled.
        ssl_cert_reqs=ssl.CERT_REQUIRED if REDIS_SSL else None,
    ),
)
@sio.on('connect')
async def handle_connect(sid, environ):
    """Create and store the initial session for a newly connected socket.

    The session starts with the socket id, the connection timestamp and an
    empty client list. ``environ`` is accepted but not used here.
    """
    initial_state = dict(sid=sid, connected_at=time.time(), client_list=[])
    await sio.save_session(sid, initial_state)
Authentication
@sio.on('authenticate')
async def handle_auth(sid, token):
    """Authenticate a socket connection from its JWT.

    On success the session is flagged as authenticated and receives the
    ``client_list`` carried by the validated token payload. Any exception
    raised along the way disconnects the socket and is reported back as an
    error status containing the exception text.
    """
    try:
        claims = validate_token(token)
        state = await sio.get_session(sid)
        state['authenticated'] = True
        state['client_list'] = claims['client_list']
        await sio.save_session(sid, state)
    except Exception as exc:
        # Drop the connection first, then tell the caller what went wrong.
        await sio.disconnect(sid)
        return {'status': 'error', 'message': str(exc)}
    return {'status': 'authenticated'}
Event Handling
Subscription Management
@sio.on('subscribe')
async def handle_subscribe(sid, data):
    """Register a subscription for an authenticated socket.

    Args:
        sid: Socket.IO session id of the subscribing client.
        data: Client-supplied payload. It must be a dict with a ``type`` key
            and may carry an optional ``filters`` entry.

    Returns:
        ``{'status': 'subscribed'}`` on success, otherwise an error dict of
        the form ``{'status': 'error', 'message': ...}``.
    """
    session = await sio.get_session(sid)
    if not session.get('authenticated'):
        return {'status': 'error', 'message': 'Not authenticated'}

    # The payload comes straight from the client: answer malformed input with
    # an error reply instead of raising KeyError/TypeError inside the handler.
    if not isinstance(data, dict) or 'type' not in data:
        return {'status': 'error', 'message': 'Invalid subscription request'}

    subscription = {
        'type': data['type'],
        'filters': data.get('filters', {}),
        'client_list': session['client_list'],
    }
    # Use key with hash tag for Redis Cluster compatibility
    await redis.sadd(f'{{socket}}:subscriptions:{sid}', json.dumps(subscription))
    return {'status': 'subscribed'}
Event Broadcasting
async def broadcast_update(event_type: str, data: dict, client_list: List[str]):
    """Emit ``event_type`` to every subscribed room that should receive ``data``.

    Candidate rooms are looked up for the event type and client list, then
    each room is checked individually before the event is emitted to it.
    """
    for target in await get_subscribed_rooms(event_type, client_list):
        if not await should_receive_update(target, data):
            continue
        await sio.emit(event_type, data, room=target)
Redis Integration
Redis Connection Types
The application supports different Redis deployment types:
Standalone Redis
# Client for a standalone (non-cluster) Redis deployment
redis_client = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=REDIS_DB,
    username=REDIS_USERNAME,
    password=REDIS_PASSWORD,
    ssl=REDIS_SSL,
)
Redis Cluster (AWS MemoryDB)
# Client for a Redis Cluster deployment (AWS MemoryDB)
redis_client = redis.RedisCluster(
    # Cluster endpoint, e.g. clustercfg.tracker.xxxxx.memorydb.region.amazonaws.com
    host=REDIS_HOST,
    port=REDIS_PORT,
    username=REDIS_USERNAME,
    password=REDIS_PASSWORD,
    # TLS is required for AWS MemoryDB
    ssl=True,
)
For more details on Redis Cluster support, including how we solved the CROSSSLOT error and ensured backward compatibility with standalone Redis, see Redis Cluster Support.
Session Storage
class RedisSessionStore:
    """Persist socket session data in Redis as JSON."""

    @staticmethod
    def _session_key(sid: str) -> str:
        """Return the Redis key for ``sid``.

        The literal ``{socket}`` hash tag is there for Redis Cluster
        compatibility.
        """
        return f'{{socket}}:session:{sid}'

    async def save_session(self, sid: str, session: dict):
        """Serialize ``session`` and store it with a ``SESSION_TTL`` expiry."""
        await redis.setex(
            self._session_key(sid),
            SESSION_TTL,
            json.dumps(session),
        )

    async def get_session(self, sid: str) -> dict:
        """Return the stored session for ``sid``, or ``{}`` if nothing is stored."""
        raw = await redis.get(self._session_key(sid))
        if not raw:
            return {}
        return json.loads(raw)
Event Publishing
async def publish_event(event_type: str, data: dict):
    """Publish an event envelope to Redis for Socket.IO distribution.

    The envelope carries the event type, the payload and the publish time,
    serialized as JSON.
    """
    envelope = json.dumps(
        dict(type=event_type, data=data, timestamp=time.time())
    )
    # The channel name carries a hash tag for Redis Cluster compatibility.
    await redis.publish('{socket}:events', envelope)
Background Processing
Event Processing
async def process_location_update(location: dict):
    """Persist a location update, invalidate its cache entry, then broadcast it."""
    # 1. Store the update in the database.
    await db.save_location(location)

    # 2. Invalidate the tracker's cache entry (hash-tagged key for Redis
    #    Cluster compatibility).
    stale_key = f"{{location}}:{location['tracker_id']}"
    await cache.invalidate(stale_key)

    # 3. Broadcast to the clients listed on the update itself.
    recipients = location['client_list']
    await broadcast_update('location_update', location, recipients)
Queue Management
class QueueManager:
    """Manage background processing queues."""

    async def add_task(self, task_type: str, data: dict):
        """Wrap ``data`` in a task record and push it onto the task queue.

        The record gets a fresh UUID and a creation timestamp.
        """
        record = json.dumps({
            'id': str(uuid.uuid4()),
            'type': task_type,
            'data': data,
            'created_at': time.time(),
        })
        # Key carries a hash tag for Redis Cluster compatibility.
        await redis.lpush('{socket}:task_queue', record)

    async def process_tasks(self):
        """Consume tasks from the queue forever, passing each to ``process_task``.

        NOTE(review): an exception raised by ``process_task`` or by JSON
        decoding propagates and ends this loop — confirm that is intended.
        """
        while True:
            # Same hash-tagged key that add_task pushes to.
            popped = await redis.brpop('{socket}:task_queue')
            if not popped:
                continue
            # Element 1 of the pop result is decoded as the JSON task record.
            await process_task(json.loads(popped[1]))
Error Handling
Connection Errors
@sio.on_error()
async def error_handler(sid, error):
    """Log a Socket.IO error and report it to the socket it concerns.

    NOTE(review): confirm that the server object in use actually provides an
    ``on_error()`` decorator; it is not defined anywhere in this file.
    """
    logger.error(f"Socket error for {sid}: {error}")
    notice = {'message': str(error)}
    # Emitting to room=sid targets only that socket.
    await sio.emit('error', notice, room=sid)
Reconnection Logic
@sio.on('disconnect')
async def handle_disconnect(sid):
    """Clean up on client disconnect.

    Both the session and the subscriptions of ``sid`` are cleaned up. The
    subscription cleanup runs even when the session cleanup raises, so a
    failure in the first step does not leave subscription data behind; the
    exception still propagates afterwards.
    """
    try:
        await cleanup_session(sid)
    finally:
        await remove_subscriptions(sid)
Monitoring
Health Checks
async def check_socket_health():
    """Collect Socket.IO server metrics and pass them to ``update_metrics``.

    NOTE(review): ``get_participants`` and ``get_rooms`` are awaited without
    arguments here — verify that matches the client manager in use.
    """
    client_count = len(await sio.manager.get_participants())
    room_count = len(await sio.manager.get_rooms())
    event_rate = await get_event_rate()
    await update_metrics({
        'connected_clients': client_count,
        'rooms': room_count,
        'events_per_second': event_rate,
    })
Performance Metrics
# Metric objects for the socket layer, keyed by a short name.
SOCKET_METRICS = dict(
    connections=Counter('socket_connections_total', 'Total connections'),
    messages=Counter('socket_messages_total', 'Total messages'),
    errors=Counter('socket_errors_total', 'Total errors'),
    latency=Histogram('socket_latency_seconds', 'Message latency'),
)
Best Practices
- Connection Management
  - Implement heartbeat mechanism
  - Handle reconnection gracefully
  - Clean up stale connections
- Event Handling
  - Validate event data
  - Rate limit events
  - Handle backpressure
- Security
  - Authenticate all connections
  - Validate client permissions
  - Sanitize event data
- Performance
  - Use room-based broadcasting
  - Batch updates when possible
  - Implement message queuing
- Redis Cluster Compatibility
  - Use hash tags in key names (e.g., `{prefix}:key`)
  - Ensure related keys use the same hash tag
  - Use RedisCluster client for AWS MemoryDB
  - Enable TLS for secure connections
- Use hash tags in key names (e.g.,