Skip to main content

Socket.IO Architecture

The Tracker GraphQL API uses Socket.IO for real-time communication, enabling live updates and event-driven interactions.

Overview

Connection Management

Socket.IO Server Setup

from socketio import AsyncServer
from .auth import validate_token

# Basic URL-based configuration
sio = AsyncServer(
async_mode='asgi',
cors_allowed_origins='*',
client_manager=RedisManager(REDIS_URL)
)

# Advanced configuration with TLS and username support
sio = AsyncServer(
async_mode='asgi',
cors_allowed_origins='*',
client_manager=RedisManager(
host=REDIS_HOST,
port=REDIS_PORT,
username=REDIS_USERNAME,
password=REDIS_PASSWORD,
ssl=REDIS_SSL,
ssl_cert_reqs=ssl.CERT_REQUIRED if REDIS_SSL else None,
db=REDIS_DB
)
)

@sio.on('connect')
async def handle_connect(sid, environ):
"""Handle new socket connections."""
session = {
'sid': sid,
'connected_at': time.time(),
'client_list': []
}
await sio.save_session(sid, session)

Authentication

@sio.on('authenticate')
async def handle_auth(sid, token):
"""Authenticate socket connection with JWT."""
try:
payload = validate_token(token)
session = await sio.get_session(sid)
session['authenticated'] = True
session['client_list'] = payload['client_list']
await sio.save_session(sid, session)
return {'status': 'authenticated'}
except Exception as e:
await sio.disconnect(sid)
return {'status': 'error', 'message': str(e)}

Event Handling

Subscription Management

@sio.on('subscribe')
async def handle_subscribe(sid, data):
"""Handle subscription to specific updates."""
session = await sio.get_session(sid)
if not session.get('authenticated'):
return {'status': 'error', 'message': 'Not authenticated'}

subscription = {
'type': data['type'],
'filters': data.get('filters', {}),
'client_list': session['client_list']
}

# Use key with hash tag for Redis Cluster compatibility
await redis.sadd(f'{{socket}}:subscriptions:{sid}', json.dumps(subscription))
return {'status': 'subscribed'}

Event Broadcasting

async def broadcast_update(event_type: str, data: dict, client_list: List[str]):
"""Broadcast updates to relevant clients."""
rooms = await get_subscribed_rooms(event_type, client_list)

for room in rooms:
if await should_receive_update(room, data):
await sio.emit(event_type, data, room=room)

Redis Integration

Redis Connection Types

The application supports different Redis deployment types:

Standalone Redis

# Standard Redis connection
redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
username=REDIS_USERNAME,
password=REDIS_PASSWORD,
ssl=REDIS_SSL,
db=REDIS_DB
)

Redis Cluster (AWS MemoryDB)

# Redis Cluster connection (for AWS MemoryDB)
redis_client = redis.RedisCluster(
host=REDIS_HOST, # e.g., clustercfg.tracker.xxxxx.memorydb.region.amazonaws.com
port=REDIS_PORT,
username=REDIS_USERNAME,
password=REDIS_PASSWORD,
ssl=True # TLS is required for AWS MemoryDB
)

For more details on Redis Cluster support, including how we solved the CROSSSLOT error and ensured backward compatibility with standalone Redis, see Redis Cluster Support.

Session Storage

class RedisSessionStore:
"""Store socket session data in Redis."""

async def save_session(self, sid: str, session: dict):
"""Save session data with expiration."""
# Use hash tags for Redis Cluster compatibility
key = f'{{socket}}:session:{sid}'
await redis.setex(key, SESSION_TTL, json.dumps(session))

async def get_session(self, sid: str) -> dict:
"""Retrieve session data."""
# Use hash tags for Redis Cluster compatibility
key = f'{{socket}}:session:{sid}'
data = await redis.get(key)
return json.loads(data) if data else {}

Event Publishing

async def publish_event(event_type: str, data: dict):
"""Publish events to Redis for Socket.IO distribution."""
message = {
'type': event_type,
'data': data,
'timestamp': time.time()
}
# Use channel name with hash tag for Redis Cluster compatibility
await redis.publish('{socket}:events', json.dumps(message))

Background Processing

Event Processing

async def process_location_update(location: dict):
"""Process and broadcast location updates."""
# Update database
await db.save_location(location)

# Invalidate cache (with hash tag for Redis Cluster compatibility)
await cache.invalidate(f"{{location}}:{location['tracker_id']}")

# Broadcast to relevant clients
await broadcast_update('location_update', location, location['client_list'])

Queue Management

class QueueManager:
"""Manage background processing queues."""

async def add_task(self, task_type: str, data: dict):
"""Add task to processing queue."""
task = {
'id': str(uuid.uuid4()),
'type': task_type,
'data': data,
'created_at': time.time()
}
# Use key with hash tag for Redis Cluster compatibility
await redis.lpush('{socket}:task_queue', json.dumps(task))

async def process_tasks(self):
"""Process tasks from queue."""
while True:
# Use key with hash tag for Redis Cluster compatibility
task = await redis.brpop('{socket}:task_queue')
if task:
await process_task(json.loads(task[1]))

Error Handling

Connection Errors

@sio.on_error()
async def error_handler(sid, error):
"""Handle Socket.IO errors."""
logger.error(f"Socket error for {sid}: {error}")
await sio.emit('error', {'message': str(error)}, room=sid)

Reconnection Logic

@sio.on('disconnect')
async def handle_disconnect(sid):
"""Clean up on client disconnect."""
await cleanup_session(sid)
await remove_subscriptions(sid)

Monitoring

Health Checks

async def check_socket_health():
"""Monitor Socket.IO server health."""
metrics = {
'connected_clients': len(await sio.manager.get_participants()),
'rooms': len(await sio.manager.get_rooms()),
'events_per_second': await get_event_rate()
}
await update_metrics(metrics)

Performance Metrics

SOCKET_METRICS = {
'connections': Counter('socket_connections_total', 'Total connections'),
'messages': Counter('socket_messages_total', 'Total messages'),
'errors': Counter('socket_errors_total', 'Total errors'),
'latency': Histogram('socket_latency_seconds', 'Message latency')
}

Best Practices

  1. Connection Management

    • Implement heartbeat mechanism
    • Handle reconnection gracefully
    • Clean up stale connections
  2. Event Handling

    • Validate event data
    • Rate limit events
    • Handle backpressure
  3. Security

    • Authenticate all connections
    • Validate client permissions
    • Sanitize event data
  4. Performance

    • Use room-based broadcasting
    • Batch updates when possible
    • Implement message queuing
  5. Redis Cluster Compatibility

    • Use hash tags in key names (e.g., {prefix}:key)
    • Ensure related keys use the same hash tag
    • Use RedisCluster client for AWS MemoryDB
    • Enable TLS for secure connections