Files
n8n-mcp/supabase-telemetry-aggregation.sql
czlonkowski 1d34ad81d5 feat: implement session persistence for v2.19.0 (Phase 1 + Phase 2)
Phase 1 - Lazy Session Restoration (REQ-1, REQ-2, REQ-8):
- Add onSessionNotFound hook for restoring sessions from external storage
- Implement idempotent session creation to prevent race conditions
- Add session ID validation for security (prevent injection attacks)
- Comprehensive error handling (400/408/500 status codes)
- 13 integration tests covering all scenarios

Phase 2 - Session Management API (REQ-5):
- getActiveSessions(): Get all active session IDs
- getSessionState(sessionId): Get session state for persistence
- getAllSessionStates(): Bulk session state retrieval
- restoreSession(sessionId, context): Manual session restoration
- deleteSession(sessionId): Manual session termination
- 21 unit tests covering all API methods

Benefits:
- Sessions survive container restarts
- Horizontal scaling support (no session stickiness needed)
- Zero-downtime deployments
- 100% backwards compatible

Implementation Details:
- Backend methods in http-server-single-session.ts
- Public API methods in mcp-engine.ts
- SessionState type exported from index.ts
- Synchronous session creation and deletion for reliable testing
- Version updated from 2.18.10 to 2.19.0

Tests: 34 passing (13 integration + 21 unit)
Coverage: Full API coverage with edge cases
Security: Session ID validation prevents SQL/NoSQL injection and path traversal

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-12 17:25:38 +02:00

753 lines
28 KiB
PL/PgSQL

-- ============================================================================
-- N8N-MCP Telemetry Aggregation & Automated Pruning System
-- ============================================================================
-- Purpose: Create aggregation tables and automated cleanup to maintain
-- database under 500MB free tier limit while preserving insights
--
-- Strategy: Aggregate → Delete → Retain only recent raw events
-- Expected savings: ~120 MB (from 265 MB → ~145 MB steady state)
-- ============================================================================
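-- As a sanity check before and after running this migration, the raw tables' footprint
-- can be inspected with a quick size query (a sketch; it assumes the existing
-- telemetry_events and telemetry_workflows tables referenced throughout this script):
-- SELECT relname, pg_size_pretty(pg_total_relation_size(relid)) AS total_size
-- FROM pg_stat_user_tables
-- WHERE relname IN ('telemetry_events', 'telemetry_workflows');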
-- ============================================================================
-- PART 1: AGGREGATION TABLES
-- ============================================================================
-- Daily tool usage summary (replaces 96 MB of tool_sequence raw data)
CREATE TABLE IF NOT EXISTS telemetry_tool_usage_daily (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
user_id TEXT NOT NULL,
tool_name TEXT NOT NULL,
usage_count INTEGER NOT NULL DEFAULT 0,
success_count INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0,
avg_execution_time_ms NUMERIC,
total_execution_time_ms BIGINT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, user_id, tool_name)
);
CREATE INDEX IF NOT EXISTS idx_tool_usage_daily_date ON telemetry_tool_usage_daily(aggregation_date DESC);
CREATE INDEX IF NOT EXISTS idx_tool_usage_daily_tool ON telemetry_tool_usage_daily(tool_name);
CREATE INDEX IF NOT EXISTS idx_tool_usage_daily_user ON telemetry_tool_usage_daily(user_id);
COMMENT ON TABLE telemetry_tool_usage_daily IS 'Daily aggregation of tool usage replacing raw tool_used and tool_sequence events. Saves ~95% storage.';
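-- Example (sketch): with the rollup in place, "top tools over the last 30 days" no longer
-- needs the raw events (the 30-day window and LIMIT are arbitrary):
-- SELECT tool_name,
--        SUM(usage_count) AS uses,
--        SUM(error_count) AS errors,
--        ROUND(SUM(total_execution_time_ms)::numeric / NULLIF(SUM(usage_count), 0), 1) AS avg_ms
-- FROM telemetry_tool_usage_daily
-- WHERE aggregation_date > CURRENT_DATE - 30
-- GROUP BY tool_name
-- ORDER BY uses DESC
-- LIMIT 20;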
-- Tool sequence patterns (replaces individual sequences with pattern analysis)
CREATE TABLE IF NOT EXISTS telemetry_tool_patterns (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
tool_sequence TEXT[] NOT NULL, -- Array of tool names in order
sequence_hash TEXT NOT NULL, -- Hash of the sequence for grouping
occurrence_count INTEGER NOT NULL DEFAULT 1,
avg_sequence_duration_ms NUMERIC,
success_rate NUMERIC, -- 0.0 to 1.0
common_errors JSONB, -- {"error_type": count}
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, sequence_hash)
);
CREATE INDEX IF NOT EXISTS idx_tool_patterns_date ON telemetry_tool_patterns(aggregation_date DESC);
CREATE INDEX IF NOT EXISTS idx_tool_patterns_hash ON telemetry_tool_patterns(sequence_hash);
COMMENT ON TABLE telemetry_tool_patterns IS 'Common tool usage patterns aggregated daily. Identifies workflows and AI behavior patterns.';
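-- Example (sketch): most common tool sequences over the last week
-- SELECT tool_sequence,
--        SUM(occurrence_count) AS occurrences,
--        ROUND(AVG(success_rate) * 100, 1) AS success_pct
-- FROM telemetry_tool_patterns
-- WHERE aggregation_date > CURRENT_DATE - 7
-- GROUP BY tool_sequence
-- ORDER BY occurrences DESC
-- LIMIT 20;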
-- Workflow insights (aggregates workflow_created events)
CREATE TABLE IF NOT EXISTS telemetry_workflow_insights (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
complexity TEXT, -- simple/medium/complex
node_count_range TEXT, -- 1-5, 6-10, 11-20, 21+
has_trigger BOOLEAN,
has_webhook BOOLEAN,
common_node_types TEXT[], -- Top node types used
workflow_count INTEGER NOT NULL DEFAULT 0,
avg_node_count NUMERIC,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, complexity, node_count_range, has_trigger, has_webhook)
);
CREATE INDEX IF NOT EXISTS idx_workflow_insights_date ON telemetry_workflow_insights(aggregation_date DESC);
CREATE INDEX IF NOT EXISTS idx_workflow_insights_complexity ON telemetry_workflow_insights(complexity);
COMMENT ON TABLE telemetry_workflow_insights IS 'Daily workflow creation patterns. Shows adoption trends without storing duplicate workflows.';
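-- Example (sketch): workflow creation trend by complexity over the last 30 days
-- SELECT aggregation_date, complexity, SUM(workflow_count) AS workflows
-- FROM telemetry_workflow_insights
-- WHERE aggregation_date > CURRENT_DATE - 30
-- GROUP BY aggregation_date, complexity
-- ORDER BY aggregation_date DESC, workflows DESC;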
-- Error patterns (keeps error intelligence, deletes raw error events)
CREATE TABLE IF NOT EXISTS telemetry_error_patterns (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
error_type TEXT NOT NULL,
error_context TEXT, -- e.g., 'validation', 'workflow_execution', 'node_operation'
occurrence_count INTEGER NOT NULL DEFAULT 1,
affected_users INTEGER NOT NULL DEFAULT 0,
first_seen TIMESTAMPTZ,
last_seen TIMESTAMPTZ,
sample_error_message TEXT, -- Keep one representative message
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, error_type, error_context)
);
CREATE INDEX IF NOT EXISTS idx_error_patterns_date ON telemetry_error_patterns(aggregation_date DESC);
CREATE INDEX IF NOT EXISTS idx_error_patterns_type ON telemetry_error_patterns(error_type);
COMMENT ON TABLE telemetry_error_patterns IS 'Error patterns over time. Preserves debugging insights while pruning raw error events.';
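-- Example (sketch): top recurring errors over the last 14 days
-- SELECT error_type, error_context,
--        SUM(occurrence_count) AS occurrences,
--        MAX(last_seen) AS last_seen
-- FROM telemetry_error_patterns
-- WHERE aggregation_date > CURRENT_DATE - 14
-- GROUP BY error_type, error_context
-- ORDER BY occurrences DESC
-- LIMIT 20;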
-- Validation insights (aggregates validation_details)
CREATE TABLE IF NOT EXISTS telemetry_validation_insights (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
validation_type TEXT, -- 'node', 'workflow', 'expression'
profile TEXT, -- 'minimal', 'runtime', 'ai-friendly', 'strict'
success_count INTEGER NOT NULL DEFAULT 0,
failure_count INTEGER NOT NULL DEFAULT 0,
common_failure_reasons JSONB, -- {"reason": count}
avg_validation_time_ms NUMERIC,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, validation_type, profile)
);
CREATE INDEX IF NOT EXISTS idx_validation_insights_date ON telemetry_validation_insights(aggregation_date DESC);
CREATE INDEX IF NOT EXISTS idx_validation_insights_type ON telemetry_validation_insights(validation_type);
COMMENT ON TABLE telemetry_validation_insights IS 'Validation success/failure patterns. Shows where users struggle without storing every validation event.';
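-- Example (sketch): validation failure rate per profile over the last 30 days
-- SELECT validation_type, profile,
--        SUM(failure_count) AS failures,
--        SUM(success_count) AS successes,
--        ROUND(SUM(failure_count)::numeric /
--              NULLIF(SUM(success_count) + SUM(failure_count), 0) * 100, 1) AS failure_pct
-- FROM telemetry_validation_insights
-- WHERE aggregation_date > CURRENT_DATE - 30
-- GROUP BY validation_type, profile
-- ORDER BY failure_pct DESC NULLS LAST;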
-- ============================================================================
-- PART 2: AGGREGATION FUNCTIONS
-- ============================================================================
-- Function to aggregate tool usage data
CREATE OR REPLACE FUNCTION aggregate_tool_usage(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
-- Aggregate tool_used events
INSERT INTO telemetry_tool_usage_daily (
aggregation_date,
user_id,
tool_name,
usage_count,
success_count,
error_count,
avg_execution_time_ms,
total_execution_time_ms
)
SELECT
DATE(created_at) as aggregation_date,
user_id,
properties->>'toolName' as tool_name,
COUNT(*) as usage_count,
COUNT(*) FILTER (WHERE (properties->>'success')::boolean = true) as success_count,
COUNT(*) FILTER (WHERE (properties->>'success')::boolean = false OR properties->>'error' IS NOT NULL) as error_count,
AVG((properties->>'executionTime')::numeric) as avg_execution_time_ms,
SUM((properties->>'executionTime')::numeric) as total_execution_time_ms
FROM telemetry_events
WHERE event = 'tool_used'
AND created_at < cutoff_date
AND properties->>'toolName' IS NOT NULL
GROUP BY DATE(created_at), user_id, properties->>'toolName'
ON CONFLICT (aggregation_date, user_id, tool_name)
DO UPDATE SET
usage_count = telemetry_tool_usage_daily.usage_count + EXCLUDED.usage_count,
success_count = telemetry_tool_usage_daily.success_count + EXCLUDED.success_count,
error_count = telemetry_tool_usage_daily.error_count + EXCLUDED.error_count,
total_execution_time_ms = telemetry_tool_usage_daily.total_execution_time_ms + EXCLUDED.total_execution_time_ms,
avg_execution_time_ms = (telemetry_tool_usage_daily.total_execution_time_ms + EXCLUDED.total_execution_time_ms)::numeric /
NULLIF(telemetry_tool_usage_daily.usage_count + EXCLUDED.usage_count, 0),
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from tool_used events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_tool_usage IS 'Aggregates tool_used events into daily summaries before deletion';
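-- Example (sketch): each aggregator takes a cutoff timestamp and returns the number of
-- rows written to its aggregation table; the remaining aggregate_* functions below share
-- this signature and are normally driven by run_telemetry_aggregation_and_cleanup()
-- rather than called directly.
-- SELECT aggregate_tool_usage(NOW() - INTERVAL '3 days');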
-- Function to aggregate tool sequence patterns
CREATE OR REPLACE FUNCTION aggregate_tool_patterns(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
INSERT INTO telemetry_tool_patterns (
aggregation_date,
tool_sequence,
sequence_hash,
occurrence_count,
avg_sequence_duration_ms,
success_rate
)
SELECT
DATE(created_at) as aggregation_date,
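-- NOTE: the casts below assume properties->>'toolSequence' already parses as a Postgres
-- array literal (e.g. '{tool_a,tool_b}'); if the client stores it as a JSON array,
-- replace them with ARRAY(SELECT jsonb_array_elements_text(properties->'toolSequence')).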
(properties->>'toolSequence')::text[] as tool_sequence,
md5(array_to_string((properties->>'toolSequence')::text[], ',')) as sequence_hash,
COUNT(*) as occurrence_count,
AVG((properties->>'duration')::numeric) as avg_sequence_duration_ms,
AVG(CASE WHEN (properties->>'success')::boolean THEN 1.0 ELSE 0.0 END) as success_rate
FROM telemetry_events
WHERE event = 'tool_sequence'
AND created_at < cutoff_date
AND properties->>'toolSequence' IS NOT NULL
GROUP BY DATE(created_at), (properties->>'toolSequence')::text[]
ON CONFLICT (aggregation_date, sequence_hash)
DO UPDATE SET
occurrence_count = telemetry_tool_patterns.occurrence_count + EXCLUDED.occurrence_count,
avg_sequence_duration_ms = (
(telemetry_tool_patterns.avg_sequence_duration_ms * telemetry_tool_patterns.occurrence_count +
EXCLUDED.avg_sequence_duration_ms * EXCLUDED.occurrence_count) /
(telemetry_tool_patterns.occurrence_count + EXCLUDED.occurrence_count)
),
success_rate = (
(telemetry_tool_patterns.success_rate * telemetry_tool_patterns.occurrence_count +
EXCLUDED.success_rate * EXCLUDED.occurrence_count) /
(telemetry_tool_patterns.occurrence_count + EXCLUDED.occurrence_count)
),
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from tool_sequence events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_tool_patterns IS 'Aggregates tool_sequence events into pattern analysis before deletion';
-- Function to aggregate workflow insights
CREATE OR REPLACE FUNCTION aggregate_workflow_insights(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
INSERT INTO telemetry_workflow_insights (
aggregation_date,
complexity,
node_count_range,
has_trigger,
has_webhook,
common_node_types,
workflow_count,
avg_node_count
)
SELECT
DATE(created_at) as aggregation_date,
properties->>'complexity' as complexity,
CASE
WHEN (properties->>'nodeCount')::int BETWEEN 1 AND 5 THEN '1-5'
WHEN (properties->>'nodeCount')::int BETWEEN 6 AND 10 THEN '6-10'
WHEN (properties->>'nodeCount')::int BETWEEN 11 AND 20 THEN '11-20'
ELSE '21+'
END as node_count_range,
(properties->>'hasTrigger')::boolean as has_trigger,
(properties->>'hasWebhook')::boolean as has_webhook,
ARRAY[]::text[] as common_node_types, -- Will be populated separately if needed
COUNT(*) as workflow_count,
AVG((properties->>'nodeCount')::numeric) as avg_node_count
FROM telemetry_events
WHERE event = 'workflow_created'
AND created_at < cutoff_date
GROUP BY
DATE(created_at),
properties->>'complexity',
node_count_range,
(properties->>'hasTrigger')::boolean,
(properties->>'hasWebhook')::boolean
ON CONFLICT (aggregation_date, complexity, node_count_range, has_trigger, has_webhook)
DO UPDATE SET
workflow_count = telemetry_workflow_insights.workflow_count + EXCLUDED.workflow_count,
avg_node_count = (
(telemetry_workflow_insights.avg_node_count * telemetry_workflow_insights.workflow_count +
EXCLUDED.avg_node_count * EXCLUDED.workflow_count) /
(telemetry_workflow_insights.workflow_count + EXCLUDED.workflow_count)
),
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from workflow_created events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_workflow_insights IS 'Aggregates workflow_created events into pattern insights before deletion';
-- Function to aggregate error patterns
CREATE OR REPLACE FUNCTION aggregate_error_patterns(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
INSERT INTO telemetry_error_patterns (
aggregation_date,
error_type,
error_context,
occurrence_count,
affected_users,
first_seen,
last_seen,
sample_error_message
)
SELECT
DATE(created_at) as aggregation_date,
properties->>'errorType' as error_type,
properties->>'context' as error_context,
COUNT(*) as occurrence_count,
COUNT(DISTINCT user_id) as affected_users,
MIN(created_at) as first_seen,
MAX(created_at) as last_seen,
(ARRAY_AGG(properties->>'message' ORDER BY created_at DESC))[1] as sample_error_message
FROM telemetry_events
WHERE event = 'error_occurred'
AND created_at < cutoff_date
GROUP BY DATE(created_at), properties->>'errorType', properties->>'context'
ON CONFLICT (aggregation_date, error_type, error_context)
DO UPDATE SET
occurrence_count = telemetry_error_patterns.occurrence_count + EXCLUDED.occurrence_count,
affected_users = GREATEST(telemetry_error_patterns.affected_users, EXCLUDED.affected_users),
first_seen = LEAST(telemetry_error_patterns.first_seen, EXCLUDED.first_seen),
last_seen = GREATEST(telemetry_error_patterns.last_seen, EXCLUDED.last_seen),
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from error_occurred events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_error_patterns IS 'Aggregates error_occurred events into pattern analysis before deletion';
-- Function to aggregate validation insights
CREATE OR REPLACE FUNCTION aggregate_validation_insights(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
INSERT INTO telemetry_validation_insights (
aggregation_date,
validation_type,
profile,
success_count,
failure_count,
common_failure_reasons,
avg_validation_time_ms
)
-- Failure reasons are counted per reason in the inner query and folded into a JSONB
-- object here, because aggregate calls cannot be nested in a single grouping level
SELECT
aggregation_date,
validation_type,
profile,
SUM(success_count)::integer as success_count,
SUM(failure_count)::integer as failure_count,
jsonb_object_agg(failure_reason, failure_count) FILTER (WHERE failure_reason IS NOT NULL) as common_failure_reasons,
SUM(total_validation_time_ms) / NULLIF(SUM(validation_time_samples), 0) as avg_validation_time_ms
FROM (
    SELECT
    DATE(created_at) as aggregation_date,
    properties->>'validationType' as validation_type,
    properties->>'profile' as profile,
    CASE WHEN (properties->>'success')::boolean = false
         THEN COALESCE(properties->>'failureReason', 'unknown') END as failure_reason,
    COUNT(*) FILTER (WHERE (properties->>'success')::boolean = true) as success_count,
    COUNT(*) FILTER (WHERE (properties->>'success')::boolean = false) as failure_count,
    SUM((properties->>'validationTime')::numeric) as total_validation_time_ms,
    COUNT(properties->>'validationTime') as validation_time_samples
    FROM telemetry_events
    WHERE event = 'validation_details'
    AND created_at < cutoff_date
    GROUP BY 1, 2, 3, 4
) per_reason
GROUP BY aggregation_date, validation_type, profile
ON CONFLICT (aggregation_date, validation_type, profile)
DO UPDATE SET
success_count = telemetry_validation_insights.success_count + EXCLUDED.success_count,
failure_count = telemetry_validation_insights.failure_count + EXCLUDED.failure_count,
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from validation_details events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_validation_insights IS 'Aggregates validation_details events into insights before deletion';
-- ============================================================================
-- PART 3: MASTER AGGREGATION & CLEANUP FUNCTION
-- ============================================================================
CREATE OR REPLACE FUNCTION run_telemetry_aggregation_and_cleanup(
retention_days INTEGER DEFAULT 3
)
RETURNS TABLE(
event_type TEXT,
rows_aggregated INTEGER,
rows_deleted INTEGER,
space_freed_mb NUMERIC
) AS $$
DECLARE
cutoff_date TIMESTAMPTZ;
total_before BIGINT;
total_after BIGINT;
agg_count INTEGER;
del_count INTEGER;
BEGIN
cutoff_date := NOW() - (retention_days || ' days')::INTERVAL;
RAISE NOTICE 'Starting aggregation and cleanup for data older than %', cutoff_date;
-- Get table size before cleanup
SELECT pg_total_relation_size('telemetry_events') INTO total_before;
-- ========================================================================
-- STEP 1: AGGREGATE DATA BEFORE DELETION
-- ========================================================================
-- Tool usage aggregation
SELECT aggregate_tool_usage(cutoff_date) INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'tool_used' AND created_at < cutoff_date;
event_type := 'tool_used';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- Tool patterns aggregation
SELECT aggregate_tool_patterns(cutoff_date) INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'tool_sequence' AND created_at < cutoff_date;
event_type := 'tool_sequence';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- Workflow insights aggregation
SELECT aggregate_workflow_insights(cutoff_date) INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'workflow_created' AND created_at < cutoff_date;
event_type := 'workflow_created';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- Error patterns aggregation
-- error_occurred events are kept raw for 30 days (see STEP 2), so aggregate and count
-- them at that boundary; using cutoff_date here would re-count the same events every day
SELECT aggregate_error_patterns(NOW() - INTERVAL '30 days') INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'error_occurred' AND created_at < (NOW() - INTERVAL '30 days');
event_type := 'error_occurred';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- Validation insights aggregation
SELECT aggregate_validation_insights(cutoff_date) INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'validation_details' AND created_at < cutoff_date;
event_type := 'validation_details';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- ========================================================================
-- STEP 2: DELETE OLD RAW EVENTS (now that they're aggregated)
-- ========================================================================
DELETE FROM telemetry_events
WHERE created_at < cutoff_date
AND event IN (
'tool_used',
'tool_sequence',
'workflow_created',
'validation_details',
'session_start',
'search_query',
'diagnostic_completed',
'health_check_completed'
);
-- Keep error_occurred for 30 days (extended retention for debugging)
DELETE FROM telemetry_events
WHERE created_at < (NOW() - INTERVAL '30 days')
AND event = 'error_occurred';
-- ========================================================================
-- STEP 3: CLEAN UP OLD WORKFLOWS (keep only unique patterns)
-- ========================================================================
-- Delete duplicate workflows older than retention period
WITH workflow_duplicates AS (
SELECT id
FROM (
SELECT id,
ROW_NUMBER() OVER (
PARTITION BY workflow_hash
ORDER BY created_at DESC
) as rn
FROM telemetry_workflows
WHERE created_at < cutoff_date
) sub
WHERE rn > 1
)
DELETE FROM telemetry_workflows
WHERE id IN (SELECT id FROM workflow_duplicates);
GET DIAGNOSTICS del_count = ROW_COUNT;
event_type := 'duplicate_workflows';
rows_aggregated := 0;
rows_deleted := del_count;
RETURN NEXT;
-- ========================================================================
-- STEP 4: VACUUM TO RECLAIM SPACE
-- ========================================================================
-- Note: VACUUM cannot be run inside a function; it must be run separately
-- (the dedicated 'telemetry-daily-vacuum' cron job in Part 4 handles this)
-- Get table size after cleanup
SELECT pg_total_relation_size('telemetry_events') INTO total_after;
-- Summary row
event_type := 'TOTAL_SPACE_FREED';
rows_aggregated := 0;
rows_deleted := 0;
space_freed_mb := ROUND((total_before - total_after)::NUMERIC / 1024 / 1024, 2);
RETURN NEXT;
RAISE NOTICE 'Cleanup complete. Space freed: % MB', space_freed_mb;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION run_telemetry_aggregation_and_cleanup IS 'Master function to aggregate data and delete old events. Run daily via cron.';
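-- Example (sketch): preview how many raw rows currently fall outside a 3-day window
-- before letting the cleanup loose on them
-- SELECT event, COUNT(*) AS rows_affected
-- FROM telemetry_events
-- WHERE created_at < NOW() - INTERVAL '3 days'
-- GROUP BY event
-- ORDER BY rows_affected DESC;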
-- ============================================================================
-- PART 4: SUPABASE CRON JOB SETUP
-- ============================================================================
-- Enable pg_cron extension (if not already enabled)
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Schedule daily cleanup at 2 AM UTC (low-traffic time)
-- This aggregates data older than 3 days and then deletes the raw events
SELECT cron.schedule(
'telemetry-daily-cleanup',
'0 2 * * *', -- Every day at 2 AM UTC
$$ SELECT run_telemetry_aggregation_and_cleanup(3); $$
);
-- VACUUM cannot run inside a multi-statement command string, so schedule it as its own
-- job once the cleanup has had time to finish
SELECT cron.schedule(
'telemetry-daily-vacuum',
'30 2 * * *', -- Every day at 2:30 AM UTC
$$ VACUUM ANALYZE telemetry_events, telemetry_workflows; $$
);
COMMENT ON EXTENSION pg_cron IS 'Cron job scheduler for automated telemetry cleanup';
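-- Example (sketch): inspect, audit, or remove the scheduled jobs (cron.job_run_details
-- is available on recent pg_cron versions, as shipped by Supabase)
-- SELECT jobid, jobname, schedule, command FROM cron.job;
-- SELECT * FROM cron.job_run_details ORDER BY start_time DESC LIMIT 10;
-- SELECT cron.unschedule('telemetry-daily-cleanup');
-- SELECT cron.unschedule('telemetry-daily-vacuum');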
-- ============================================================================
-- PART 5: MONITORING & ALERTING
-- ============================================================================
-- Function to check database size and alert if approaching limit
CREATE OR REPLACE FUNCTION check_database_size()
RETURNS TABLE(
total_size_mb NUMERIC,
events_size_mb NUMERIC,
workflows_size_mb NUMERIC,
aggregates_size_mb NUMERIC,
percent_of_limit NUMERIC,
days_until_full NUMERIC,
status TEXT
) AS $$
DECLARE
db_size BIGINT;
events_size BIGINT;
workflows_size BIGINT;
agg_size BIGINT;
limit_mb CONSTANT NUMERIC := 500; -- Free tier limit
growth_rate_mb_per_day NUMERIC;
BEGIN
-- Get current sizes
SELECT pg_database_size(current_database()) INTO db_size;
SELECT pg_total_relation_size('telemetry_events') INTO events_size;
SELECT pg_total_relation_size('telemetry_workflows') INTO workflows_size;
SELECT COALESCE(
pg_total_relation_size('telemetry_tool_usage_daily') +
pg_total_relation_size('telemetry_tool_patterns') +
pg_total_relation_size('telemetry_workflow_insights') +
pg_total_relation_size('telemetry_error_patterns') +
pg_total_relation_size('telemetry_validation_insights'),
0
) INTO agg_size;
total_size_mb := ROUND(db_size::NUMERIC / 1024 / 1024, 2);
events_size_mb := ROUND(events_size::NUMERIC / 1024 / 1024, 2);
workflows_size_mb := ROUND(workflows_size::NUMERIC / 1024 / 1024, 2);
aggregates_size_mb := ROUND(agg_size::NUMERIC / 1024 / 1024, 2);
percent_of_limit := ROUND((total_size_mb / limit_mb) * 100, 1);
-- Estimate growth rate (7-day event volume x average row size)
SELECT ROUND(
COUNT(*)::NUMERIC * COALESCE(AVG(pg_column_size(telemetry_events.*)), 0)
/ 7 / 1024 / 1024, 2
) INTO growth_rate_mb_per_day
FROM telemetry_events
WHERE created_at > NOW() - INTERVAL '7 days';
IF growth_rate_mb_per_day > 0 THEN
days_until_full := ROUND((limit_mb - total_size_mb) / growth_rate_mb_per_day, 0);
ELSE
days_until_full := NULL;
END IF;
-- Determine status
IF percent_of_limit >= 90 THEN
status := 'CRITICAL - Immediate action required';
ELSIF percent_of_limit >= 75 THEN
status := 'WARNING - Monitor closely';
ELSIF percent_of_limit >= 50 THEN
status := 'CAUTION - Plan optimization';
ELSE
status := 'HEALTHY';
END IF;
RETURN NEXT;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION check_database_size IS 'Monitor database size and growth. Run daily or on-demand.';
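-- Example (sketch): pull just the headline numbers for a quick health check
-- SELECT total_size_mb, percent_of_limit, days_until_full, status
-- FROM check_database_size();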
-- ============================================================================
-- PART 6: EMERGENCY CLEANUP (ONE-TIME USE)
-- ============================================================================
-- Emergency function to immediately free up space (use if critical)
CREATE OR REPLACE FUNCTION emergency_cleanup()
RETURNS TABLE(
action TEXT,
rows_deleted INTEGER,
space_freed_mb NUMERIC
) AS $$
DECLARE
size_before BIGINT;
size_after BIGINT;
del_count INTEGER;
BEGIN
SELECT pg_total_relation_size('telemetry_events') INTO size_before;
-- Aggregate everything older than 7 days
PERFORM run_telemetry_aggregation_and_cleanup(7);
-- Delete all non-critical events older than 7 days
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '7 days'
AND event NOT IN ('error_occurred', 'workflow_validation_failed');
GET DIAGNOSTICS del_count = ROW_COUNT;
action := 'Deleted non-critical events > 7 days';
rows_deleted := del_count;
RETURN NEXT;
-- Aggregate then delete error events older than 14 days so the patterns survive the purge
PERFORM aggregate_error_patterns(NOW() - INTERVAL '14 days');
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '14 days'
AND event = 'error_occurred';
GET DIAGNOSTICS del_count = ROW_COUNT;
action := 'Deleted error events > 14 days';
rows_deleted := del_count;
RETURN NEXT;
-- Delete duplicate workflows
WITH workflow_duplicates AS (
SELECT id
FROM (
SELECT id,
ROW_NUMBER() OVER (
PARTITION BY workflow_hash
ORDER BY created_at DESC
) as rn
FROM telemetry_workflows
) sub
WHERE rn > 1
)
DELETE FROM telemetry_workflows
WHERE id IN (SELECT id FROM workflow_duplicates);
GET DIAGNOSTICS del_count = ROW_COUNT;
action := 'Deleted duplicate workflows';
rows_deleted := del_count;
RETURN NEXT;
-- VACUUM will be run separately
SELECT pg_total_relation_size('telemetry_events') INTO size_after;
action := 'TOTAL (run VACUUM separately)';
rows_deleted := 0;
space_freed_mb := ROUND((size_before - size_after)::NUMERIC / 1024 / 1024, 2);
RETURN NEXT;
RAISE NOTICE 'Emergency cleanup complete. Run VACUUM FULL for maximum space recovery.';
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION emergency_cleanup IS 'Emergency cleanup when database is near capacity. Run once, then VACUUM.';
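-- Example (sketch): confirm the monitor actually reports CRITICAL before using this path,
-- and note that the follow-up VACUUM FULL rewrites tables under an ACCESS EXCLUSIVE lock,
-- so run it during low traffic:
-- SELECT status, percent_of_limit FROM check_database_size();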
-- ============================================================================
-- USAGE INSTRUCTIONS
-- ============================================================================
/*
SETUP (Run once):
1. Execute this entire script in Supabase SQL Editor
2. Verify the cron jobs are scheduled:
SELECT * FROM cron.job;
3. Run initial monitoring:
SELECT * FROM check_database_size();
DAILY OPERATIONS (Automatic):
- Cleanup job runs daily at 2 AM UTC; a separate VACUUM job runs at 2:30 AM UTC
- Aggregates data older than 3 days
- Deletes raw events after aggregation
- Vacuums tables to reclaim space
MONITORING:
-- Check current database health
SELECT * FROM check_database_size();
-- View aggregated insights
SELECT * FROM telemetry_tool_usage_daily ORDER BY aggregation_date DESC LIMIT 100;
SELECT * FROM telemetry_tool_patterns ORDER BY occurrence_count DESC LIMIT 20;
SELECT * FROM telemetry_error_patterns ORDER BY occurrence_count DESC LIMIT 20;
MANUAL CLEANUP (if needed):
-- Run cleanup manually (3-day retention)
SELECT * FROM run_telemetry_aggregation_and_cleanup(3);
VACUUM ANALYZE telemetry_events;
-- Emergency cleanup (7-day retention)
SELECT * FROM emergency_cleanup();
VACUUM FULL telemetry_events;
VACUUM FULL telemetry_workflows;
TUNING:
-- Adjust retention period (e.g., 5 days instead of 3)
SELECT cron.schedule(
'telemetry-daily-cleanup',
'0 2 * * *',
$$ SELECT run_telemetry_aggregation_and_cleanup(5); $$
);
EXPECTED RESULTS:
- Initial run: ~120 MB space freed (265 MB → ~145 MB)
- Steady state: ~90-120 MB total database size
- Growth rate: ~2-3 MB/day (down from 7.7 MB/day)
- Headroom: 70-80% of free tier limit available
*/