Socket.IO Notification Architecture

This document describes the Socket.IO notification architecture used for real-time updates in the tracking system.

Overview

The tracking system uses Socket.IO to provide real-time updates to clients. This includes:

  • Tracker location updates
  • Status changes
  • Geocoding results
  • Admin-initiated update notifications

The architecture is designed to be resilient, scalable, and to handle the complexities of asynchronous communication between different components of the system.

Architecture Components

1. Socket.IO Server

The Socket.IO server is integrated with the FastAPI application and handles client connections, authentication, and event emission. Key features include:

  • CORS configuration to allow connections from authorized origins
  • JWT-based authentication with fallback to anonymous access
  • Connection management with session tracking
  • Event emission to specific clients or broadcasting to all

2. Redis Integration

Redis serves multiple purposes in the Socket.IO architecture:

  • Pub/Sub Channel: Used for communication between the fetcher service and the Socket.IO server
  • Task Queue: Stores notifications for processing by the main event loop
  • Session Storage: Maintains client session information
  • Cache: Stores geocoding results and other frequently accessed data

3. Two-Stage Notification System

To solve the "no current event loop in thread" error that occurs when trying to emit Socket.IO events from a background thread, we implemented a two-stage notification system:

  1. Stage 1: Redis Subscriber Thread

    • Runs in a separate thread
    • Listens for notifications on the Redis pub/sub channel
    • When a notification is received, it stores it in Redis with a unique key
    • Adds the notification key to a processing queue
  2. Stage 2: Notification Processor Task

    • Runs in the main asyncio event loop
    • Retrieves notification keys from the processing queue
    • Fetches the notification data from Redis
    • Emits the appropriate Socket.IO event to clients

This approach ensures that Socket.IO events are always emitted from the main event loop, avoiding the "no current event loop in thread" error.
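
The handoff between the two stages can be sketched with the standard library alone. This is a minimal illustration, not the project's implementation: an in-memory `queue.Queue` stands in for the Redis list, and the names `pending`, `subscriber_thread`, `processor`, and `main` are hypothetical. Appending to a list takes the place of the real Socket.IO emit.

```python
import asyncio
import json
import queue
import threading

# In-memory stand-in for the Redis list "socket:notification:queue" (assumption).
pending = queue.Queue()


def subscriber_thread(raw_messages):
    """Stage 1: runs in a plain thread and never touches the asyncio loop."""
    for raw in raw_messages:
        pending.put(raw)


async def processor(emitted, expected):
    """Stage 2: runs inside the event loop, so awaiting an emit is safe here."""
    while len(emitted) < expected:
        # The blocking get() runs in a worker thread so the loop stays responsive.
        raw = await asyncio.to_thread(pending.get)
        # A real server would await the Socket.IO emit at this point.
        emitted.append(json.loads(raw))


async def main():
    emitted = []
    messages = [
        json.dumps({"message_type": "tracker_location_updated", "n": i})
        for i in range(3)
    ]
    worker = threading.Thread(target=subscriber_thread, args=(messages,))
    worker.start()
    await processor(emitted, expected=len(messages))
    worker.join()
    return emitted
```

The thread only enqueues work. Everything that must be awaited happens in `processor`, which is the property the two-stage design relies on.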

Authentication Flow

  1. When a client connects to the Socket.IO server, the request includes a cookie that carries the client's JWT token
  2. The server extracts the token from cookies and verifies it
  3. If the token is valid, the server stores the token data in the client's session
  4. If the token is invalid or missing, the server creates an anonymous session
  5. The client's session includes information about its permissions and client list
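
The flow above could be sketched as follows. This is a hedged example: the cookie name `access_token`, the session field names, and the helpers `token_from_environ` and `build_session` are assumptions, and token verification is injected as a callable so that any JWT library can be plugged in.

```python
from http.cookies import SimpleCookie
from typing import Callable, Optional


def anonymous_session() -> dict:
    """Session used when the token is missing or invalid (field names are assumptions)."""
    return {"authenticated": False, "permissions": [], "client_list": []}


def token_from_environ(environ: dict, cookie_name: str = "access_token") -> Optional[str]:
    """Read the JWT from the Cookie header of the connection's environ dict."""
    cookie = SimpleCookie()
    cookie.load(environ.get("HTTP_COOKIE", ""))
    morsel = cookie.get(cookie_name)
    return morsel.value if morsel else None


def build_session(environ: dict, verify: Callable[[str], dict]) -> dict:
    """Return the session to store for a connecting client."""
    token = token_from_environ(environ)
    if token is None:
        return anonymous_session()
    try:
        claims = verify(token)  # e.g. a JWT library's decode call
    except Exception:
        # Invalid or expired token: fall back to an anonymous session.
        return anonymous_session()
    return {
        "authenticated": True,
        "permissions": claims.get("permissions", []),
        "client_list": claims.get("client_list", []),
    }
```

In a python-socketio `connect` handler, the returned dict could then be stored with `await sio.save_session(sid, session)`.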

Event Types

The system supports several types of Socket.IO events:

Event Type | Description | Direction
tracker_location_updated | Sent when a tracker's location is updated | Server → Client
tracker_status_change | Sent when a tracker's status changes | Server → Client
geocoding_queued | Sent when a geocoding request is queued | Server → Client
geocoding_result | Sent when geocoding is complete | Server → Client
geocoding_error | Sent when geocoding fails | Server → Client
request_geocoding | Used to request geocoding for a location | Client → Server
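
One way to keep these event names consistent across the codebase is to define them once as constants. The sketch below is illustrative only: the class names `ServerEvent` and `ClientEvent` and the helper `is_server_event` are hypothetical, while the string values come from the table above.

```python
from enum import Enum


class ServerEvent(str, Enum):
    """Events emitted by the server to clients."""
    TRACKER_LOCATION_UPDATED = "tracker_location_updated"
    TRACKER_STATUS_CHANGE = "tracker_status_change"
    GEOCODING_QUEUED = "geocoding_queued"
    GEOCODING_RESULT = "geocoding_result"
    GEOCODING_ERROR = "geocoding_error"


class ClientEvent(str, Enum):
    """Events sent by clients to the server."""
    REQUEST_GEOCODING = "request_geocoding"


def is_server_event(name: str) -> bool:
    """True if the name is one of the server-to-client events."""
    return name in {event.value for event in ServerEvent}
```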

Implementation Details

Socket.IO Server Initialization

import socketio

sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins="*",  # Allow all origins for debugging; restrict to authorized origins in production
    logger=True,
    engineio_logger=True,
    ping_timeout=60,
    ping_interval=25,
)

socket_app = socketio.ASGIApp(
    sio, socketio_path="socket.io", static_files={}
)

Redis Subscriber Implementation

import json
from datetime import datetime

import redis


def start_redis_subscriber():
    """Start a Redis subscriber for notifications from the fetcher in a separate thread."""

    def redis_listener():
        # Create Redis connection
        redis_client = redis.Redis(...)

        # Subscribe to channel
        pubsub = redis_client.pubsub()
        pubsub.subscribe("socket:notifications")

        # Process messages, skipping subscribe confirmations
        for message in pubsub.listen():
            if message["type"] != "message":
                continue

            # Parse notification
            notification = json.loads(message["data"])

            # Store in Redis for processing; the key expires after one hour
            notification_key = f"socket:notification:{datetime.now().timestamp()}"
            redis_client.set(notification_key, json.dumps(notification), ex=3600)

            # Add to processing queue
            redis_client.lpush("socket:notification:queue", notification_key)

    # Start in background thread (_thread_executor is a thread pool defined elsewhere in the module)
    _thread_executor.submit(redis_listener)
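
For context, the publishing side in the fetcher might look like the sketch below. The envelope fields `message_type`, `notification`, and `socket_id` match what the notification processor reads. The function name `publish_notification` and its signature are assumptions, and the fetcher's actual code may differ.

```python
import json
from typing import Any, Optional

NOTIFICATION_CHANNEL = "socket:notifications"


def publish_notification(
    redis_client: Any,
    message_type: str,
    notification: dict,
    socket_id: Optional[str] = None,
) -> str:
    """Publish one notification envelope and return the JSON that was sent."""
    payload = json.dumps(
        {
            "message_type": message_type,
            "notification": notification,
            # None means the event is not targeted at a single client.
            "socket_id": socket_id,
        }
    )
    # redis-py clients expose publish(channel, message).
    redis_client.publish(NOTIFICATION_CHANNEL, payload)
    return payload
```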

Notification Processor Task

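import asyncio
import json

import redis


async def process_notification_queue():
    """Background task to process notifications from Redis queue"""

    # Create Redis connection
    redis_client = redis.Redis(...)

    while True:
        # Get the next notification key. brpop blocks, so it runs in a worker
        # thread to keep the event loop free. It returns (queue_name, value),
        # or None when the timeout expires.
        item = await asyncio.to_thread(
            redis_client.brpop, "socket:notification:queue", timeout=1
        )
        if item is None:
            continue
        _, notification_key = item

        # Get notification data; the key may already have expired
        notification_json = redis_client.get(notification_key)
        if notification_json is None:
            continue
        notification_data = json.loads(notification_json)

        # Extract details
        message_type = notification_data.get("message_type")
        notification = notification_data.get("notification")
        socket_id = notification_data.get("socket_id")
        # Assumption: tracker_id travels in the same payload
        tracker_id = notification_data.get("tracker_id")

        # Emit event
        await emit_socket_event(message_type, notification, socket_id, tracker_id)

        # Delete from Redis
        redis_client.delete(notification_key)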
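
The processor calls `emit_socket_event`, which is not shown in this document. A minimal sketch of what such a helper could do follows. It assumes that a notification with a `socket_id` targets one client and that everything else is broadcast. Unlike the call above, this version takes the server object as an explicit first argument so that it can be exercised in isolation.

```python
import asyncio
from typing import Any, Optional


async def emit_socket_event(
    sio: Any,
    message_type: str,
    notification: dict,
    socket_id: Optional[str] = None,
    tracker_id: Optional[str] = None,
) -> None:
    """Emit one event, either to a single client or to all clients."""
    data = dict(notification)
    if tracker_id is not None:
        # Attach the tracker id unless the payload already carries one.
        data.setdefault("tracker_id", tracker_id)
    if socket_id:
        # python-socketio's emit() accepts `to` for a single recipient.
        await sio.emit(message_type, data, to=socket_id)
    else:
        await sio.emit(message_type, data)
```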

Client Implementation

On the client side, the Socket.IO connection is established when the application initializes:

// Initialize Socket.IO client
socket = io(serverUrl, {
  transports: ["websocket", "polling"],
  upgrade: true,
  withCredentials: true,
  reconnectionAttempts: 5,
  reconnectionDelay: 1000,
  timeout: 20000,
  forceNew: true,
});

// Listen for events
socket.on("tracker_location_updated", (data) => {
  // Update UI with new location data
});

socket.on("tracker_status_change", (data) => {
  // Update UI with new status
});

Admin Tracker Update Flow

When an admin requests an immediate update of a tracker's location:

  1. The client calls the updateTrackerLocation GraphQL mutation
  2. The server verifies the user has admin privileges
  3. The server publishes a message to the Redis channel tracker:update:requests
  4. The fetcher service processes the request and retrieves the latest data
  5. When complete, the fetcher publishes a notification to the Redis channel socket:notifications
  6. The Redis subscriber thread stores the notification in Redis
  7. The notification processor task emits a tracker_location_updated event to the client
  8. The client updates the UI with the new location data
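
Steps 2 and 3 of this flow could be sketched as below. The channel name comes from the list above. The `role` field on the user, the request payload shape, and the function name `request_tracker_update` are assumptions made for illustration.

```python
import json
from typing import Any

UPDATE_REQUEST_CHANNEL = "tracker:update:requests"


def request_tracker_update(
    redis_client: Any, user: dict, tracker_id: str, socket_id: str
) -> None:
    """Check admin rights, then ask the fetcher for an immediate update."""
    if user.get("role") != "admin":
        raise PermissionError("admin privileges required")
    # Including socket_id lets the eventual notification target the requester.
    request = {"tracker_id": tracker_id, "socket_id": socket_id}
    redis_client.publish(UPDATE_REQUEST_CHANNEL, json.dumps(request))
```

A GraphQL resolver for `updateTrackerLocation` would call a function like this and return immediately. The result arrives later as a `tracker_location_updated` event.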

Error Handling

The Socket.IO architecture includes several error handling mechanisms:

  • Connection errors are logged but don't prevent the application from functioning
  • Failed WebSocket connections fall back to HTTP long-polling
  • Notifications are stored in Redis with an expiration time (one hour in the subscriber shown above) so that unprocessed entries do not accumulate
  • Background tasks are properly cancelled when clients disconnect
  • Errors in the notification processing are caught and logged without affecting other notifications
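
The last point, isolating failures per notification, can be illustrated with a small wrapper. This is a sketch under assumptions: `process_all`, the `handle` callable, and the logger name are hypothetical and stand in for the body of the processing loop.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("socket.notifications")


async def process_all(
    notifications: list, handle: Callable[[dict], Awaitable[None]]
) -> int:
    """Handle each notification in turn and return the number of failures."""
    failures = 0
    for notification in notifications:
        try:
            await handle(notification)
        except asyncio.CancelledError:
            # Let task cancellation propagate so shutdown still works.
            raise
        except Exception:
            failures += 1
            logger.exception("failed to process notification %r", notification)
    return failures
```

Catching the exception inside the loop body means a malformed notification is logged and skipped while later ones are still delivered.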

Performance Considerations

  • Socket.IO connections are kept alive with configurable ping intervals
  • Redis is used as an intermediate store, so pending notifications do not accumulate in application memory
  • Background tasks use a thread pool to limit resource usage
  • Notifications include only the necessary data to minimize network traffic
  • The system can function without Socket.IO if real-time updates are not critical

Security Considerations

  • Socket.IO connections are authenticated via JWT tokens; clients without a valid token receive only an anonymous session
  • Admin-only functions verify the user has the appropriate role
  • CORS should be configured to allow only authorized origins; the "*" wildcard in the initialization code above is for debugging only
  • Sensitive data is filtered before being sent to clients
  • Rate limiting is applied to prevent abuse
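
This document does not say how rate limiting is implemented. As one possible approach, a per-client sliding-window limiter could gate client-to-server events such as `request_geocoding`. The class name `SlidingWindowLimiter` and its limits are hypothetical.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most max_events per client within window_seconds."""

    def __init__(
        self,
        max_events: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_events = max_events
        self.window_seconds = window_seconds
        self.clock = clock
        self._events = defaultdict(deque)

    def allow(self, sid: str) -> bool:
        """Record the event and return True if the client is under the limit."""
        now = self.clock()
        events = self._events[sid]
        # Drop timestamps that have left the window.
        while events and now - events[0] >= self.window_seconds:
            events.popleft()
        if len(events) >= self.max_events:
            return False
        events.append(now)
        return True

    def forget(self, sid: str) -> None:
        """Release state for a disconnected client."""
        self._events.pop(sid, None)
```

An event handler would call `allow(sid)` first and reject the request when it returns `False`. Calling `forget(sid)` on disconnect keeps the table from growing.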

Conclusion

The Socket.IO notification architecture provides a robust and scalable solution for real-time updates in the tracking system. By using a two-stage notification system with Redis as an intermediate store, we avoid common pitfalls with asynchronous event emission and ensure reliable delivery of notifications to clients.