TimescaleDB Integration

This document describes how the Tracker application uses TimescaleDB to efficiently store and query time-series location data.

Overview

The implementation includes:

  1. Raw data collection in the location_reports table
  2. Continuous aggregates for hourly and daily summaries
  3. Automatic data retention policies
  4. GraphQL API for querying aggregated data

Data Flow

Data flows through the system as follows:

  1. Raw location data is collected every 6 hours into the location_reports table via a cron job
  2. TimescaleDB continuous aggregates process this data into:
    • location_history_hourly (hourly aggregates)
    • location_history_daily (daily aggregates)
  3. A view called location_history_view combines both aggregates to provide a unified interface
  4. A materialized view called location_history provides fast access to the data
  5. A refresher service listens for changes and refreshes the materialized view
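
For reference, each continuous aggregate is essentially a time_bucket query over the raw table. Below is a minimal sketch of the hourly aggregate, assuming the column names this document mentions (hashed_adv_key, timestamp, location, nearest_city) and TimescaleDB's last() aggregate; the actual definition lives in the Alembic migration:

-- Sketch of the hourly continuous aggregate (column list assumed)
CREATE MATERIALIZED VIEW location_history_hourly
WITH (timescaledb.continuous) AS
SELECT
    hashed_adv_key,
    time_bucket('1 hour', timestamp) AS bucket,
    last(location, timestamp) AS location,        -- most recent point in the bucket
    last(nearest_city, timestamp) AS nearest_city
FROM location_reports
GROUP BY hashed_adv_key, time_bucket('1 hour', timestamp);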

Retention Policies

The following retention policies are configured:

  • Raw data (location_reports): Kept for 2 days
  • Hourly aggregates (location_history_hourly): Kept for 24 hours
  • Daily aggregates (location_history_daily): Kept for 1 year

This tiered approach ensures efficient storage while maintaining access to historical data.
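
In TimescaleDB terms, these policies correspond to add_retention_policy calls along the following lines (a sketch matching the intervals above; the actual migration may differ):

-- Drop raw chunks after 2 days
SELECT add_retention_policy('location_reports', INTERVAL '2 days');

-- Retention policies also apply to the continuous aggregates
SELECT add_retention_policy('location_history_hourly', INTERVAL '24 hours');
SELECT add_retention_policy('location_history_daily', INTERVAL '1 year');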

Geocoding Integration

The system preserves geocoding results across all data layers:

  1. When geocoding is performed, the nearest_city field is updated in the location_reports table
  2. The continuous aggregates automatically include this field in their aggregations
  3. The update_nearest_city function ensures consistency across all layers
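
The exact signature of update_nearest_city is defined in the migration. As a rough sketch, it updates the raw table and relies on the next aggregate refresh (and the materialized view refresh) to propagate the value; the parameter names and id column here are hypothetical:

-- Hypothetical sketch of update_nearest_city (signature assumed)
CREATE OR REPLACE FUNCTION update_nearest_city(p_report_id bigint, p_city text)
RETURNS void AS $$
BEGIN
    UPDATE location_reports SET nearest_city = p_city WHERE id = p_report_id;
    -- The continuous aggregates pick up the change on their next refresh,
    -- and the refresher service rebuilds the materialized view.
END;
$$ LANGUAGE plpgsql;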

GraphQL API

The GraphQL API has been extended to support the TimescaleDB integration:

Queries

  • locationHistoryAggregated: Retrieves aggregated location data with time bucketing
    query {
      locationHistoryAggregated(
        trackerId: "123"
        startTime: "2025-01-01T00:00:00Z"
        endTime: "2025-01-31T23:59:59Z"
        bucketSize: "1 day" # Options: "1 hour" or "1 day"
      ) {
        id
        timestamp
        location {
          latitude
          longitude
        }
        nearestCity
        tracker {
          id
          name
          currentStatus
        }
      }
    }

Mutations

  • refreshLocationHistoryAggregates: Manually triggers a refresh of the continuous aggregates
    mutation {
      refreshLocationHistoryAggregates(
        trackerId: "123"
        startTime: "2025-01-01T00:00:00Z"
        endTime: "2025-01-31T23:59:59Z"
      )
    }

Implementation Details

Database Schema

  • location_reports: Raw location data with timestamp partitioning
  • location_history_hourly: Continuous aggregate with hourly buckets
  • location_history_daily: Continuous aggregate with daily buckets
  • location_history_view: View combining hourly and daily aggregates
  • location_history: Materialized view of the combined data
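
Here, location_reports is a TimescaleDB hypertable partitioned on its timestamp column, and the combining view routes queries to whichever aggregate covers the requested window. A condensed sketch, assuming identical column lists in both aggregates and a 24-hour boundary (matching the query routing described under Performance Considerations):

-- The raw table is a hypertable partitioned on timestamp
SELECT create_hypertable('location_reports', 'timestamp');

-- Sketch of the combining view: hourly buckets for recent data,
-- daily buckets for everything older
CREATE VIEW location_history_view AS
SELECT * FROM location_history_hourly
WHERE bucket >= NOW() - INTERVAL '24 hours'
UNION ALL
SELECT * FROM location_history_daily
WHERE bucket < NOW() - INTERVAL '24 hours';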

Continuous Aggregate Policies

  • Hourly data is refreshed every hour with a 1-hour lag
  • Daily data is refreshed every day with a 1-day lag
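
These schedules map onto add_continuous_aggregate_policy calls along these lines; the end_offset values encode the stated lags, while the start_offset windows are assumptions chosen to match the 48-hour and 7-day windows the refresher service uses:

SELECT add_continuous_aggregate_policy('location_history_hourly',
    start_offset      => INTERVAL '48 hours',
    end_offset        => INTERVAL '1 hour',   -- 1-hour lag
    schedule_interval => INTERVAL '1 hour');  -- runs every hour

SELECT add_continuous_aggregate_policy('location_history_daily',
    start_offset      => INTERVAL '7 days',
    end_offset        => INTERVAL '1 day',    -- 1-day lag
    schedule_interval => INTERVAL '1 day');   -- runs every day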

Functions

  • update_nearest_city: Updates the nearest city field across all data layers
  • migrate_nearest_city_data: Migrates existing nearest city data during setup

Refresher Service

A Python service runs in the background to perform two key functions:

  1. Listen for PostgreSQL notifications and refresh the materialized view when needed
  2. Periodically refresh the continuous aggregates (at most once per hour) using Redis-based distributed locking

Notification-Based Refresh

The service listens for PostgreSQL notifications to refresh the materialized view:

# Simplified example of the notification-based refresh
import psycopg

conn = psycopg.connect(database_url, autocommit=True)
cur = conn.cursor()

# Subscribe to the notification channel
cur.execute("LISTEN refresh_location_history;")

# conn.notifies() is a generator that blocks until a notification arrives
for notify in conn.notifies():
    print(f"Got notification: {notify.channel}")

    # Refresh the materialized view on a separate connection
    refresh_conn = psycopg.connect(database_url, autocommit=True)
    refresh_cur = refresh_conn.cursor()
    refresh_cur.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY location_history;")
    refresh_cur.close()
    refresh_conn.close()
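
On the database side, these notifications are typically raised by a trigger on location_reports; the migration sets up the actual trigger and function. A minimal sketch of that shape, with hypothetical function and trigger names (the channel name refresh_location_history is the one the service listens on):

-- Hypothetical trigger that notifies the refresher after new inserts
CREATE OR REPLACE FUNCTION notify_location_history_refresh()
RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('refresh_location_history', TG_TABLE_NAME);
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER location_reports_notify
AFTER INSERT ON location_reports
FOR EACH STATEMENT
EXECUTE FUNCTION notify_location_history_refresh();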

Periodic Continuous Aggregate Refresh

The service also periodically refreshes the continuous aggregates using Redis-based distributed locking:

# Simplified example of the periodic continuous aggregate refresh
import datetime
import os
import socket
import time

import psycopg
import redis

# Redis connection for distributed locking
redis_client = redis.Redis(
    host=redis_host,
    port=redis_port,
    password=redis_password,
    decode_responses=True,
)

def refresh_continuous_aggregates():
    now = time.time()

    # Check whether an hourly refresh has already happened
    last_refresh_key = "location_history:last_aggregate_refresh"
    last_refresh_time = redis_client.get(last_refresh_key)

    if last_refresh_time and now - float(last_refresh_time) < 3600:  # 1 hour in seconds
        return  # Skip if less than an hour has passed

    # Try to acquire a lock with a 30-minute expiry
    lock_key = "location_history:aggregate_refresh_lock"
    lock_value = f"{socket.gethostname()}:{os.getpid()}"

    # nx=True: only set if the key doesn't exist; ex=1800: expire after 30 minutes
    acquired = redis_client.set(lock_key, lock_value, nx=True, ex=1800)

    if not acquired:
        return  # Another instance holds the lock

    try:
        # Create a separate connection for the refresh operation
        refresh_conn = psycopg.connect(database_url, autocommit=True)
        refresh_cur = refresh_conn.cursor()

        # Set a longer statement timeout for large tables
        refresh_cur.execute("SET statement_timeout = '20min';")

        # Get the current time in database format
        refresh_cur.execute("SELECT NOW();")
        current_time = refresh_cur.fetchone()[0]

        # Refresh the last 48 hours for the hourly aggregate
        start_time_hourly = current_time - datetime.timedelta(hours=48)
        refresh_cur.execute(
            "CALL refresh_continuous_aggregate('location_history_hourly', %s, %s);",
            (start_time_hourly, current_time),
        )

        # Refresh the last 7 days for the daily aggregate
        start_time_daily = current_time - datetime.timedelta(days=7)
        refresh_cur.execute(
            "CALL refresh_continuous_aggregate('location_history_daily', %s, %s);",
            (start_time_daily, current_time),
        )

        # Refresh the materialized view
        refresh_cur.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY location_history;")

        # Record the refresh time in Redis with a 2-hour expiry
        redis_client.set(last_refresh_key, str(now), ex=7200)

        refresh_cur.close()
        refresh_conn.close()
    finally:
        # Release the lock
        redis_client.delete(lock_key)

This approach ensures that:

  1. The materialized view is refreshed immediately when new data is added
  2. The continuous aggregates are refreshed at most once per hour
  3. Only one instance performs the refresh at a time, preventing resource contention
  4. Time-bounded refreshes optimize performance for large tables

Cron Job

A cron job runs every 6 hours to import new location data:

0 */6 * * * cd /path/to/tracker-graphql-2 && ./app/src/import_location_reports.py >> ./logs/location_reports_import.log 2>&1

Performance Considerations

  • Queries for recent data (last 24 hours) use hourly aggregates
  • Queries for older data use daily aggregates
  • Raw data is automatically compressed and eventually removed
  • Indexes are optimized for common query patterns
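
Compression on the raw hypertable would typically use TimescaleDB's native columnar compression, timed so chunks are compressed before the 2-day retention policy drops them. A sketch under that assumption (the segmentby column and index definition are illustrative, not the actual schema):

-- Compress raw chunks after 1 day; retention drops them after 2 days
ALTER TABLE location_reports SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'hashed_adv_key'
);
SELECT add_compression_policy('location_reports', INTERVAL '1 day');

-- Example index for per-tracker, time-ordered queries
CREATE INDEX IF NOT EXISTS location_reports_key_time_idx
    ON location_reports (hashed_adv_key, timestamp DESC);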

Installation

1. Run the Alembic Migration

The migration will:

  • Back up the current location_history table
  • Create the materialized view
  • Set up triggers and functions for automatic refreshing

# Navigate to your project directory
cd /path/to/tracker-graphql-2

# Run the migration
alembic upgrade head

2. Install the Refresher Service

The refresher service listens for PostgreSQL notifications and refreshes the materialized view when needed. It also periodically refreshes the continuous aggregates.

Using Systemd

# Make the script executable
chmod +x app/src/location_history_refresher.py

# Copy the systemd service file to the systemd directory
sudo cp systemd/location-history-refresher.service /etc/systemd/system/

# Reload systemd to recognize the new service
sudo systemctl daemon-reload

# Enable and start the service
sudo systemctl enable location-history-refresher
sudo systemctl start location-history-refresher

Using Docker Compose

The service can also be run using Docker Compose:

# Navigate to the fetcher directory
cd fetcher

# Start the services
docker-compose up -d

This will start both the location-history-refresher and tracker-report-fetcher services.

Troubleshooting

Service Won't Start

Check the logs:

journalctl -u location-history-refresher -f

Common issues:

  • Python path incorrect in the service file
  • Database connection issues
  • Permission problems

View Not Refreshing

Notification Issues

Check that the notifications are being sent:

-- In PostgreSQL
SELECT pg_notify('refresh_location_history', 'test');

Then check the service logs to see if it received the notification:

journalctl -u location-history-refresher -f

Continuous Aggregate Issues

If the continuous aggregates aren't being refreshed:

  1. Check if Redis is running and accessible:

    redis-cli ping
  2. Check the Redis keys related to the refresh lock:

    redis-cli keys "location_history:*"
  3. Manually refresh the continuous aggregates:

    -- In PostgreSQL
    CALL refresh_continuous_aggregate('location_history_hourly', NOW() - INTERVAL '48 hours', NOW());
    CALL refresh_continuous_aggregate('location_history_daily', NOW() - INTERVAL '7 days', NOW());

Manual Testing

To manually test the system:

Individual Record Testing

  1. Insert a new record into location_reports
  2. Refresh the continuous aggregates
  3. Check if the materialized view was automatically refreshed

-- Insert test data
INSERT INTO location_reports (hashed_adv_key, timestamp, location, confidence)
VALUES ('test_key', NOW(), ST_SetSRID(ST_MakePoint(-0.1278, 51.5074), 4326), 100);

-- Manually refresh the continuous aggregates
CALL refresh_continuous_aggregate('location_history_hourly', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour');

-- Check if the data appears in the materialized view
SELECT * FROM location_history ORDER BY timestamp DESC LIMIT 10;

Bulk Testing with Mock Data

For more comprehensive testing with mock data, use the provided script:

-- Run the mock data script for European cities
\i sql/mock_location_reports_europe.sql

-- After a short delay, verify that the data appears in location_history
SELECT
t.name as tracker_name,
lh.timestamp,
ST_X(lh.location::geometry) as longitude,
ST_Y(lh.location::geometry) as latitude,
lh.nearest_city
FROM location_history lh
JOIN trackers t ON t.id = lh.tracker_id
ORDER BY lh.timestamp DESC
LIMIT 5;

This script:

  1. Finds trackers without location history
  2. Assigns random European city locations to these trackers
  3. Inserts the data into location_reports
  4. Lets the TimescaleDB continuous aggregates and triggers populate location_history

For more details on mock data generation, see the Mock Location Data guide.