Socket.IO Notification Architecture
This document describes the Socket.IO notification architecture used for real-time updates in the tracking system.
Overview
The tracking system uses Socket.IO to provide real-time updates to clients. This includes:
- Tracker location updates
- Status changes
- Geocoding results
- Admin-initiated update notifications
The architecture is designed to be resilient, scalable, and to handle the complexities of asynchronous communication between different components of the system.
Architecture Components
1. Socket.IO Server
The Socket.IO server is integrated with the FastAPI application and handles client connections, authentication, and event emission. Key features include:
- CORS configuration to allow connections from authorized origins
- JWT-based authentication with fallback to anonymous access
- Connection management with session tracking
- Event emission to specific clients or broadcasting to all
2. Redis Integration
Redis serves multiple purposes in the Socket.IO architecture:
- Pub/Sub Channel: Used for communication between the fetcher service and the Socket.IO server
- Task Queue: Stores notifications for processing by the main event loop
- Session Storage: Maintains client session information
- Cache: Stores geocoding results and other frequently accessed data
3. Two-Stage Notification System
To solve the "no current event loop in thread" error that occurs when trying to emit Socket.IO events from a background thread, we implemented a two-stage notification system:
-
Stage 1: Redis Subscriber Thread
- Runs in a separate thread
- Listens for notifications on the Redis pub/sub channel
- When a notification is received, it stores it in Redis with a unique key
- Adds the notification key to a processing queue
-
Stage 2: Notification Processor Task
- Runs in the main asyncio event loop
- Retrieves notification keys from the processing queue
- Fetches the notification data from Redis
- Emits the appropriate Socket.IO event to clients
This approach ensures that Socket.IO events are always emitted from the main event loop, avoiding the "no current event loop in thread" error.
Authentication Flow
- When a client connects to the Socket.IO server, it includes cookies with its JWT token
- The server extracts the token from cookies and verifies it
- If the token is valid, the server stores the token data in the client's session
- If the token is invalid or missing, the server creates an anonymous session
- The client's session includes information about its permissions and client list
Event Types
The system supports several types of Socket.IO events:
| Event Type | Description | Direction |
|---|---|---|
tracker_location_updated | Sent when a tracker's location is updated | Server → Client |
tracker_status_change | Sent when a tracker's status changes | Server → Client |
geocoding_queued | Sent when a geocoding request is queued | Server → Client |
geocoding_result | Sent when geocoding is complete | Server → Client |
geocoding_error | Sent when geocoding fails | Server → Client |
request_geocoding | Used to request geocoding for a location | Client → Server |
Implementation Details
Socket.IO Server Initialization
sio = socketio.AsyncServer(
async_mode="asgi",
cors_allowed_origins="*", # Allow all origins for debugging
logger=True,
engineio_logger=True,
ping_timeout=60,
ping_interval=25,
)
socket_app = socketio.ASGIApp(
sio, socketio_path="socket.io", static_files={}
)
Redis Subscriber Implementation
def start_redis_subscriber():
"""Start a Redis subscriber for notifications from the fetcher in a separate thread."""
def redis_listener():
# Create Redis connection
redis_client = redis.Redis(...)
# Subscribe to channel
pubsub = redis_client.pubsub()
pubsub.subscribe("socket:notifications")
# Process messages
for message in pubsub.listen():
if message["type"] != "message":
continue
# Parse notification
notification = json.loads(message["data"])
# Store in Redis for processing
notification_key = f"socket:notification:{datetime.now().timestamp()}"
redis_client.set(notification_key, json.dumps(notification_data), ex=3600)
# Add to processing queue
redis_client.lpush("socket:notification:queue", notification_key)
# Start in background thread
_thread_executor.submit(redis_listener)
Notification Processor Task
async def process_notification_queue():
"""Background task to process notifications from Redis queue"""
# Create Redis connection
redis_client = redis.Redis(...)
while True:
# Get notification key from queue
notification_data = redis_client.brpop("socket:notification:queue", timeout=1)
if notification_data:
# Get notification data
notification_json = redis_client.get(notification_key)
notification_data = json.loads(notification_json)
# Extract details
message_type = notification_data.get("message_type")
notification = notification_data.get("notification")
socket_id = notification_data.get("socket_id")
# Emit event
await emit_socket_event(message_type, notification, socket_id, tracker_id)
# Delete from Redis
redis_client.delete(notification_key)
Client Implementation
On the client side, the Socket.IO connection is established when the application initializes:
// Initialize Socket.IO client
socket = io(serverUrl, {
transports: ["websocket", "polling"],
upgrade: true,
withCredentials: true,
reconnectionAttempts: 5,
reconnectionDelay: 1000,
timeout: 20000,
forceNew: true,
});
// Listen for events
socket.on("tracker_location_updated", (data) => {
// Update UI with new location data
});
socket.on("tracker_status_change", (data) => {
// Update UI with new status
});
Admin Tracker Update Flow
When an admin requests an immediate update of a tracker's location:
- The client calls the
updateTrackerLocationGraphQL mutation - The server verifies the user has admin privileges
- The server publishes a message to the Redis channel
tracker:update:requests - The fetcher service processes the request and retrieves the latest data
- When complete, the fetcher publishes a notification to the Redis channel
socket:notifications - The Redis subscriber thread stores the notification in Redis
- The notification processor task emits a
tracker_location_updatedevent to the client - The client updates the UI with the new location data
Error Handling
The Socket.IO architecture includes several error handling mechanisms:
- Connection errors are logged but don't prevent the application from functioning
- Failed Socket.IO connections fall back to HTTP polling
- Notifications are stored in Redis with an expiration time to prevent queue buildup
- Background tasks are properly cancelled when clients disconnect
- Errors in the notification processing are caught and logged without affecting other notifications
Performance Considerations
- Socket.IO connections are kept alive with configurable ping intervals
- Redis is used as an intermediate store to prevent memory leaks
- Background tasks use a thread pool to limit resource usage
- Notifications include only the necessary data to minimize network traffic
- The system can function without Socket.IO if real-time updates are not critical
Security Considerations
- All Socket.IO connections require authentication via JWT tokens
- Admin-only functions verify the user has the appropriate role
- CORS is configured to allow only authorized origins
- Sensitive data is filtered before being sent to clients
- Rate limiting is applied to prevent abuse
Conclusion
The Socket.IO notification architecture provides a robust and scalable solution for real-time updates in the tracking system. By using a two-stage notification system with Redis as an intermediate store, we avoid common pitfalls with asynchronous event emission and ensure reliable delivery of notifications to clients.