# TimescaleDB Integration

This document describes how the Tracker application uses TimescaleDB to efficiently store and query time-series location data.

## Overview

The implementation includes:

- Raw data collection in the `location_reports` table
- Continuous aggregates for hourly and daily summaries
- Automatic data retention policies
- A GraphQL API for querying aggregated data
## Data Flow

The following steps describe how data flows through the system:

1. Raw location data is collected every 6 hours into the `location_reports` table via a cron job
2. TimescaleDB continuous aggregates process this data into:
   - `location_history_hourly` (hourly aggregates)
   - `location_history_daily` (daily aggregates)
3. A view called `location_history_view` combines both aggregates to provide a unified interface
4. A materialized view called `location_history` provides fast access to the data
5. A refresher service listens for changes and refreshes the materialized view
## Retention Policies

The following retention policies are configured:

- Raw data (`location_reports`): kept for 2 days
- Hourly aggregates (`location_history_hourly`): kept for 24 hours
- Daily aggregates (`location_history_daily`): kept for 1 year

This tiered approach ensures efficient storage while maintaining access to historical data.
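The policies above are created during migration. As a sketch, equivalent TimescaleDB calls might look like the following (the intervals mirror the policies listed above, but the actual migration may express them differently):

```sql
-- Hypothetical sketch: retention policies via TimescaleDB's add_retention_policy
SELECT add_retention_policy('location_reports', INTERVAL '2 days');
SELECT add_retention_policy('location_history_hourly', INTERVAL '24 hours');
SELECT add_retention_policy('location_history_daily', INTERVAL '1 year');
```

`add_retention_policy` works on both hypertables and continuous aggregates, which is what makes the tiered scheme a few one-liners.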
## Geocoding Integration

The system preserves geocoding results across all data layers:

- When geocoding is performed, the `nearest_city` field is updated in the `location_reports` table
- The continuous aggregates automatically include this field in their aggregations
- The `update_nearest_city` function ensures consistency across all layers
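As an illustrative sketch only (the actual `update_nearest_city` signature and body are defined in the migration and may differ), such a function could update the raw table and rely on the refresh machinery to propagate the change:

```sql
-- Hypothetical sketch of an update_nearest_city-style function;
-- the real implementation lives in the Alembic migration.
CREATE OR REPLACE FUNCTION update_nearest_city(p_report_id bigint, p_city text)
RETURNS void AS $$
BEGIN
    UPDATE location_reports
    SET nearest_city = p_city
    WHERE id = p_report_id;
    -- The continuous aggregates pick up the change on their next
    -- time-bounded refresh, and the materialized view is refreshed
    -- by the refresher service.
END;
$$ LANGUAGE plpgsql;
```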
## GraphQL API

The GraphQL API has been extended to support the TimescaleDB integration.

### Queries

`locationHistoryAggregated`: Retrieves aggregated location data with time bucketing.

```graphql
query {
  locationHistoryAggregated(
    trackerId: "123"
    startTime: "2025-01-01T00:00:00Z"
    endTime: "2025-01-31T23:59:59Z"
    bucketSize: "1 day" # Options: "1 hour" or "1 day"
  ) {
    id
    timestamp
    location {
      latitude
      longitude
    }
    nearestCity
    tracker {
      id
      name
      currentStatus
    }
  }
}
```
### Mutations

`refreshLocationHistoryAggregates`: Manually triggers a refresh of the continuous aggregates.

```graphql
mutation {
  refreshLocationHistoryAggregates(
    trackerId: "123"
    startTime: "2025-01-01T00:00:00Z"
    endTime: "2025-01-31T23:59:59Z"
  )
}
```
## Implementation Details

### Database Schema

- `location_reports`: Raw location data with timestamp partitioning
- `location_history_hourly`: Continuous aggregate with hourly buckets
- `location_history_daily`: Continuous aggregate with daily buckets
- `location_history_view`: View combining hourly and daily aggregates
- `location_history`: Materialized view of the combined data
### Continuous Aggregate Policies

- Hourly data is refreshed every hour with a 1-hour lag
- Daily data is refreshed every day with a 1-day lag
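These policies could be expressed with TimescaleDB's `add_continuous_aggregate_policy`. The sketch below matches the lag and schedule described above; the `start_offset` values are illustrative assumptions, and the actual offsets used by the migration may differ:

```sql
-- Hypothetical sketch: refresh policies for the continuous aggregates
SELECT add_continuous_aggregate_policy('location_history_hourly',
    start_offset      => INTERVAL '3 hours',
    end_offset        => INTERVAL '1 hour',   -- 1-hour lag
    schedule_interval => INTERVAL '1 hour');  -- refreshed every hour

SELECT add_continuous_aggregate_policy('location_history_daily',
    start_offset      => INTERVAL '3 days',
    end_offset        => INTERVAL '1 day',    -- 1-day lag
    schedule_interval => INTERVAL '1 day');   -- refreshed every day
```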
### Functions

- `update_nearest_city`: Updates the nearest city field across all data layers
- `migrate_nearest_city_data`: Migrates existing nearest city data during setup
## Refresher Service

A Python service runs in the background to perform two key functions:

- Listen for PostgreSQL notifications and refresh the materialized view when needed
- Periodically refresh the continuous aggregates (at most once per hour) using Redis-based distributed locking
### Notification-Based Refresh

The service listens for PostgreSQL notifications to refresh the materialized view:

```python
# Simplified example of the notification-based refresh
import psycopg

# database_url is assumed to be defined elsewhere in the service
conn = psycopg.connect(database_url, autocommit=True)
cur = conn.cursor()

# Listen for notifications on the refresh channel
cur.execute("LISTEN refresh_location_history;")

# conn.notifies() is a generator that blocks until a notification arrives,
# so no polling loop or sleep is needed
for notify in conn.notifies():
    print(f"Got notification: {notify.channel}")
    # Refresh the materialized view on a separate connection
    with psycopg.connect(database_url, autocommit=True) as refresh_conn:
        refresh_conn.execute(
            "REFRESH MATERIALIZED VIEW CONCURRENTLY location_history;"
        )
```
### Periodic Continuous Aggregate Refresh

The service also periodically refreshes the continuous aggregates using Redis-based distributed locking:

```python
# Simplified example of the periodic continuous aggregate refresh
import datetime
import os
import socket
import time

import psycopg
import redis

# Redis connection for distributed locking
# (redis_host, redis_port, redis_password, and database_url
# are assumed to be defined elsewhere in the service)
redis_client = redis.Redis(
    host=redis_host,
    port=redis_port,
    password=redis_password,
    decode_responses=True,
)


def refresh_continuous_aggregates():
    now = time.time()

    # Skip if an hourly refresh has already happened
    last_refresh_key = "location_history:last_aggregate_refresh"
    last_refresh_time = redis_client.get(last_refresh_key)
    if last_refresh_time and now - float(last_refresh_time) < 3600:  # 1 hour
        return

    # Try to acquire a lock with a 30-minute expiry.
    # nx=True means "only set if the key doesn't exist";
    # ex=1800 sets the expiry to 30 minutes.
    lock_key = "location_history:aggregate_refresh_lock"
    lock_value = f"{socket.gethostname()}:{os.getpid()}"
    acquired = redis_client.set(lock_key, lock_value, nx=True, ex=1800)
    if not acquired:
        return

    try:
        # Use a separate connection for the refresh operation
        refresh_conn = psycopg.connect(database_url, autocommit=True)
        refresh_cur = refresh_conn.cursor()

        # Set a longer statement timeout for large tables
        refresh_cur.execute("SET statement_timeout = '20min';")

        # Get the current time in database format
        refresh_cur.execute("SELECT NOW();")
        current_time = refresh_cur.fetchone()[0]

        # Refresh the hourly aggregate over the last 48 hours
        start_time_hourly = current_time - datetime.timedelta(hours=48)
        refresh_cur.execute(
            "CALL refresh_continuous_aggregate('location_history_hourly', %s, %s);",
            (start_time_hourly, current_time),
        )

        # Refresh the daily aggregate over the last 7 days
        start_time_daily = current_time - datetime.timedelta(days=7)
        refresh_cur.execute(
            "CALL refresh_continuous_aggregate('location_history_daily', %s, %s);",
            (start_time_daily, current_time),
        )

        # Refresh the materialized view
        refresh_cur.execute(
            "REFRESH MATERIALIZED VIEW CONCURRENTLY location_history;"
        )

        # Record the last refresh time in Redis with a 2-hour expiry
        redis_client.set(last_refresh_key, str(now), ex=7200)

        refresh_cur.close()
        refresh_conn.close()
    finally:
        # Release the lock
        redis_client.delete(lock_key)
```
This approach ensures that:
- The materialized view is refreshed immediately when new data is added
- The continuous aggregates are refreshed at most once per hour
- Only one instance performs the refresh at a time, preventing resource contention
- Time-bounded refreshes optimize performance for large tables
## Cron Job

A cron job runs every 6 hours to import new location data:

```
0 */6 * * * cd /path/to/tracker-graphql-2 && ./app/src/import_location_reports.py >> ./logs/location_reports_import.log 2>&1
```
## Performance Considerations

- Queries for recent data (last 24 hours) use hourly aggregates
- Queries for older data use daily aggregates
- Raw data is automatically compressed and eventually removed
- Indexes are optimized for common query patterns
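For example, a recent-window query can hit the hourly aggregate directly while a long-range query uses the daily aggregate. The `bucket` column name below is an assumption for illustration; the actual aggregate schemas are defined by the migration:

```sql
-- Recent data (last 24 hours): hourly aggregate
SELECT * FROM location_history_hourly
WHERE bucket >= NOW() - INTERVAL '24 hours';

-- Older data: daily aggregate
SELECT * FROM location_history_daily
WHERE bucket >= NOW() - INTERVAL '90 days';
```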
## Installation

### 1. Run the Alembic Migration

The migration will:

- Back up the current `location_history` table
- Create the materialized view
- Set up triggers and functions for automatic refreshing

```bash
# Navigate to your project directory
cd /path/to/tracker-graphql-2

# Run the migration
alembic upgrade head
```
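As a sketch of the kind of notification trigger the migration sets up (the channel name matches the `LISTEN refresh_location_history` used by the refresher service, but the function and trigger names here are illustrative and the actual definitions may differ):

```sql
-- Hypothetical sketch of the notification trigger created by the migration
CREATE OR REPLACE FUNCTION notify_location_history_refresh()
RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('refresh_location_history', 'new data');
    RETURN NULL;  -- AFTER trigger; return value is ignored
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER location_reports_notify
AFTER INSERT ON location_reports
FOR EACH STATEMENT
EXECUTE FUNCTION notify_location_history_refresh();
```

Using a statement-level trigger keeps bulk imports from emitting one notification per row.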
### 2. Install the Refresher Service

The refresher service listens for PostgreSQL notifications and refreshes the materialized view when needed. It also periodically refreshes the continuous aggregates.

#### Using Systemd

```bash
# Make the script executable
chmod +x app/src/location_history_refresher.py

# Copy the systemd service file to the systemd directory
sudo cp systemd/location-history-refresher.service /etc/systemd/system/

# Reload systemd to recognize the new service
sudo systemctl daemon-reload

# Enable and start the service
sudo systemctl enable location-history-refresher
sudo systemctl start location-history-refresher
```
#### Using Docker Compose

The service can also be run using Docker Compose:

```bash
# Navigate to the fetcher directory
cd fetcher

# Start the services
docker-compose up -d
```

This will start both the location-history-refresher and tracker-report-fetcher services.
## Troubleshooting

### Service Won't Start

Check the logs:

```bash
journalctl -u location-history-refresher -f
```

Common issues:

- Incorrect Python path in the service file
- Database connection issues
- Permission problems
### View Not Refreshing

#### Notification Issues

Check that notifications are being sent:

```sql
-- In PostgreSQL
SELECT pg_notify('refresh_location_history', 'test');
```

Then check the service logs to see whether the notification was received:

```bash
journalctl -u location-history-refresher -f
```
#### Continuous Aggregate Issues

If the continuous aggregates aren't being refreshed:

1. Check that Redis is running and accessible:

   ```bash
   redis-cli ping
   ```

2. Check the Redis keys related to the refresh lock:

   ```bash
   redis-cli keys "location_history:*"
   ```

3. Manually refresh the continuous aggregates:

   ```sql
   -- In PostgreSQL
   CALL refresh_continuous_aggregate('location_history_hourly', NOW() - INTERVAL '48 hours', NOW());
   CALL refresh_continuous_aggregate('location_history_daily', NOW() - INTERVAL '7 days', NOW());
   ```
## Manual Testing

To manually test the system:

### Individual Record Testing

1. Insert a new record into `location_reports`
2. Refresh the continuous aggregates
3. Check that the materialized view was automatically refreshed

```sql
-- Insert test data
INSERT INTO location_reports (hashed_adv_key, timestamp, location, confidence)
VALUES ('test_key', NOW(), ST_SetSRID(ST_MakePoint(-0.1278, 51.5074), 4326), 100);

-- Manually refresh the continuous aggregates
CALL refresh_continuous_aggregate('location_history_hourly', NOW() - INTERVAL '1 hour', NOW() + INTERVAL '1 hour');

-- Check whether the data appears in the materialized view
SELECT * FROM location_history ORDER BY timestamp DESC LIMIT 10;
```
### Bulk Testing with Mock Data

For more comprehensive testing with mock data, use the provided script:

```sql
-- Run the mock data script for European cities
\i sql/mock_location_reports_europe.sql

-- After a short delay, verify that the data appears in location_history
SELECT
    t.name AS tracker_name,
    lh.timestamp,
    ST_X(lh.location::geometry) AS longitude,
    ST_Y(lh.location::geometry) AS latitude,
    lh.nearest_city
FROM location_history lh
JOIN trackers t ON t.id = lh.tracker_id
ORDER BY lh.timestamp DESC
LIMIT 5;
```

This script:

- Finds trackers without location history
- Assigns random European city locations to these trackers
- Inserts the data into `location_reports`
- Lets the TimescaleDB continuous aggregates and triggers populate `location_history`

For more details on mock data generation, see the Mock Location Data guide.