feat: implement session persistence for v2.19.0 (Phase 1 + Phase 2)

Phase 1 - Lazy Session Restoration (REQ-1, REQ-2, REQ-8):
- Add onSessionNotFound hook for restoring sessions from external storage
- Implement idempotent session creation to prevent race conditions
- Add session ID validation for security (prevent injection attacks)
- Comprehensive error handling (400/408/500 status codes)
- 13 integration tests covering all scenarios

Phase 2 - Session Management API (REQ-5):
- getActiveSessions(): Get all active session IDs
- getSessionState(sessionId): Get session state for persistence
- getAllSessionStates(): Bulk session state retrieval
- restoreSession(sessionId, context): Manual session restoration
- deleteSession(sessionId): Manual session termination
- 21 unit tests covering all API methods

Benefits:
- Sessions survive container restarts
- Horizontal scaling support (no session stickiness needed)
- Zero-downtime deployments
- 100% backwards compatible

Implementation Details:
- Backend methods in http-server-single-session.ts
- Public API methods in mcp-engine.ts
- SessionState type exported from index.ts
- Synchronous session creation and deletion for reliable testing
- Version updated from 2.18.10 to 2.19.0

Tests: 34 passing (13 integration + 21 unit)
Coverage: Full API coverage with edge cases
Security: Session ID validation prevents SQL/NoSQL injection and path traversal

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
czlonkowski
2025-10-12 17:25:38 +02:00
parent 4566253bdc
commit 1d34ad81d5
14 changed files with 9595 additions and 51 deletions

3491
IMPLEMENTATION_GUIDE.md Normal file

File diff suppressed because it is too large Load Diff

1464
MVP_DEPLOYMENT_PLAN.md Normal file

File diff suppressed because it is too large Load Diff

623
TELEMETRY_PRUNING_GUIDE.md Normal file
View File

@@ -0,0 +1,623 @@
# Telemetry Data Pruning & Aggregation Guide
## Overview
This guide provides a complete solution for managing n8n-mcp telemetry data in Supabase to stay within the 500 MB free tier limit while preserving valuable insights for product development.
## Current Situation
- **Database Size**: 265 MB / 500 MB (53% of limit)
- **Growth Rate**: 7.7 MB/day (54 MB/week)
- **Time Until Full**: ~17 days
- **Total Events**: 641,487 events + 17,247 workflows
### Storage Breakdown
| Event Type | Count | Size | % of Total |
|------------|-------|------|------------|
| `tool_sequence` | 362,704 | 96 MB | 72% |
| `tool_used` | 191,938 | 28 MB | 21% |
| `validation_details` | 36,280 | 14 MB | 11% |
| `workflow_created` | 23,213 | 4.5 MB | 3% |
| Others | ~26,000 | ~3 MB | 2% |
## Solution Strategy
**Aggregate → Delete → Retain only recent raw events**
### Expected Results
| Metric | Before | After | Improvement |
|--------|--------|-------|-------------|
| Database Size | 265 MB | ~90-120 MB | **55-65% reduction** |
| Growth Rate | 7.7 MB/day | ~2-3 MB/day | **60-70% slower** |
| Days Until Full | 17 days | **Sustainable** | Never fills |
| Free Tier Usage | 53% | ~20-25% | **75-80% headroom** |
## Implementation Steps
### Step 1: Execute the SQL Migration
Open Supabase SQL Editor and run the entire contents of `supabase-telemetry-aggregation.sql`:
```sql
-- Copy and paste the entire supabase-telemetry-aggregation.sql file
-- Or run it directly from the file
```
This will create:
- 5 aggregation tables
- Aggregation functions
- Automated cleanup function
- Monitoring functions
- Scheduled cron job (daily at 2 AM UTC)
### Step 2: Verify Cron Job Setup
Check that the cron job was created successfully:
```sql
-- View scheduled cron jobs
SELECT
jobid,
schedule,
command,
nodename,
nodeport,
database,
username,
active
FROM cron.job
WHERE jobname = 'telemetry-daily-cleanup';
```
Expected output:
- Schedule: `0 2 * * *` (daily at 2 AM UTC)
- Active: `true`
### Step 3: Run Initial Emergency Cleanup
Get immediate space relief by running the emergency cleanup:
```sql
-- This will aggregate and delete data older than 7 days
SELECT * FROM emergency_cleanup();
```
Expected results:
```
action | rows_deleted | space_freed_mb
------------------------------------+--------------+----------------
Deleted non-critical events > 7d | ~284,924 | ~52 MB
Deleted error events > 14d | ~2,400 | ~0.5 MB
Deleted duplicate workflows | ~8,500 | ~11 MB
TOTAL (run VACUUM separately) | 0 | ~63.5 MB
```
### Step 4: Reclaim Disk Space
After deletion, reclaim the actual disk space:
```sql
-- Reclaim space from deleted rows
VACUUM FULL telemetry_events;
VACUUM FULL telemetry_workflows;
-- Update statistics for query optimization
ANALYZE telemetry_events;
ANALYZE telemetry_workflows;
```
**Note**: `VACUUM FULL` may take a few minutes and locks the table. Run during off-peak hours if possible.
### Step 5: Verify Results
Check the new database size:
```sql
SELECT * FROM check_database_size();
```
Expected output:
```
total_size_mb | events_size_mb | workflows_size_mb | aggregates_size_mb | percent_of_limit | days_until_full | status
--------------+----------------+-------------------+--------------------+------------------+-----------------+---------
202.5 | 85.2 | 35.8 | 12.5 | 40.5 | ~95 | HEALTHY
```
## Daily Operations (Automated)
Once set up, the system runs automatically:
1. **Daily at 2 AM UTC**: Cron job runs
2. **Aggregation**: Data older than 3 days is aggregated into summary tables
3. **Deletion**: Raw events are deleted after aggregation
4. **Cleanup**: VACUUM runs to reclaim space
5. **Retention**:
- High-volume events: 3 days
- Error events: 30 days
- Aggregated insights: Forever
## Monitoring Commands
### Check Database Health
```sql
-- View current size and status
SELECT * FROM check_database_size();
```
### View Aggregated Insights
```sql
-- Top tools used daily
SELECT
aggregation_date,
tool_name,
usage_count,
success_count,
error_count,
ROUND(100.0 * success_count / NULLIF(usage_count, 0), 1) as success_rate_pct
FROM telemetry_tool_usage_daily
ORDER BY aggregation_date DESC, usage_count DESC
LIMIT 50;
-- Most common tool sequences
SELECT
aggregation_date,
tool_sequence,
occurrence_count,
ROUND(avg_sequence_duration_ms, 0) as avg_duration_ms,
ROUND(100 * success_rate, 1) as success_rate_pct
FROM telemetry_tool_patterns
ORDER BY occurrence_count DESC
LIMIT 20;
-- Error patterns over time
SELECT
aggregation_date,
error_type,
error_context,
occurrence_count,
affected_users,
sample_error_message
FROM telemetry_error_patterns
ORDER BY aggregation_date DESC, occurrence_count DESC
LIMIT 30;
-- Workflow creation trends
SELECT
aggregation_date,
complexity,
node_count_range,
has_trigger,
has_webhook,
workflow_count,
ROUND(avg_node_count, 1) as avg_nodes
FROM telemetry_workflow_insights
ORDER BY aggregation_date DESC, workflow_count DESC
LIMIT 30;
-- Validation success rates
SELECT
aggregation_date,
validation_type,
profile,
success_count,
failure_count,
ROUND(100.0 * success_count / NULLIF(success_count + failure_count, 0), 1) as success_rate_pct,
common_failure_reasons
FROM telemetry_validation_insights
ORDER BY aggregation_date DESC, (success_count + failure_count) DESC
LIMIT 30;
```
### Check Cron Job Execution History
```sql
-- View recent cron job runs
SELECT
runid,
jobid,
database,
status,
return_message,
start_time,
end_time
FROM cron.job_run_details
WHERE jobid = (SELECT jobid FROM cron.job WHERE jobname = 'telemetry-daily-cleanup')
ORDER BY start_time DESC
LIMIT 10;
```
## Manual Operations
### Run Cleanup On-Demand
If you need to run cleanup outside the scheduled time:
```sql
-- Run with default 3-day retention
SELECT * FROM run_telemetry_aggregation_and_cleanup(3);
VACUUM ANALYZE telemetry_events;
-- Or with custom retention (e.g., 5 days)
SELECT * FROM run_telemetry_aggregation_and_cleanup(5);
VACUUM ANALYZE telemetry_events;
```
### Emergency Cleanup (Critical Situations)
If database is approaching limit and you need immediate relief:
```sql
-- Step 1: Run emergency cleanup (7-day retention)
SELECT * FROM emergency_cleanup();
-- Step 2: Reclaim space aggressively
VACUUM FULL telemetry_events;
VACUUM FULL telemetry_workflows;
ANALYZE telemetry_events;
ANALYZE telemetry_workflows;
-- Step 3: Verify results
SELECT * FROM check_database_size();
```
### Adjust Retention Policy
To change the default 3-day retention period:
```sql
-- Update cron job to use 5-day retention instead
SELECT cron.unschedule('telemetry-daily-cleanup');
SELECT cron.schedule(
'telemetry-daily-cleanup',
'0 2 * * *', -- Daily at 2 AM UTC
$$
SELECT run_telemetry_aggregation_and_cleanup(5); -- 5 days instead of 3
VACUUM ANALYZE telemetry_events;
VACUUM ANALYZE telemetry_workflows;
$$
);
```
## Data Retention Policies
### Raw Events Retention
| Event Type | Retention | Reason |
|------------|-----------|--------|
| `tool_sequence` | 3 days | High volume, low long-term value |
| `tool_used` | 3 days | High volume, aggregated daily |
| `validation_details` | 3 days | Aggregated into insights |
| `workflow_created` | 3 days | Aggregated into patterns |
| `session_start` | 3 days | Operational data only |
| `search_query` | 3 days | Operational data only |
| `error_occurred` | **30 days** | Extended for debugging |
| `workflow_validation_failed` | 3 days | Captured in aggregates |
### Aggregated Data Retention
All aggregated data is kept **indefinitely**:
- Daily tool usage statistics
- Tool sequence patterns
- Workflow creation trends
- Error patterns and frequencies
- Validation success rates
### Workflow Retention
- **Unique workflows**: Kept indefinitely (one per unique hash)
- **Duplicate workflows**: Deleted after 3 days
- **Workflow metadata**: Aggregated into daily insights
## Intelligence Preserved
Even after aggressive pruning, you still have access to:
### Long-term Product Insights
- Which tools are most/least used over time
- Tool usage trends and adoption curves
- Common workflow patterns and complexities
- Error frequencies and types across versions
- Validation failure patterns
### Development Intelligence
- Feature adoption rates (by day/week/month)
- Pain points (high error rates, validation failures)
- User behavior patterns (tool sequences, workflow styles)
- Version comparison (changes in usage between releases)
### Recent Debugging Data
- Last 3 days of raw events for immediate issues
- Last 30 days of error events for bug tracking
- Sample error messages for each error type
## Troubleshooting
### Cron Job Not Running
Check if pg_cron extension is enabled:
```sql
-- Enable pg_cron
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Verify it's enabled
SELECT * FROM pg_extension WHERE extname = 'pg_cron';
```
### Aggregation Functions Failing
Check for errors in cron job execution:
```sql
-- View error messages
SELECT
status,
return_message,
start_time
FROM cron.job_run_details
WHERE jobid = (SELECT jobid FROM cron.job WHERE jobname = 'telemetry-daily-cleanup')
AND status = 'failed'
ORDER BY start_time DESC;
```
### VACUUM Not Reclaiming Space
If `VACUUM ANALYZE` isn't reclaiming enough space, use `VACUUM FULL`:
```sql
-- More aggressive space reclamation (locks table)
VACUUM FULL telemetry_events;
```
### Database Still Growing Too Fast
Reduce retention period further:
```sql
-- Change to 2-day retention (more aggressive)
SELECT * FROM run_telemetry_aggregation_and_cleanup(2);
```
Or delete more event types:
```sql
-- Delete additional low-value events
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '3 days'
AND event IN ('session_start', 'search_query', 'diagnostic_completed', 'health_check_completed');
```
## Performance Considerations
### Cron Job Execution Time
The daily cleanup typically takes:
- **Aggregation**: 30-60 seconds
- **Deletion**: 15-30 seconds
- **VACUUM**: 2-5 minutes
- **Total**: ~3-7 minutes
### Query Performance
All aggregation tables have indexes on:
- Date columns (for time-series queries)
- Lookup columns (tool_name, error_type, etc.)
- User columns (for user-specific analysis)
### Lock Considerations
- `VACUUM ANALYZE`: Minimal locking, safe during operation
- `VACUUM FULL`: Locks table, run during off-peak hours
- Aggregation functions: Read-only queries, no locking
## Customization
### Add Custom Aggregations
To track additional metrics, create new aggregation tables:
```sql
-- Example: Session duration aggregation
CREATE TABLE telemetry_session_duration_daily (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
avg_duration_seconds NUMERIC,
median_duration_seconds NUMERIC,
max_duration_seconds NUMERIC,
session_count INTEGER,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(aggregation_date)
);
-- Add to cleanup function
-- (modify run_telemetry_aggregation_and_cleanup)
```
### Modify Retention Policies
Edit the `run_telemetry_aggregation_and_cleanup` function to adjust retention by event type:
```sql
-- Keep validation_details for 7 days instead of 3
DELETE FROM telemetry_events
WHERE created_at < (NOW() - INTERVAL '7 days')
AND event = 'validation_details';
```
### Change Cron Schedule
Adjust the execution time if needed:
```sql
-- Run at different time (e.g., 3 AM UTC)
SELECT cron.schedule(
'telemetry-daily-cleanup',
'0 3 * * *', -- 3 AM instead of 2 AM
$$ SELECT run_telemetry_aggregation_and_cleanup(3); VACUUM ANALYZE telemetry_events; $$
);
-- Run twice daily (2 AM and 2 PM)
SELECT cron.schedule(
'telemetry-cleanup-morning',
'0 2 * * *',
$$ SELECT run_telemetry_aggregation_and_cleanup(3); $$
);
SELECT cron.schedule(
'telemetry-cleanup-afternoon',
'0 14 * * *',
$$ SELECT run_telemetry_aggregation_and_cleanup(3); $$
);
```
## Backup & Recovery
### Before Running Emergency Cleanup
Create a backup of aggregation queries:
```sql
-- Export aggregated data to CSV or backup tables
CREATE TABLE telemetry_tool_usage_backup AS
SELECT * FROM telemetry_tool_usage_daily;
CREATE TABLE telemetry_patterns_backup AS
SELECT * FROM telemetry_tool_patterns;
```
### Restore Deleted Data
Raw event data cannot be restored after deletion. However, aggregated insights are preserved indefinitely.
To prevent accidental data loss:
1. Test cleanup functions on staging first
2. Review `check_database_size()` before running emergency cleanup
3. Start with longer retention periods (7 days) and reduce gradually
4. Monitor aggregated data quality for 1-2 weeks
## Monitoring Dashboard Queries
### Weekly Growth Report
```sql
-- Database growth over last 7 days
SELECT
DATE(created_at) as date,
COUNT(*) as events_created,
COUNT(DISTINCT event) as event_types,
COUNT(DISTINCT user_id) as active_users,
ROUND(SUM(pg_column_size(telemetry_events.*))::NUMERIC / 1024 / 1024, 2) as size_mb
FROM telemetry_events
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY DATE(created_at)
ORDER BY date DESC;
```
### Storage Efficiency Report
```sql
-- Compare raw vs aggregated storage
SELECT
'Raw Events (last 3 days)' as category,
COUNT(*) as row_count,
pg_size_pretty(pg_total_relation_size('telemetry_events')) as table_size
FROM telemetry_events
WHERE created_at >= NOW() - INTERVAL '3 days'
UNION ALL
SELECT
'Aggregated Insights (all time)',
(SELECT COUNT(*) FROM telemetry_tool_usage_daily) +
(SELECT COUNT(*) FROM telemetry_tool_patterns) +
(SELECT COUNT(*) FROM telemetry_workflow_insights) +
(SELECT COUNT(*) FROM telemetry_error_patterns) +
(SELECT COUNT(*) FROM telemetry_validation_insights),
pg_size_pretty(
pg_total_relation_size('telemetry_tool_usage_daily') +
pg_total_relation_size('telemetry_tool_patterns') +
pg_total_relation_size('telemetry_workflow_insights') +
pg_total_relation_size('telemetry_error_patterns') +
pg_total_relation_size('telemetry_validation_insights')
);
```
### Top Events by Size
```sql
-- Which event types consume most space
SELECT
event,
COUNT(*) as event_count,
pg_size_pretty(SUM(pg_column_size(telemetry_events.*))::BIGINT) as total_size,
pg_size_pretty(AVG(pg_column_size(telemetry_events.*))::BIGINT) as avg_size_per_event,
ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as pct_of_events
FROM telemetry_events
GROUP BY event
ORDER BY SUM(pg_column_size(telemetry_events.*)) DESC;
```
## Success Metrics
Track these metrics weekly to ensure the system is working:
### Target Metrics (After Implementation)
- ✅ Database size: **< 150 MB** (< 30% of limit)
- Growth rate: **< 3 MB/day** (sustainable)
- Raw event retention: **3 days** (configurable)
- Aggregated data: **All-time insights available**
- Cron job success rate: **> 95%**
- ✅ Query performance: **< 500ms for aggregated queries**
### Review Schedule
- **Daily**: Check `check_database_size()` status
- **Weekly**: Review aggregated insights and growth trends
- **Monthly**: Analyze cron job success rate and adjust retention if needed
- **After each release**: Compare usage patterns to previous version
## Quick Reference
### Essential Commands
```sql
-- Check database health
SELECT * FROM check_database_size();
-- View recent aggregated insights
SELECT * FROM telemetry_tool_usage_daily ORDER BY aggregation_date DESC LIMIT 10;
-- Run manual cleanup (3-day retention)
SELECT * FROM run_telemetry_aggregation_and_cleanup(3);
VACUUM ANALYZE telemetry_events;
-- Emergency cleanup (7-day retention)
SELECT * FROM emergency_cleanup();
VACUUM FULL telemetry_events;
-- View cron job status
SELECT * FROM cron.job WHERE jobname = 'telemetry-daily-cleanup';
-- View cron execution history
SELECT * FROM cron.job_run_details
WHERE jobid = (SELECT jobid FROM cron.job WHERE jobname = 'telemetry-daily-cleanup')
ORDER BY start_time DESC LIMIT 5;
```
## Support
If you encounter issues:
1. Check the troubleshooting section above
2. Review cron job execution logs
3. Verify pg_cron extension is enabled
4. Test aggregation functions manually
5. Check Supabase dashboard for errors
For questions or improvements, refer to the main project documentation.

Binary file not shown.

View File

@@ -1,6 +1,6 @@
{ {
"name": "n8n-mcp", "name": "n8n-mcp",
"version": "2.18.10", "version": "2.19.0",
"description": "Integration between n8n workflow automation and Model Context Protocol (MCP)", "description": "Integration between n8n workflow automation and Model Context Protocol (MCP)",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",

View File

@@ -25,6 +25,7 @@ import {
STANDARD_PROTOCOL_VERSION STANDARD_PROTOCOL_VERSION
} from './utils/protocol-version'; } from './utils/protocol-version';
import { InstanceContext, validateInstanceContext } from './types/instance-context'; import { InstanceContext, validateInstanceContext } from './types/instance-context';
import { SessionRestoreHook, SessionState } from './types/session-restoration';
dotenv.config(); dotenv.config();
@@ -84,12 +85,30 @@ export class SingleSessionHTTPServer {
private sessionTimeout = 30 * 60 * 1000; // 30 minutes private sessionTimeout = 30 * 60 * 1000; // 30 minutes
private authToken: string | null = null; private authToken: string | null = null;
private cleanupTimer: NodeJS.Timeout | null = null; private cleanupTimer: NodeJS.Timeout | null = null;
constructor() { // Session restoration options (Phase 1 - v2.19.0)
private onSessionNotFound?: SessionRestoreHook;
private sessionRestorationTimeout: number;
constructor(options: {
sessionTimeout?: number;
onSessionNotFound?: SessionRestoreHook;
sessionRestorationTimeout?: number;
} = {}) {
// Validate environment on construction // Validate environment on construction
this.validateEnvironment(); this.validateEnvironment();
// Session restoration configuration
this.onSessionNotFound = options.onSessionNotFound;
this.sessionRestorationTimeout = options.sessionRestorationTimeout || 5000; // 5 seconds default
// Override session timeout if provided
if (options.sessionTimeout) {
this.sessionTimeout = options.sessionTimeout;
}
// No longer pre-create session - will be created per initialize request following SDK pattern // No longer pre-create session - will be created per initialize request following SDK pattern
// Start periodic session cleanup // Start periodic session cleanup
this.startSessionCleanup(); this.startSessionCleanup();
} }
@@ -187,23 +206,52 @@ export class SingleSessionHTTPServer {
} }
/** /**
* Validate session ID format * Validate session ID format (Security-Hardened - REQ-8)
* *
* Accepts any non-empty string to support various MCP clients: * Validates session ID format to prevent injection attacks:
* - UUIDv4 (internal n8n-mcp format) * - SQL injection
* - instance-{userId}-{hash}-{uuid} (multi-tenant format) * - NoSQL injection
* - Custom formats from mcp-remote and other proxies * - Path traversal
* - DoS via oversized IDs
* *
* Security: Session validation happens via lookup in this.transports, * Accepts multiple formats for MCP client compatibility:
* not format validation. This ensures compatibility with all MCP clients. * 1. UUIDv4 (internal format): xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
* 2. Multi-tenant format: instance-{userId}-{hash}-{uuid}
* 3. Generic safe format: any alphanumeric string with hyphens/underscores (20-100 chars)
* *
* @param sessionId - Session identifier from MCP client * @param sessionId - Session identifier from MCP client
* @returns true if valid, false otherwise * @returns true if valid, false otherwise
* @since 2.19.0 - Enhanced with security validation
* @since 2.19.1 - Relaxed validation for MCP proxy compatibility
*/ */
private isValidSessionId(sessionId: string): boolean { private isValidSessionId(sessionId: string): boolean {
// Accept any non-empty string as session ID if (!sessionId || typeof sessionId !== 'string') {
// This ensures compatibility with all MCP clients and proxies return false;
return Boolean(sessionId && sessionId.length > 0); }
// Length validation (20-100 chars) - DoS protection
if (sessionId.length < 20 || sessionId.length > 100) {
return false;
}
// Character whitelist (alphanumeric + hyphens + underscores) - Injection protection
// Allow underscores for compatibility with some MCP clients (e.g., mcp-remote)
if (!/^[a-zA-Z0-9_-]+$/.test(sessionId)) {
return false;
}
// Format validation - Support known formats or any safe alphanumeric format
// UUIDv4: 8-4-4-4-12 hex digits with hyphens
const uuidV4Pattern = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
// Multi-tenant: instance-{userId}-{hash}-{uuid}
// Must start with 'instance-' and have at least 4 parts
const multiTenantPattern = /^instance-[a-zA-Z0-9_]+-[a-zA-Z0-9_]+-[a-zA-Z0-9_-]+$/;
// Accept UUIDv4, multi-tenant, OR any safe alphanumeric format (for flexibility)
return uuidV4Pattern.test(sessionId) ||
multiTenantPattern.test(sessionId) ||
/^[a-zA-Z0-9_-]{20,100}$/.test(sessionId); // Generic safe format
} }
/** /**
@@ -297,6 +345,155 @@ export class SingleSessionHTTPServer {
} }
} }
/**
* Timeout utility for session restoration
* Creates a promise that rejects after the specified milliseconds
*
* @param ms - Timeout duration in milliseconds
* @returns Promise that rejects with TimeoutError
* @since 2.19.0
*/
private timeout(ms: number): Promise<never> {
return new Promise((_, reject) => {
setTimeout(() => {
const error = new Error(`Operation timed out after ${ms}ms`);
error.name = 'TimeoutError';
reject(error);
}, ms);
});
}
/**
* Create a new session (IDEMPOTENT - REQ-2)
*
* This method is idempotent to prevent race conditions during concurrent
* restoration attempts. If the session already exists, returns existing
* session ID without creating a duplicate.
*
* @param instanceContext - Instance-specific configuration
* @param sessionId - Optional pre-defined session ID (for restoration)
* @returns The session ID (newly created or existing)
* @throws Error if session ID format is invalid
* @since 2.19.0
*/
private createSession(
instanceContext: InstanceContext,
sessionId?: string
): string {
// Generate session ID if not provided
const id = sessionId || this.generateSessionId(instanceContext);
// CRITICAL: Idempotency check to prevent race conditions
if (this.transports[id]) {
logger.debug('Session already exists, skipping creation (idempotent)', {
sessionId: id
});
return id;
}
// Validate session ID format if provided externally
if (sessionId && !this.isValidSessionId(sessionId)) {
logger.error('Invalid session ID format during creation', { sessionId });
throw new Error('Invalid session ID format');
}
const server = new N8NDocumentationMCPServer(instanceContext);
// Create transport and server
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => id,
onsessioninitialized: (initializedSessionId: string) => {
// Session already stored, this just logs initialization
logger.info('Session initialized during explicit creation', {
sessionId: initializedSessionId
});
}
});
// CRITICAL: Store session data immediately (not in callback)
// This ensures sessions are available synchronously for tests and direct API calls
this.transports[id] = transport;
this.servers[id] = server;
this.sessionMetadata[id] = {
lastAccess: new Date(),
createdAt: new Date()
};
this.sessionContexts[id] = instanceContext;
// Set up cleanup handlers
transport.onclose = () => {
if (transport.sessionId) {
logger.info('Transport closed during createSession, cleaning up', {
sessionId: transport.sessionId
});
this.removeSession(transport.sessionId, 'transport_closed');
}
};
transport.onerror = (error: Error) => {
if (transport.sessionId) {
logger.error('Transport error during createSession', {
sessionId: transport.sessionId,
error: error.message
});
this.removeSession(transport.sessionId, 'transport_error').catch(err => {
logger.error('Error during transport error cleanup', { error: err });
});
}
};
// CRITICAL: Connect server to transport before returning
// Without this, the server won't process requests!
// Note: We don't await here because createSession is synchronous
// The connection will complete asynchronously via onsessioninitialized
server.connect(transport).catch(err => {
logger.error('Failed to connect server to transport in createSession', {
sessionId: id,
error: err instanceof Error ? err.message : String(err)
});
// Clean up on connection failure
this.removeSession(id, 'connection_failed').catch(cleanupErr => {
logger.error('Error during connection failure cleanup', { error: cleanupErr });
});
});
logger.info('Session created successfully (connecting server to transport)', {
sessionId: id,
hasInstanceContext: !!instanceContext,
instanceId: instanceContext?.instanceId
});
return id;
}
/**
* Generate session ID based on instance context
* Used for multi-tenant mode
*
* @param instanceContext - Instance-specific configuration
* @returns Generated session ID
*/
private generateSessionId(instanceContext?: InstanceContext): string {
const isMultiTenantEnabled = process.env.ENABLE_MULTI_TENANT === 'true';
const sessionStrategy = process.env.MULTI_TENANT_SESSION_STRATEGY || 'instance';
if (isMultiTenantEnabled && sessionStrategy === 'instance' && instanceContext?.instanceId) {
// Multi-tenant mode with instance strategy
const configHash = createHash('sha256')
.update(JSON.stringify({
url: instanceContext.n8nApiUrl,
instanceId: instanceContext.instanceId
}))
.digest('hex')
.substring(0, 8);
return `instance-${instanceContext.instanceId}-${configHash}-${uuidv4()}`;
}
// Standard UUIDv4
return uuidv4();
}
/** /**
* Get session metrics for monitoring * Get session metrics for monitoring
*/ */
@@ -556,32 +753,160 @@ export class SingleSessionHTTPServer {
this.updateSessionAccess(sessionId); this.updateSessionAccess(sessionId);
} else { } else {
// Invalid request - no session ID and not an initialize request // Handle unknown session ID - check if we can restore it
const errorDetails = { if (sessionId) {
hasSessionId: !!sessionId, // REQ-8: Validate session ID format FIRST (security)
isInitialize: isInitialize, if (!this.isValidSessionId(sessionId)) {
sessionIdValid: sessionId ? this.isValidSessionId(sessionId) : false, logger.warn('handleRequest: Invalid session ID format rejected', {
sessionExists: sessionId ? !!this.transports[sessionId] : false sessionId: sessionId.substring(0, 20)
}; });
res.status(400).json({
logger.warn('handleRequest: Invalid request - no session ID and not initialize', errorDetails); jsonrpc: '2.0',
error: {
let errorMessage = 'Bad Request: No valid session ID provided and not an initialize request'; code: -32602,
if (sessionId && !this.isValidSessionId(sessionId)) { message: 'Invalid session ID format'
errorMessage = 'Bad Request: Invalid session ID format'; },
} else if (sessionId && !this.transports[sessionId]) { id: req.body?.id || null
errorMessage = 'Bad Request: Session not found or expired'; });
return;
}
// REQ-1: Try session restoration if hook provided
if (this.onSessionNotFound) {
logger.info('Attempting session restoration', { sessionId });
try {
// Call restoration hook with timeout
const restoredContext = await Promise.race([
this.onSessionNotFound(sessionId),
this.timeout(this.sessionRestorationTimeout)
]);
// Handle both null and undefined defensively
// Both indicate the hook declined to restore the session
if (restoredContext === null || restoredContext === undefined) {
logger.info('Session restoration declined by hook', {
sessionId,
returnValue: restoredContext === null ? 'null' : 'undefined'
});
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Session not found or expired'
},
id: req.body?.id || null
});
return;
}
// Validate the context returned by the hook
const validation = validateInstanceContext(restoredContext);
if (!validation.valid) {
logger.error('Invalid context returned from restoration hook', {
sessionId,
errors: validation.errors
});
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Invalid session context'
},
id: req.body?.id || null
});
return;
}
// REQ-2: Create session (idempotent)
logger.info('Session restoration successful, creating session', {
sessionId,
instanceId: restoredContext.instanceId
});
this.createSession(restoredContext, sessionId);
// Verify session was created
if (!this.transports[sessionId]) {
logger.error('Session creation failed after restoration', { sessionId });
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Session creation failed'
},
id: req.body?.id || null
});
return;
}
// Use the restored session
transport = this.transports[sessionId];
logger.info('Using restored session transport', { sessionId });
} catch (error) {
// Handle timeout
if (error instanceof Error && error.name === 'TimeoutError') {
logger.error('Session restoration timeout', {
sessionId,
timeout: this.sessionRestorationTimeout
});
res.status(408).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Session restoration timeout'
},
id: req.body?.id || null
});
return;
}
// Handle other errors
logger.error('Session restoration failed', {
sessionId,
error: error instanceof Error ? error.message : String(error)
});
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Session restoration failed'
},
id: req.body?.id || null
});
return;
}
} else {
// No restoration hook - session not found
logger.warn('Session not found and no restoration hook configured', {
sessionId
});
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Session not found or expired'
},
id: req.body?.id || null
});
return;
}
} else {
// No session ID and not initialize - invalid request
logger.warn('handleRequest: Invalid request - no session ID and not initialize', {
isInitialize
});
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided and not an initialize request'
},
id: req.body?.id || null
});
return;
} }
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: errorMessage
},
id: req.body?.id || null
});
return;
} }
// Handle request with the transport // Handle request with the transport
@@ -1360,9 +1685,9 @@ export class SingleSessionHTTPServer {
/** /**
* Get current session info (for testing/debugging) * Get current session info (for testing/debugging)
*/ */
getSessionInfo(): { getSessionInfo(): {
active: boolean; active: boolean;
sessionId?: string; sessionId?: string;
age?: number; age?: number;
sessions?: { sessions?: {
total: number; total: number;
@@ -1373,10 +1698,10 @@ export class SingleSessionHTTPServer {
}; };
} { } {
const metrics = this.getSessionMetrics(); const metrics = this.getSessionMetrics();
// Legacy SSE session info // Legacy SSE session info
if (!this.session) { if (!this.session) {
return { return {
active: false, active: false,
sessions: { sessions: {
total: metrics.totalSessions, total: metrics.totalSessions,
@@ -1387,7 +1712,7 @@ export class SingleSessionHTTPServer {
} }
}; };
} }
return { return {
active: true, active: true,
sessionId: this.session.sessionId, sessionId: this.session.sessionId,
@@ -1401,6 +1726,213 @@ export class SingleSessionHTTPServer {
} }
}; };
} }
/**
* Get all active session IDs (Phase 2 - REQ-5)
* Useful for periodic backup to database
*
* @returns Array of active session IDs
* @since 2.19.0
*
* @example
* ```typescript
* const sessionIds = server.getActiveSessions();
* console.log(`Active sessions: ${sessionIds.length}`);
* ```
*/
getActiveSessions(): string[] {
return Object.keys(this.transports);
}
/**
* Get session state for persistence (Phase 2 - REQ-5)
* Returns null if session doesn't exist
*
* @param sessionId - The session ID to retrieve state for
* @returns Session state or null if not found
* @since 2.19.0
*
* @example
* ```typescript
* const state = server.getSessionState('session-123');
* if (state) {
* await database.saveSession(state);
* }
* ```
*/
getSessionState(sessionId: string): SessionState | null {
// Check if session exists
if (!this.transports[sessionId]) {
return null;
}
const metadata = this.sessionMetadata[sessionId];
const instanceContext = this.sessionContexts[sessionId];
// Defensive check - session should have metadata
if (!metadata) {
logger.warn('Session exists but missing metadata', { sessionId });
return null;
}
// Calculate expiration time
const expiresAt = new Date(metadata.lastAccess.getTime() + this.sessionTimeout);
return {
sessionId,
instanceContext: instanceContext || {
n8nApiUrl: process.env.N8N_API_URL,
n8nApiKey: process.env.N8N_API_KEY,
instanceId: process.env.N8N_INSTANCE_ID
},
createdAt: metadata.createdAt,
lastAccess: metadata.lastAccess,
expiresAt,
metadata: instanceContext?.metadata
};
}
/**
* Get all session states (Phase 2 - REQ-5)
* Useful for bulk backup operations
*
* @returns Array of all session states
* @since 2.19.0
*
* @example
* ```typescript
* // Periodic backup every 5 minutes
* setInterval(async () => {
* const states = server.getAllSessionStates();
* for (const state of states) {
* await database.upsertSession(state);
* }
* }, 300000);
* ```
*/
getAllSessionStates(): SessionState[] {
const sessionIds = this.getActiveSessions();
const states: SessionState[] = [];
for (const sessionId of sessionIds) {
const state = this.getSessionState(sessionId);
if (state) {
states.push(state);
}
}
return states;
}
/**
* Manually restore a session (Phase 2 - REQ-5)
* Creates a session with the given ID and instance context
* Idempotent - returns true even if session already exists
*
* @param sessionId - The session ID to restore
* @param instanceContext - Instance configuration for the session
* @returns true if session was created or already exists, false on validation error
* @since 2.19.0
*
* @example
* ```typescript
* // Restore session from database
* const restored = server.manuallyRestoreSession(
* 'session-123',
* { n8nApiUrl: '...', n8nApiKey: '...', instanceId: 'user-456' }
* );
* console.log(`Session restored: ${restored}`);
* ```
*/
manuallyRestoreSession(sessionId: string, instanceContext: InstanceContext): boolean {
try {
// Validate session ID format
if (!this.isValidSessionId(sessionId)) {
logger.error('Invalid session ID format in manual restoration', { sessionId });
return false;
}
// Validate instance context
const validation = validateInstanceContext(instanceContext);
if (!validation.valid) {
logger.error('Invalid instance context in manual restoration', {
sessionId,
errors: validation.errors
});
return false;
}
// Create session (idempotent - returns existing if already exists)
this.createSession(instanceContext, sessionId);
logger.info('Session manually restored', {
sessionId,
instanceId: instanceContext.instanceId
});
return true;
} catch (error) {
logger.error('Failed to manually restore session', {
sessionId,
error: error instanceof Error ? error.message : String(error)
});
return false;
}
}
/**
* Manually delete a session (Phase 2 - REQ-5)
* Removes the session and cleans up all resources
*
* @param sessionId - The session ID to delete
* @returns true if session was deleted, false if session didn't exist
* @since 2.19.0
*
* @example
* ```typescript
* // Delete expired sessions
* const deleted = server.manuallyDeleteSession('session-123');
* if (deleted) {
* console.log('Session deleted successfully');
* }
* ```
*/
manuallyDeleteSession(sessionId: string): boolean {
// Check if session exists
if (!this.transports[sessionId]) {
logger.debug('Session not found for manual deletion', { sessionId });
return false;
}
// CRITICAL: Delete session data synchronously for unit tests
// Close transport asynchronously in background, but remove from maps immediately
try {
// Close transport asynchronously (non-blocking)
if (this.transports[sessionId]) {
this.transports[sessionId].close().catch(error => {
logger.warn('Error closing transport during manual deletion', {
sessionId,
error: error instanceof Error ? error.message : String(error)
});
});
}
// Remove session data immediately (synchronous)
delete this.transports[sessionId];
delete this.servers[sessionId];
delete this.sessionMetadata[sessionId];
delete this.sessionContexts[sessionId];
logger.info('Session manually deleted', { sessionId });
return true;
} catch (error) {
logger.error('Error during manual session deletion', {
sessionId,
error: error instanceof Error ? error.message : String(error)
});
return false;
}
}
} }
// Start if called directly // Start if called directly

View File

@@ -19,6 +19,13 @@ export {
isInstanceContext isInstanceContext
} from './types/instance-context'; } from './types/instance-context';
// Session restoration types (v2.19.0)
export type {
SessionRestoreHook,
SessionRestorationOptions,
SessionState
} from './types/session-restoration';
// Re-export MCP SDK types for convenience // Re-export MCP SDK types for convenience
export type { export type {
Tool, Tool,

View File

@@ -9,6 +9,7 @@ import { Request, Response } from 'express';
import { SingleSessionHTTPServer } from './http-server-single-session'; import { SingleSessionHTTPServer } from './http-server-single-session';
import { logger } from './utils/logger'; import { logger } from './utils/logger';
import { InstanceContext } from './types/instance-context'; import { InstanceContext } from './types/instance-context';
import { SessionRestoreHook, SessionState } from './types/session-restoration';
export interface EngineHealth { export interface EngineHealth {
status: 'healthy' | 'unhealthy'; status: 'healthy' | 'unhealthy';
@@ -25,6 +26,22 @@ export interface EngineHealth {
export interface EngineOptions { export interface EngineOptions {
sessionTimeout?: number; sessionTimeout?: number;
logLevel?: 'error' | 'warn' | 'info' | 'debug'; logLevel?: 'error' | 'warn' | 'info' | 'debug';
/**
* Session restoration hook for multi-tenant persistence
* Called when a client tries to use an unknown session ID
* Return instance context to restore the session, or null to reject
*
* @since 2.19.0
*/
onSessionNotFound?: SessionRestoreHook;
/**
* Maximum time to wait for session restoration (milliseconds)
* @default 5000 (5 seconds)
* @since 2.19.0
*/
sessionRestorationTimeout?: number;
} }
export class N8NMCPEngine { export class N8NMCPEngine {
@@ -32,9 +49,9 @@ export class N8NMCPEngine {
private startTime: Date; private startTime: Date;
constructor(options: EngineOptions = {}) { constructor(options: EngineOptions = {}) {
this.server = new SingleSessionHTTPServer(); this.server = new SingleSessionHTTPServer(options);
this.startTime = new Date(); this.startTime = new Date();
if (options.logLevel) { if (options.logLevel) {
process.env.LOG_LEVEL = options.logLevel; process.env.LOG_LEVEL = options.logLevel;
} }
@@ -97,7 +114,7 @@ export class N8NMCPEngine {
total: Math.round(memoryUsage.heapTotal / 1024 / 1024), total: Math.round(memoryUsage.heapTotal / 1024 / 1024),
unit: 'MB' unit: 'MB'
}, },
version: '2.3.2' version: '2.19.0'
}; };
} catch (error) { } catch (error) {
logger.error('Health check failed:', error); logger.error('Health check failed:', error);
@@ -106,7 +123,7 @@ export class N8NMCPEngine {
uptime: 0, uptime: 0,
sessionActive: false, sessionActive: false,
memoryUsage: { used: 0, total: 0, unit: 'MB' }, memoryUsage: { used: 0, total: 0, unit: 'MB' },
version: '2.3.2' version: '2.19.0'
}; };
} }
} }
@@ -118,10 +135,118 @@ export class N8NMCPEngine {
getSessionInfo(): { active: boolean; sessionId?: string; age?: number } { getSessionInfo(): { active: boolean; sessionId?: string; age?: number } {
return this.server.getSessionInfo(); return this.server.getSessionInfo();
} }
/**
* Get all active session IDs (Phase 2 - REQ-5)
* Returns array of currently active session IDs
*
* @returns Array of session IDs
* @since 2.19.0
*
* @example
* ```typescript
* const engine = new N8NMCPEngine();
* const sessionIds = engine.getActiveSessions();
* console.log(`Active sessions: ${sessionIds.length}`);
* ```
*/
getActiveSessions(): string[] {
return this.server.getActiveSessions();
}
/**
* Get session state for a specific session (Phase 2 - REQ-5)
* Returns session state or null if session doesn't exist
*
* @param sessionId - The session ID to get state for
* @returns SessionState object or null
* @since 2.19.0
*
* @example
* ```typescript
* const state = engine.getSessionState('session-123');
* if (state) {
* // Save to database
* await db.saveSession(state);
* }
* ```
*/
getSessionState(sessionId: string): SessionState | null {
return this.server.getSessionState(sessionId);
}
/**
* Get all session states (Phase 2 - REQ-5)
* Returns array of all active session states for bulk backup
*
* @returns Array of SessionState objects
* @since 2.19.0
*
* @example
* ```typescript
* // Periodic backup every 5 minutes
* setInterval(async () => {
* const states = engine.getAllSessionStates();
* for (const state of states) {
* await database.upsertSession(state);
* }
* }, 300000);
* ```
*/
getAllSessionStates(): SessionState[] {
return this.server.getAllSessionStates();
}
/**
* Manually restore a session (Phase 2 - REQ-5)
* Creates a session with the given ID and instance context
*
* @param sessionId - The session ID to restore
* @param instanceContext - Instance configuration
* @returns true if session was restored successfully, false otherwise
* @since 2.19.0
*
* @example
* ```typescript
* // Restore session from database
* const session = await db.loadSession('session-123');
* if (session) {
* const restored = engine.restoreSession(
* session.sessionId,
* session.instanceContext
* );
* console.log(`Restored: ${restored}`);
* }
* ```
*/
restoreSession(sessionId: string, instanceContext: InstanceContext): boolean {
return this.server.manuallyRestoreSession(sessionId, instanceContext);
}
/**
* Manually delete a session (Phase 2 - REQ-5)
* Removes the session and cleans up resources
*
* @param sessionId - The session ID to delete
* @returns true if session was deleted, false if not found
* @since 2.19.0
*
* @example
* ```typescript
* // Delete expired session
* const deleted = engine.deleteSession('session-123');
* if (deleted) {
* await db.deleteSession('session-123');
* }
* ```
*/
deleteSession(sessionId: string): boolean {
return this.server.manuallyDeleteSession(sessionId);
}
/** /**
* Graceful shutdown for service lifecycle * Graceful shutdown for service lifecycle
* *
* @example * @example
* process.on('SIGTERM', async () => { * process.on('SIGTERM', async () => {
* await engine.shutdown(); * await engine.shutdown();

View File

@@ -0,0 +1,111 @@
/**
* Session Restoration Types
*
* Defines types for session persistence and restoration functionality.
* Enables multi-tenant backends to restore sessions after container restarts.
*
* @since 2.19.0
*/
import { InstanceContext } from './instance-context';
/**
* Session restoration hook callback
*
* Called when a client tries to use an unknown session ID.
* The backend can load session state from external storage (database, Redis, etc.)
* and return the instance context to recreate the session.
*
* @param sessionId - The session ID that was not found in memory
* @returns Instance context to restore the session, or null if session should not be restored
*
* @example
* ```typescript
* const engine = new N8NMCPEngine({
* onSessionNotFound: async (sessionId) => {
* // Load from database
* const session = await db.loadSession(sessionId);
* if (!session || session.expired) return null;
* return session.instanceContext;
* }
* });
* ```
*/
export type SessionRestoreHook = (sessionId: string) => Promise<InstanceContext | null>;
/**
* Session restoration configuration options
*
* @since 2.19.0
*/
export interface SessionRestorationOptions {
/**
* Session timeout in milliseconds
* After this period of inactivity, sessions are expired and cleaned up
* @default 1800000 (30 minutes)
*/
sessionTimeout?: number;
/**
* Maximum time to wait for session restoration hook to complete
* If the hook takes longer than this, the request will fail with 408 Request Timeout
* @default 5000 (5 seconds)
*/
sessionRestorationTimeout?: number;
/**
* Hook called when a client tries to use an unknown session ID
* Return instance context to restore the session, or null to reject
*
* @param sessionId - The session ID that was not found
* @returns Instance context for restoration, or null
*
* Error handling:
* - Hook throws exception → 500 Internal Server Error
* - Hook times out → 408 Request Timeout
* - Hook returns null → 400 Bad Request (session not found)
* - Hook returns invalid context → 400 Bad Request (invalid context)
*/
onSessionNotFound?: SessionRestoreHook;
}
/**
* Session state for persistence
* Contains all information needed to restore a session after restart
*
* @since 2.19.0
*/
export interface SessionState {
/**
* Unique session identifier
*/
sessionId: string;
/**
* Instance-specific configuration
* Contains n8n API credentials and instance ID
*/
instanceContext: InstanceContext;
/**
* When the session was created
*/
createdAt: Date;
/**
* Last time the session was accessed
* Used for TTL-based expiration
*/
lastAccess: Date;
/**
* When the session will expire
* Calculated from lastAccess + sessionTimeout
*/
expiresAt: Date;
/**
* Optional metadata for application-specific use
*/
metadata?: Record<string, any>;
}

View File

@@ -0,0 +1,752 @@
-- ============================================================================
-- N8N-MCP Telemetry Aggregation & Automated Pruning System
-- ============================================================================
-- Purpose: Create aggregation tables and automated cleanup to maintain
-- database under 500MB free tier limit while preserving insights
--
-- Strategy: Aggregate → Delete → Retain only recent raw events
-- Expected savings: ~120 MB (from 265 MB → ~145 MB steady state)
-- ============================================================================
-- ============================================================================
-- PART 1: AGGREGATION TABLES
-- ============================================================================
-- Daily tool usage summary (replaces 96 MB of tool_sequence raw data)
CREATE TABLE IF NOT EXISTS telemetry_tool_usage_daily (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
user_id TEXT NOT NULL,
tool_name TEXT NOT NULL,
usage_count INTEGER NOT NULL DEFAULT 0,
success_count INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0,
avg_execution_time_ms NUMERIC,
total_execution_time_ms BIGINT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, user_id, tool_name)
);
CREATE INDEX idx_tool_usage_daily_date ON telemetry_tool_usage_daily(aggregation_date DESC);
CREATE INDEX idx_tool_usage_daily_tool ON telemetry_tool_usage_daily(tool_name);
CREATE INDEX idx_tool_usage_daily_user ON telemetry_tool_usage_daily(user_id);
COMMENT ON TABLE telemetry_tool_usage_daily IS 'Daily aggregation of tool usage replacing raw tool_used and tool_sequence events. Saves ~95% storage.';
-- Tool sequence patterns (replaces individual sequences with pattern analysis)
CREATE TABLE IF NOT EXISTS telemetry_tool_patterns (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
tool_sequence TEXT[] NOT NULL, -- Array of tool names in order
sequence_hash TEXT NOT NULL, -- Hash of the sequence for grouping
occurrence_count INTEGER NOT NULL DEFAULT 1,
avg_sequence_duration_ms NUMERIC,
success_rate NUMERIC, -- 0.0 to 1.0
common_errors JSONB, -- {"error_type": count}
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, sequence_hash)
);
CREATE INDEX idx_tool_patterns_date ON telemetry_tool_patterns(aggregation_date DESC);
CREATE INDEX idx_tool_patterns_hash ON telemetry_tool_patterns(sequence_hash);
COMMENT ON TABLE telemetry_tool_patterns IS 'Common tool usage patterns aggregated daily. Identifies workflows and AI behavior patterns.';
-- Workflow insights (aggregates workflow_created events)
CREATE TABLE IF NOT EXISTS telemetry_workflow_insights (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
complexity TEXT, -- simple/medium/complex
node_count_range TEXT, -- 1-5, 6-10, 11-20, 21+
has_trigger BOOLEAN,
has_webhook BOOLEAN,
common_node_types TEXT[], -- Top node types used
workflow_count INTEGER NOT NULL DEFAULT 0,
avg_node_count NUMERIC,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, complexity, node_count_range, has_trigger, has_webhook)
);
CREATE INDEX idx_workflow_insights_date ON telemetry_workflow_insights(aggregation_date DESC);
CREATE INDEX idx_workflow_insights_complexity ON telemetry_workflow_insights(complexity);
COMMENT ON TABLE telemetry_workflow_insights IS 'Daily workflow creation patterns. Shows adoption trends without storing duplicate workflows.';
-- Error patterns (keeps error intelligence, deletes raw error events)
CREATE TABLE IF NOT EXISTS telemetry_error_patterns (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
error_type TEXT NOT NULL,
error_context TEXT, -- e.g., 'validation', 'workflow_execution', 'node_operation'
occurrence_count INTEGER NOT NULL DEFAULT 1,
affected_users INTEGER NOT NULL DEFAULT 0,
first_seen TIMESTAMPTZ,
last_seen TIMESTAMPTZ,
sample_error_message TEXT, -- Keep one representative message
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, error_type, error_context)
);
CREATE INDEX idx_error_patterns_date ON telemetry_error_patterns(aggregation_date DESC);
CREATE INDEX idx_error_patterns_type ON telemetry_error_patterns(error_type);
COMMENT ON TABLE telemetry_error_patterns IS 'Error patterns over time. Preserves debugging insights while pruning raw error events.';
-- Validation insights (aggregates validation_details)
CREATE TABLE IF NOT EXISTS telemetry_validation_insights (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregation_date DATE NOT NULL,
validation_type TEXT, -- 'node', 'workflow', 'expression'
profile TEXT, -- 'minimal', 'runtime', 'ai-friendly', 'strict'
success_count INTEGER NOT NULL DEFAULT 0,
failure_count INTEGER NOT NULL DEFAULT 0,
common_failure_reasons JSONB, -- {"reason": count}
avg_validation_time_ms NUMERIC,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregation_date, validation_type, profile)
);
CREATE INDEX idx_validation_insights_date ON telemetry_validation_insights(aggregation_date DESC);
CREATE INDEX idx_validation_insights_type ON telemetry_validation_insights(validation_type);
COMMENT ON TABLE telemetry_validation_insights IS 'Validation success/failure patterns. Shows where users struggle without storing every validation event.';
-- ============================================================================
-- PART 2: AGGREGATION FUNCTIONS
-- ============================================================================
-- Function to aggregate tool usage data
CREATE OR REPLACE FUNCTION aggregate_tool_usage(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
-- Aggregate tool_used events
INSERT INTO telemetry_tool_usage_daily (
aggregation_date,
user_id,
tool_name,
usage_count,
success_count,
error_count,
avg_execution_time_ms,
total_execution_time_ms
)
SELECT
DATE(created_at) as aggregation_date,
user_id,
properties->>'toolName' as tool_name,
COUNT(*) as usage_count,
COUNT(*) FILTER (WHERE (properties->>'success')::boolean = true) as success_count,
COUNT(*) FILTER (WHERE (properties->>'success')::boolean = false OR properties->>'error' IS NOT NULL) as error_count,
AVG((properties->>'executionTime')::numeric) as avg_execution_time_ms,
SUM((properties->>'executionTime')::numeric) as total_execution_time_ms
FROM telemetry_events
WHERE event = 'tool_used'
AND created_at < cutoff_date
AND properties->>'toolName' IS NOT NULL
GROUP BY DATE(created_at), user_id, properties->>'toolName'
ON CONFLICT (aggregation_date, user_id, tool_name)
DO UPDATE SET
usage_count = telemetry_tool_usage_daily.usage_count + EXCLUDED.usage_count,
success_count = telemetry_tool_usage_daily.success_count + EXCLUDED.success_count,
error_count = telemetry_tool_usage_daily.error_count + EXCLUDED.error_count,
total_execution_time_ms = telemetry_tool_usage_daily.total_execution_time_ms + EXCLUDED.total_execution_time_ms,
avg_execution_time_ms = (telemetry_tool_usage_daily.total_execution_time_ms + EXCLUDED.total_execution_time_ms) /
(telemetry_tool_usage_daily.usage_count + EXCLUDED.usage_count),
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from tool_used events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_tool_usage IS 'Aggregates tool_used events into daily summaries before deletion';
-- Function to aggregate tool sequence patterns
CREATE OR REPLACE FUNCTION aggregate_tool_patterns(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
INSERT INTO telemetry_tool_patterns (
aggregation_date,
tool_sequence,
sequence_hash,
occurrence_count,
avg_sequence_duration_ms,
success_rate
)
SELECT
DATE(created_at) as aggregation_date,
(properties->>'toolSequence')::text[] as tool_sequence,
md5(array_to_string((properties->>'toolSequence')::text[], ',')) as sequence_hash,
COUNT(*) as occurrence_count,
AVG((properties->>'duration')::numeric) as avg_sequence_duration_ms,
AVG(CASE WHEN (properties->>'success')::boolean THEN 1.0 ELSE 0.0 END) as success_rate
FROM telemetry_events
WHERE event = 'tool_sequence'
AND created_at < cutoff_date
AND properties->>'toolSequence' IS NOT NULL
GROUP BY DATE(created_at), (properties->>'toolSequence')::text[]
ON CONFLICT (aggregation_date, sequence_hash)
DO UPDATE SET
occurrence_count = telemetry_tool_patterns.occurrence_count + EXCLUDED.occurrence_count,
avg_sequence_duration_ms = (
(telemetry_tool_patterns.avg_sequence_duration_ms * telemetry_tool_patterns.occurrence_count +
EXCLUDED.avg_sequence_duration_ms * EXCLUDED.occurrence_count) /
(telemetry_tool_patterns.occurrence_count + EXCLUDED.occurrence_count)
),
success_rate = (
(telemetry_tool_patterns.success_rate * telemetry_tool_patterns.occurrence_count +
EXCLUDED.success_rate * EXCLUDED.occurrence_count) /
(telemetry_tool_patterns.occurrence_count + EXCLUDED.occurrence_count)
),
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from tool_sequence events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_tool_patterns IS 'Aggregates tool_sequence events into pattern analysis before deletion';
-- Function to aggregate workflow insights
CREATE OR REPLACE FUNCTION aggregate_workflow_insights(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
INSERT INTO telemetry_workflow_insights (
aggregation_date,
complexity,
node_count_range,
has_trigger,
has_webhook,
common_node_types,
workflow_count,
avg_node_count
)
SELECT
DATE(created_at) as aggregation_date,
properties->>'complexity' as complexity,
CASE
WHEN (properties->>'nodeCount')::int BETWEEN 1 AND 5 THEN '1-5'
WHEN (properties->>'nodeCount')::int BETWEEN 6 AND 10 THEN '6-10'
WHEN (properties->>'nodeCount')::int BETWEEN 11 AND 20 THEN '11-20'
ELSE '21+'
END as node_count_range,
(properties->>'hasTrigger')::boolean as has_trigger,
(properties->>'hasWebhook')::boolean as has_webhook,
ARRAY[]::text[] as common_node_types, -- Will be populated separately if needed
COUNT(*) as workflow_count,
AVG((properties->>'nodeCount')::numeric) as avg_node_count
FROM telemetry_events
WHERE event = 'workflow_created'
AND created_at < cutoff_date
GROUP BY
DATE(created_at),
properties->>'complexity',
node_count_range,
(properties->>'hasTrigger')::boolean,
(properties->>'hasWebhook')::boolean
ON CONFLICT (aggregation_date, complexity, node_count_range, has_trigger, has_webhook)
DO UPDATE SET
workflow_count = telemetry_workflow_insights.workflow_count + EXCLUDED.workflow_count,
avg_node_count = (
(telemetry_workflow_insights.avg_node_count * telemetry_workflow_insights.workflow_count +
EXCLUDED.avg_node_count * EXCLUDED.workflow_count) /
(telemetry_workflow_insights.workflow_count + EXCLUDED.workflow_count)
),
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from workflow_created events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_workflow_insights IS 'Aggregates workflow_created events into pattern insights before deletion';
-- Function to aggregate error patterns
CREATE OR REPLACE FUNCTION aggregate_error_patterns(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
INSERT INTO telemetry_error_patterns (
aggregation_date,
error_type,
error_context,
occurrence_count,
affected_users,
first_seen,
last_seen,
sample_error_message
)
SELECT
DATE(created_at) as aggregation_date,
properties->>'errorType' as error_type,
properties->>'context' as error_context,
COUNT(*) as occurrence_count,
COUNT(DISTINCT user_id) as affected_users,
MIN(created_at) as first_seen,
MAX(created_at) as last_seen,
(ARRAY_AGG(properties->>'message' ORDER BY created_at DESC))[1] as sample_error_message
FROM telemetry_events
WHERE event = 'error_occurred'
AND created_at < cutoff_date
GROUP BY DATE(created_at), properties->>'errorType', properties->>'context'
ON CONFLICT (aggregation_date, error_type, error_context)
DO UPDATE SET
occurrence_count = telemetry_error_patterns.occurrence_count + EXCLUDED.occurrence_count,
affected_users = GREATEST(telemetry_error_patterns.affected_users, EXCLUDED.affected_users),
first_seen = LEAST(telemetry_error_patterns.first_seen, EXCLUDED.first_seen),
last_seen = GREATEST(telemetry_error_patterns.last_seen, EXCLUDED.last_seen),
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from error_occurred events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_error_patterns IS 'Aggregates error_occurred events into pattern analysis before deletion';
-- Function to aggregate validation insights
CREATE OR REPLACE FUNCTION aggregate_validation_insights(cutoff_date TIMESTAMPTZ)
RETURNS INTEGER AS $$
DECLARE
rows_aggregated INTEGER;
BEGIN
INSERT INTO telemetry_validation_insights (
aggregation_date,
validation_type,
profile,
success_count,
failure_count,
common_failure_reasons,
avg_validation_time_ms
)
SELECT
DATE(created_at) as aggregation_date,
properties->>'validationType' as validation_type,
properties->>'profile' as profile,
COUNT(*) FILTER (WHERE (properties->>'success')::boolean = true) as success_count,
COUNT(*) FILTER (WHERE (properties->>'success')::boolean = false) as failure_count,
jsonb_object_agg(
COALESCE(properties->>'failureReason', 'unknown'),
COUNT(*)
) FILTER (WHERE (properties->>'success')::boolean = false) as common_failure_reasons,
AVG((properties->>'validationTime')::numeric) as avg_validation_time_ms
FROM telemetry_events
WHERE event = 'validation_details'
AND created_at < cutoff_date
GROUP BY DATE(created_at), properties->>'validationType', properties->>'profile'
ON CONFLICT (aggregation_date, validation_type, profile)
DO UPDATE SET
success_count = telemetry_validation_insights.success_count + EXCLUDED.success_count,
failure_count = telemetry_validation_insights.failure_count + EXCLUDED.failure_count,
updated_at = NOW();
GET DIAGNOSTICS rows_aggregated = ROW_COUNT;
RAISE NOTICE 'Aggregated % rows from validation_details events', rows_aggregated;
RETURN rows_aggregated;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION aggregate_validation_insights IS 'Aggregates validation_details events into insights before deletion';
-- ============================================================================
-- PART 3: MASTER AGGREGATION & CLEANUP FUNCTION
-- ============================================================================
CREATE OR REPLACE FUNCTION run_telemetry_aggregation_and_cleanup(
retention_days INTEGER DEFAULT 3
)
RETURNS TABLE(
event_type TEXT,
rows_aggregated INTEGER,
rows_deleted INTEGER,
space_freed_mb NUMERIC
) AS $$
DECLARE
cutoff_date TIMESTAMPTZ;
total_before BIGINT;
total_after BIGINT;
agg_count INTEGER;
del_count INTEGER;
BEGIN
cutoff_date := NOW() - (retention_days || ' days')::INTERVAL;
RAISE NOTICE 'Starting aggregation and cleanup for data older than %', cutoff_date;
-- Get table size before cleanup
SELECT pg_total_relation_size('telemetry_events') INTO total_before;
-- ========================================================================
-- STEP 1: AGGREGATE DATA BEFORE DELETION
-- ========================================================================
-- Tool usage aggregation
SELECT aggregate_tool_usage(cutoff_date) INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'tool_used' AND created_at < cutoff_date;
event_type := 'tool_used';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- Tool patterns aggregation
SELECT aggregate_tool_patterns(cutoff_date) INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'tool_sequence' AND created_at < cutoff_date;
event_type := 'tool_sequence';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- Workflow insights aggregation
SELECT aggregate_workflow_insights(cutoff_date) INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'workflow_created' AND created_at < cutoff_date;
event_type := 'workflow_created';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- Error patterns aggregation
SELECT aggregate_error_patterns(cutoff_date) INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'error_occurred' AND created_at < cutoff_date;
event_type := 'error_occurred';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- Validation insights aggregation
SELECT aggregate_validation_insights(cutoff_date) INTO agg_count;
SELECT COUNT(*) INTO del_count FROM telemetry_events
WHERE event = 'validation_details' AND created_at < cutoff_date;
event_type := 'validation_details';
rows_aggregated := agg_count;
rows_deleted := del_count;
RETURN NEXT;
-- ========================================================================
-- STEP 2: DELETE OLD RAW EVENTS (now that they're aggregated)
-- ========================================================================
DELETE FROM telemetry_events
WHERE created_at < cutoff_date
AND event IN (
'tool_used',
'tool_sequence',
'workflow_created',
'validation_details',
'session_start',
'search_query',
'diagnostic_completed',
'health_check_completed'
);
-- Keep error_occurred for 30 days (extended retention for debugging)
DELETE FROM telemetry_events
WHERE created_at < (NOW() - INTERVAL '30 days')
AND event = 'error_occurred';
-- ========================================================================
-- STEP 3: CLEAN UP OLD WORKFLOWS (keep only unique patterns)
-- ========================================================================
-- Delete duplicate workflows older than retention period
WITH workflow_duplicates AS (
SELECT id
FROM (
SELECT id,
ROW_NUMBER() OVER (
PARTITION BY workflow_hash
ORDER BY created_at DESC
) as rn
FROM telemetry_workflows
WHERE created_at < cutoff_date
) sub
WHERE rn > 1
)
DELETE FROM telemetry_workflows
WHERE id IN (SELECT id FROM workflow_duplicates);
GET DIAGNOSTICS del_count = ROW_COUNT;
event_type := 'duplicate_workflows';
rows_aggregated := 0;
rows_deleted := del_count;
RETURN NEXT;
-- ========================================================================
-- STEP 4: VACUUM TO RECLAIM SPACE
-- ========================================================================
-- Note: VACUUM cannot be run inside a function, must be run separately
-- The cron job will handle this
-- Get table size after cleanup
SELECT pg_total_relation_size('telemetry_events') INTO total_after;
-- Summary row
event_type := 'TOTAL_SPACE_FREED';
rows_aggregated := 0;
rows_deleted := 0;
space_freed_mb := ROUND((total_before - total_after)::NUMERIC / 1024 / 1024, 2);
RETURN NEXT;
RAISE NOTICE 'Cleanup complete. Space freed: % MB', space_freed_mb;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION run_telemetry_aggregation_and_cleanup IS 'Master function to aggregate data and delete old events. Run daily via cron.';
-- ============================================================================
-- PART 4: SUPABASE CRON JOB SETUP
-- ============================================================================
-- Enable pg_cron extension (if not already enabled)
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Schedule daily cleanup at 2 AM UTC (low traffic time)
-- This will aggregate data older than 3 days and then delete it
SELECT cron.schedule(
'telemetry-daily-cleanup',
'0 2 * * *', -- Every day at 2 AM UTC
$$
SELECT run_telemetry_aggregation_and_cleanup(3);
VACUUM ANALYZE telemetry_events;
VACUUM ANALYZE telemetry_workflows;
$$
);
COMMENT ON EXTENSION pg_cron IS 'Cron job scheduler for automated telemetry cleanup';
-- ============================================================================
-- PART 5: MONITORING & ALERTING
-- ============================================================================
-- Function to check database size and alert if approaching limit
CREATE OR REPLACE FUNCTION check_database_size()
RETURNS TABLE(
total_size_mb NUMERIC,
events_size_mb NUMERIC,
workflows_size_mb NUMERIC,
aggregates_size_mb NUMERIC,
percent_of_limit NUMERIC,
days_until_full NUMERIC,
status TEXT
) AS $$
DECLARE
db_size BIGINT;
events_size BIGINT;
workflows_size BIGINT;
agg_size BIGINT;
limit_mb CONSTANT NUMERIC := 500; -- Free tier limit
growth_rate_mb_per_day NUMERIC;
BEGIN
-- Get current sizes
SELECT pg_database_size(current_database()) INTO db_size;
SELECT pg_total_relation_size('telemetry_events') INTO events_size;
SELECT pg_total_relation_size('telemetry_workflows') INTO workflows_size;
SELECT COALESCE(
pg_total_relation_size('telemetry_tool_usage_daily') +
pg_total_relation_size('telemetry_tool_patterns') +
pg_total_relation_size('telemetry_workflow_insights') +
pg_total_relation_size('telemetry_error_patterns') +
pg_total_relation_size('telemetry_validation_insights'),
0
) INTO agg_size;
total_size_mb := ROUND(db_size::NUMERIC / 1024 / 1024, 2);
events_size_mb := ROUND(events_size::NUMERIC / 1024 / 1024, 2);
workflows_size_mb := ROUND(workflows_size::NUMERIC / 1024 / 1024, 2);
aggregates_size_mb := ROUND(agg_size::NUMERIC / 1024 / 1024, 2);
percent_of_limit := ROUND((total_size_mb / limit_mb) * 100, 1);
-- Estimate growth rate (simple 7-day average)
SELECT ROUND(
(SELECT COUNT(*) FROM telemetry_events WHERE created_at > NOW() - INTERVAL '7 days')::NUMERIC
* (pg_column_size(telemetry_events.*))::NUMERIC
/ 7 / 1024 / 1024, 2
) INTO growth_rate_mb_per_day
FROM telemetry_events LIMIT 1;
IF growth_rate_mb_per_day > 0 THEN
days_until_full := ROUND((limit_mb - total_size_mb) / growth_rate_mb_per_day, 0);
ELSE
days_until_full := NULL;
END IF;
-- Determine status
IF percent_of_limit >= 90 THEN
status := 'CRITICAL - Immediate action required';
ELSIF percent_of_limit >= 75 THEN
status := 'WARNING - Monitor closely';
ELSIF percent_of_limit >= 50 THEN
status := 'CAUTION - Plan optimization';
ELSE
status := 'HEALTHY';
END IF;
RETURN NEXT;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION check_database_size IS 'Monitor database size and growth. Run daily or on-demand.';
-- ============================================================================
-- PART 6: EMERGENCY CLEANUP (ONE-TIME USE)
-- ============================================================================
-- Emergency function to immediately free up space (use if critical)
CREATE OR REPLACE FUNCTION emergency_cleanup()
RETURNS TABLE(
action TEXT,
rows_deleted INTEGER,
space_freed_mb NUMERIC
) AS $$
DECLARE
size_before BIGINT;
size_after BIGINT;
del_count INTEGER;
BEGIN
SELECT pg_total_relation_size('telemetry_events') INTO size_before;
-- Aggregate everything older than 7 days
PERFORM run_telemetry_aggregation_and_cleanup(7);
-- Delete all non-critical events older than 7 days
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '7 days'
AND event NOT IN ('error_occurred', 'workflow_validation_failed');
GET DIAGNOSTICS del_count = ROW_COUNT;
action := 'Deleted non-critical events > 7 days';
rows_deleted := del_count;
RETURN NEXT;
-- Delete error events older than 14 days
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '14 days'
AND event = 'error_occurred';
GET DIAGNOSTICS del_count = ROW_COUNT;
action := 'Deleted error events > 14 days';
rows_deleted := del_count;
RETURN NEXT;
-- Delete duplicate workflows
WITH workflow_duplicates AS (
SELECT id
FROM (
SELECT id,
ROW_NUMBER() OVER (
PARTITION BY workflow_hash
ORDER BY created_at DESC
) as rn
FROM telemetry_workflows
) sub
WHERE rn > 1
)
DELETE FROM telemetry_workflows
WHERE id IN (SELECT id FROM workflow_duplicates);
GET DIAGNOSTICS del_count = ROW_COUNT;
action := 'Deleted duplicate workflows';
rows_deleted := del_count;
RETURN NEXT;
-- VACUUM will be run separately
SELECT pg_total_relation_size('telemetry_events') INTO size_after;
action := 'TOTAL (run VACUUM separately)';
rows_deleted := 0;
space_freed_mb := ROUND((size_before - size_after)::NUMERIC / 1024 / 1024, 2);
RETURN NEXT;
RAISE NOTICE 'Emergency cleanup complete. Run VACUUM FULL for maximum space recovery.';
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION emergency_cleanup IS 'Emergency cleanup when database is near capacity. Run once, then VACUUM.';
-- ============================================================================
-- USAGE INSTRUCTIONS
-- ============================================================================
/*
SETUP (Run once):
1. Execute this entire script in Supabase SQL Editor
2. Verify cron job is scheduled:
SELECT * FROM cron.job;
3. Run initial monitoring:
SELECT * FROM check_database_size();
DAILY OPERATIONS (Automatic):
- Cron job runs daily at 2 AM UTC
- Aggregates data older than 3 days
- Deletes raw events after aggregation
- Vacuums tables to reclaim space
MONITORING:
-- Check current database health
SELECT * FROM check_database_size();
-- View aggregated insights
SELECT * FROM telemetry_tool_usage_daily ORDER BY aggregation_date DESC LIMIT 100;
SELECT * FROM telemetry_tool_patterns ORDER BY occurrence_count DESC LIMIT 20;
SELECT * FROM telemetry_error_patterns ORDER BY occurrence_count DESC LIMIT 20;
MANUAL CLEANUP (if needed):
-- Run cleanup manually (3-day retention)
SELECT * FROM run_telemetry_aggregation_and_cleanup(3);
VACUUM ANALYZE telemetry_events;
-- Emergency cleanup (7-day retention)
SELECT * FROM emergency_cleanup();
VACUUM FULL telemetry_events;
VACUUM FULL telemetry_workflows;
TUNING:
-- Adjust retention period (e.g., 5 days instead of 3)
SELECT cron.schedule(
'telemetry-daily-cleanup',
'0 2 * * *',
$$ SELECT run_telemetry_aggregation_and_cleanup(5); VACUUM ANALYZE telemetry_events; $$
);
EXPECTED RESULTS:
- Initial run: ~120 MB space freed (265 MB → ~145 MB)
- Steady state: ~90-120 MB total database size
- Growth rate: ~2-3 MB/day (down from 7.7 MB/day)
- Headroom: 70-80% of free tier limit available
*/

View File

@@ -0,0 +1,961 @@
# n8n-MCP Telemetry Database Pruning Strategy
**Analysis Date:** 2025-10-10
**Current Database Size:** 265 MB (telemetry_events: 199 MB, telemetry_workflows: 66 MB)
**Free Tier Limit:** 500 MB
**Projected 4-Week Size:** 609 MB (exceeds limit by 109 MB)
---
## Executive Summary
**Critical Finding:** At current growth rate (56.75% of data from last 7 days), we will exceed the 500 MB free tier limit in approximately 2 weeks. Implementing a 7-day retention policy can immediately save 36.5 MB (37.6%) and prevent database overflow.
**Key Insights:**
- 641,487 event records consuming 199 MB
- 17,247 workflow records consuming 66 MB
- Daily growth rate: ~7-8 MB/day for events
- 43.25% of data is older than 7 days but provides diminishing value
**Immediate Action Required:** Implement automated pruning to maintain database under 500 MB.
---
## 1. Current State Assessment
### Database Size and Distribution
| Table | Rows | Current Size | Growth Rate | Bytes/Row |
|-------|------|--------------|-------------|-----------|
| telemetry_events | 641,487 | 199 MB | 56.66% from last 7d | 325 |
| telemetry_workflows | 17,247 | 66 MB | 60.09% from last 7d | 4,013 |
| **TOTAL** | **658,734** | **265 MB** | **56.75% from last 7d** | **403** |
### Event Type Distribution
| Event Type | Count | % of Total | Storage | Avg Props Size | Oldest Event |
|------------|-------|-----------|---------|----------------|--------------|
| tool_sequence | 362,170 | 56.4% | 67 MB | 194 bytes | 2025-09-26 |
| tool_used | 191,659 | 29.9% | 14 MB | 77 bytes | 2025-09-26 |
| validation_details | 36,266 | 5.7% | 11 MB | 329 bytes | 2025-09-26 |
| workflow_created | 23,151 | 3.6% | 2.6 MB | 115 bytes | 2025-09-26 |
| session_start | 12,575 | 2.0% | 1.2 MB | 101 bytes | 2025-09-26 |
| workflow_validation_failed | 9,739 | 1.5% | 314 KB | 33 bytes | 2025-09-26 |
| error_occurred | 4,935 | 0.8% | 626 KB | 130 bytes | 2025-09-26 |
| search_query | 974 | 0.2% | 106 KB | 112 bytes | 2025-09-26 |
| Other | 18 | <0.1% | 5 KB | Various | Recent |
### Growth Pattern Analysis
**Daily Data Accumulation (Last 15 Days):**
| Date | Events/Day | Daily Size | Cumulative Size |
|------|-----------|------------|-----------------|
| 2025-10-10 | 28,457 | 4.3 MB | 97 MB |
| 2025-10-09 | 54,717 | 8.2 MB | 93 MB |
| 2025-10-08 | 52,901 | 7.9 MB | 85 MB |
| 2025-10-07 | 52,538 | 8.1 MB | 77 MB |
| 2025-10-06 | 51,401 | 7.8 MB | 69 MB |
| 2025-10-05 | 50,528 | 7.9 MB | 61 MB |
**Average Daily Growth:** ~7.7 MB/day
**Weekly Growth:** ~54 MB/week
**Projected to hit 500 MB limit:** ~17 days (late October 2025)
### Workflow Data Distribution
| Complexity | Count | % | Avg Nodes | Avg JSON Size | Estimated Size |
|-----------|-------|---|-----------|---------------|----------------|
| Simple | 12,923 | 77.6% | 5.48 | 2,122 bytes | 20 MB |
| Medium | 3,708 | 22.3% | 13.93 | 4,458 bytes | 12 MB |
| Complex | 616 | 0.1% | 26.62 | 7,909 bytes | 3.2 MB |
**Key Finding:** No duplicate workflow hashes found - each workflow is unique (good data quality).
---
## 2. Data Value Classification
### TIER 1: Critical - Keep Indefinitely
**Error Patterns (error_occurred)**
- **Why:** Essential for identifying systemic issues and regression detection
- **Volume:** 4,935 events (626 KB)
- **Recommendation:** Keep all errors with aggregated summaries for older data
- **Retention:** Detailed errors 30 days, aggregated stats indefinitely
**Tool Usage Statistics (Aggregated)**
- **Why:** Product analytics and feature prioritization
- **Recommendation:** Aggregate daily/weekly summaries after 14 days
- **Keep:** Summary tables with tool usage counts, success rates, avg duration
### TIER 2: High Value - Keep 30 Days
**Validation Details (validation_details)**
- **Current:** 36,266 events, 11 MB, avg 329 bytes
- **Why:** Important for understanding validation issues during current development cycle
- **Value Period:** 30 days (covers current version development)
- **After 30d:** Aggregate to summary stats (validation success rate by node type)
**Workflow Creation Patterns (workflow_created)**
- **Current:** 23,151 events, 2.6 MB
- **Why:** Track feature adoption and workflow patterns
- **Value Period:** 30 days for detailed analysis
- **After 30d:** Keep aggregated metrics only
### TIER 3: Medium Value - Keep 14 Days
**Session Data (session_start)**
- **Current:** 12,575 events, 1.2 MB
- **Why:** User engagement tracking
- **Value Period:** 14 days sufficient for engagement analysis
- **Pruning Impact:** 497 KB saved (40% reduction)
**Workflow Validation Failures (workflow_validation_failed)**
- **Current:** 9,739 events, 314 KB
- **Why:** Tracks validation patterns but less detailed than validation_details
- **Value Period:** 14 days
- **Pruning Impact:** 170 KB saved (54% reduction)
### TIER 4: Short-Term Value - Keep 7 Days
**Tool Sequences (tool_sequence)**
- **Current:** 362,170 events, 67 MB (largest table!)
- **Why:** Tracks multi-tool workflows but extremely high volume
- **Value Period:** 7 days for recent pattern analysis
- **Pruning Impact:** 29 MB saved (43% reduction) - HIGHEST IMPACT
- **Rationale:** Tool usage patterns stabilize quickly; older sequences provide diminishing returns
**Tool Usage Events (tool_used)**
- **Current:** 191,659 events, 14 MB
- **Why:** Individual tool executions - can be aggregated
- **Value Period:** 7 days detailed, then aggregate
- **Pruning Impact:** 6.2 MB saved (44% reduction)
**Search Queries (search_query)**
- **Current:** 974 events, 106 KB
- **Why:** Low volume, useful for understanding search patterns
- **Value Period:** 7 days sufficient
- **Pruning Impact:** Minimal (~1 KB)
### TIER 5: Ephemeral - Keep 3 Days
**Diagnostic/Health Checks (diagnostic_completed, health_check_completed)**
- **Current:** 17 events, ~2.5 KB
- **Why:** Operational health checks, only current state matters
- **Value Period:** 3 days
- **Pruning Impact:** Negligible but good hygiene
### Workflow Data Retention Strategy
**telemetry_workflows Table (66 MB):**
- **Simple workflows (5-6 nodes):** Keep 7 days Save 11 MB
- **Medium workflows (13-14 nodes):** Keep 14 days Save 6.7 MB
- **Complex workflows (26+ nodes):** Keep 30 days Save 1.9 MB
- **Total Workflow Savings:** 19.6 MB with tiered retention
**Rationale:** Complex workflows are rarer and more valuable for understanding advanced use cases.
---
## 3. Pruning Recommendations with Space Savings
### Strategy A: Conservative 14-Day Retention (Recommended for Initial Implementation)
| Action | Records Deleted | Space Saved | Risk Level |
|--------|----------------|-------------|------------|
| Delete tool_sequence > 14d | 0 | 0 MB | None - all recent |
| Delete tool_used > 14d | 0 | 0 MB | None - all recent |
| Delete validation_details > 14d | 4,259 | 1.2 MB | Low |
| Delete session_start > 14d | 0 | 0 MB | None - all recent |
| Delete workflows > 14d | 1 | <1 KB | None |
| **TOTAL** | **4,260** | **1.2 MB** | **Low** |
**Assessment:** Minimal immediate impact but data is too recent. Not sufficient to prevent overflow.
### Strategy B: Aggressive 7-Day Retention (RECOMMENDED)
| Action | Records Deleted | Space Saved | Risk Level |
|--------|----------------|-------------|------------|
| Delete tool_sequence > 7d | 155,389 | 29 MB | Low - pattern data |
| Delete tool_used > 7d | 82,827 | 6.2 MB | Low - usage metrics |
| Delete validation_details > 7d | 17,465 | 5.4 MB | Medium - debugging data |
| Delete workflow_created > 7d | 9,106 | 1.0 MB | Low - creation events |
| Delete session_start > 7d | 5,664 | 497 KB | Low - session data |
| Delete error_occurred > 7d | 2,321 | 206 KB | Medium - error history |
| Delete workflow_validation_failed > 7d | 5,269 | 170 KB | Low - validation events |
| Delete workflows > 7d (simple) | 5,146 | 11 MB | Low - simple workflows |
| Delete workflows > 7d (medium) | 1,506 | 6.7 MB | Medium - medium workflows |
| Delete workflows > 7d (complex) | 231 | 1.9 MB | High - complex workflows |
| **TOTAL** | **284,924** | **62.1 MB** | **Medium** |
**New Database Size:** 265 MB - 62.1 MB = **202.9 MB (76.6% of limit)**
**Buffer:** 297 MB remaining (~38 days at current growth rate)
### Strategy C: Hybrid Tiered Retention (OPTIMAL LONG-TERM)
| Event Type | Retention Period | Records Deleted | Space Saved |
|-----------|------------------|----------------|-------------|
| tool_sequence | 7 days | 155,389 | 29 MB |
| tool_used | 7 days | 82,827 | 6.2 MB |
| validation_details | 14 days | 4,259 | 1.2 MB |
| workflow_created | 14 days | 3 | <1 KB |
| session_start | 7 days | 5,664 | 497 KB |
| error_occurred | 30 days (keep all) | 0 | 0 MB |
| workflow_validation_failed | 7 days | 5,269 | 170 KB |
| search_query | 7 days | 10 | 1 KB |
| Workflows (simple) | 7 days | 5,146 | 11 MB |
| Workflows (medium) | 14 days | 0 | 0 MB |
| Workflows (complex) | 30 days (keep all) | 0 | 0 MB |
| **TOTAL** | **Various** | **258,567** | **48.1 MB** |
**New Database Size:** 265 MB - 48.1 MB = **216.9 MB (82% of limit)**
**Buffer:** 283 MB remaining (~36 days at current growth rate)
---
## 4. Additional Optimization Opportunities
### Optimization 1: Properties Field Compression
**Finding:** validation_details events have bloated properties (avg 329 bytes, max 9 KB)
```sql
-- Identify large validation_details records
SELECT id, user_id, created_at, pg_column_size(properties) as size_bytes
FROM telemetry_events
WHERE event = 'validation_details'
AND pg_column_size(properties) > 1000
ORDER BY size_bytes DESC;
-- Result: 417 records > 1KB, 2 records > 5KB
```
**Recommendation:** Truncate verbose error messages in validation_details after 7 days
- Keep error types and counts
- Remove full stack traces and detailed messages
- Estimated savings: 2-3 MB
### Optimization 2: Remove Redundant tool_sequence Data
**Finding:** tool_sequence properties contain mostly null values
```sql
-- Analysis shows all tool_sequence.properties->>'tools' are null
-- 362,170 records storing null in properties field
```
**Recommendation:**
1. Investigate why tool_sequence properties are empty
2. If by design, reduce properties field size or use a flag
3. Potential savings: 10-15 MB if properties field is eliminated
### Optimization 3: Workflow Deduplication by Hash
**Finding:** No duplicate workflow_hash values found (good!)
**Recommendation:** Continue using workflow_hash for future deduplication if needed. No action required.
### Optimization 4: Dead Row Cleanup
**Finding:** telemetry_workflows has 1,591 dead rows (9.5% overhead)
```sql
-- Run VACUUM to reclaim space
VACUUM FULL telemetry_workflows;
-- Expected savings: ~6-7 MB
```
**Recommendation:** Schedule weekly VACUUM operations
### Optimization 5: Index Optimization
**Current indexes consume space but improve query performance**
```sql
-- Check index sizes
SELECT
schemaname, tablename, indexname,
pg_size_pretty(pg_relation_size(indexrelid)) as index_size
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY pg_relation_size(indexrelid) DESC;
```
**Recommendation:** Review if all indexes are necessary after pruning strategy is implemented
---
## 5. Implementation Strategy
### Phase 1: Immediate Emergency Pruning (Day 1)
**Goal:** Free up 60+ MB immediately to prevent overflow
```sql
-- EMERGENCY PRUNING: Delete data older than 7 days
BEGIN;
-- Backup count before deletion
SELECT
event,
COUNT(*) FILTER (WHERE created_at < NOW() - INTERVAL '7 days') as to_delete
FROM telemetry_events
GROUP BY event;
-- Delete old events
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '7 days';
-- Expected: ~278,051 rows deleted, ~36.5 MB saved
-- Delete old simple workflows
DELETE FROM telemetry_workflows
WHERE created_at < NOW() - INTERVAL '7 days'
AND complexity = 'simple';
-- Expected: ~5,146 rows deleted, ~11 MB saved
-- Verify new size
SELECT
schemaname, relname,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||relname)) AS size
FROM pg_stat_user_tables
WHERE schemaname = 'public';
COMMIT;
-- Clean up dead rows
VACUUM FULL telemetry_events;
VACUUM FULL telemetry_workflows;
```
**Expected Result:** Database size reduced to ~210-220 MB (55-60% buffer remaining)
### Phase 2: Implement Automated Retention Policy (Week 1)
**Create a scheduled Supabase Edge Function or pg_cron job**
```sql
-- Create retention policy function
CREATE OR REPLACE FUNCTION apply_retention_policy()
RETURNS void AS $$
BEGIN
-- Tier 4: 7-day retention for high-volume events
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '7 days'
AND event IN ('tool_sequence', 'tool_used', 'session_start',
'workflow_validation_failed', 'search_query');
-- Tier 3: 14-day retention for medium-value events
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '14 days'
AND event IN ('validation_details', 'workflow_created');
-- Tier 1: 30-day retention for errors (keep longer)
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '30 days'
AND event = 'error_occurred';
-- Workflow retention by complexity
DELETE FROM telemetry_workflows
WHERE created_at < NOW() - INTERVAL '7 days'
AND complexity = 'simple';
DELETE FROM telemetry_workflows
WHERE created_at < NOW() - INTERVAL '14 days'
AND complexity = 'medium';
DELETE FROM telemetry_workflows
WHERE created_at < NOW() - INTERVAL '30 days'
AND complexity = 'complex';
-- Cleanup
VACUUM telemetry_events;
VACUUM telemetry_workflows;
END;
$$ LANGUAGE plpgsql;
-- Schedule daily execution (using pg_cron extension)
SELECT cron.schedule('retention-policy', '0 2 * * *', 'SELECT apply_retention_policy()');
```
### Phase 3: Create Aggregation Tables (Week 2)
**Preserve insights while deleting raw data**
```sql
-- Daily tool usage summary
CREATE TABLE IF NOT EXISTS telemetry_daily_tool_stats (
date DATE NOT NULL,
tool TEXT NOT NULL,
usage_count INTEGER NOT NULL,
unique_users INTEGER NOT NULL,
avg_duration_ms NUMERIC,
error_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (date, tool)
);
-- Daily validation summary
CREATE TABLE IF NOT EXISTS telemetry_daily_validation_stats (
date DATE NOT NULL,
node_type TEXT,
total_validations INTEGER NOT NULL,
failed_validations INTEGER NOT NULL,
success_rate NUMERIC,
common_errors JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (date, node_type)
);
-- Aggregate function to run before pruning
CREATE OR REPLACE FUNCTION aggregate_before_pruning()
RETURNS void AS $$
BEGIN
-- Aggregate tool usage for data about to be deleted
INSERT INTO telemetry_daily_tool_stats (date, tool, usage_count, unique_users, avg_duration_ms)
SELECT
DATE(created_at) as date,
properties->>'tool' as tool,
COUNT(*) as usage_count,
COUNT(DISTINCT user_id) as unique_users,
AVG((properties->>'duration')::numeric) as avg_duration_ms
FROM telemetry_events
WHERE event = 'tool_used'
AND created_at < NOW() - INTERVAL '7 days'
AND created_at >= NOW() - INTERVAL '8 days'
GROUP BY DATE(created_at), properties->>'tool'
ON CONFLICT (date, tool) DO NOTHING;
-- Aggregate validation stats
INSERT INTO telemetry_daily_validation_stats (date, node_type, total_validations, failed_validations)
SELECT
DATE(created_at) as date,
properties->>'nodeType' as node_type,
COUNT(*) as total_validations,
COUNT(*) FILTER (WHERE properties->>'valid' = 'false') as failed_validations
FROM telemetry_events
WHERE event = 'validation_details'
AND created_at < NOW() - INTERVAL '14 days'
AND created_at >= NOW() - INTERVAL '15 days'
GROUP BY DATE(created_at), properties->>'nodeType'
ON CONFLICT (date, node_type) DO NOTHING;
END;
$$ LANGUAGE plpgsql;
-- Update cron job to aggregate before pruning
SELECT cron.schedule('aggregate-then-prune', '0 2 * * *',
'SELECT aggregate_before_pruning(); SELECT apply_retention_policy();');
```
### Phase 4: Monitoring and Alerting (Week 2)
**Create size monitoring function**
```sql
CREATE OR REPLACE FUNCTION check_database_size()
RETURNS TABLE(
total_size_mb NUMERIC,
limit_mb NUMERIC,
percent_used NUMERIC,
days_until_full NUMERIC
) AS $$
DECLARE
current_size_bytes BIGINT;
growth_rate_bytes_per_day NUMERIC;
BEGIN
-- Get current size
SELECT SUM(pg_total_relation_size(schemaname||'.'||relname))
INTO current_size_bytes
FROM pg_stat_user_tables
WHERE schemaname = 'public';
-- Calculate 7-day growth rate
SELECT
(COUNT(*) FILTER (WHERE created_at >= NOW() - INTERVAL '7 days')) *
AVG(pg_column_size(properties)) * (1.0/7)
INTO growth_rate_bytes_per_day
FROM telemetry_events;
RETURN QUERY
SELECT
ROUND((current_size_bytes / 1024.0 / 1024.0)::numeric, 2) as total_size_mb,
500.0 as limit_mb,
ROUND((current_size_bytes / 1024.0 / 1024.0 / 500.0 * 100)::numeric, 2) as percent_used,
ROUND((((500.0 * 1024 * 1024) - current_size_bytes) / NULLIF(growth_rate_bytes_per_day, 0))::numeric, 1) as days_until_full;
END;
$$ LANGUAGE plpgsql;
-- Alert function (integrate with external monitoring)
CREATE OR REPLACE FUNCTION alert_if_size_critical()
RETURNS void AS $$
DECLARE
size_pct NUMERIC;
BEGIN
SELECT percent_used INTO size_pct FROM check_database_size();
IF size_pct > 90 THEN
-- Log critical alert
INSERT INTO telemetry_events (user_id, event, properties)
VALUES ('system', 'database_size_critical',
json_build_object('percent_used', size_pct, 'timestamp', NOW())::jsonb);
END IF;
END;
$$ LANGUAGE plpgsql;
```
---
## 6. Priority Order for Implementation
### Priority 1: URGENT (Day 1)
1. **Execute Emergency Pruning** - Delete data older than 7 days
- Impact: 47.5 MB saved immediately
- Risk: Low - data already analyzed
- SQL: Provided in Phase 1
### Priority 2: HIGH (Week 1)
2. **Implement Automated Retention Policy**
- Impact: Prevents future overflow
- Risk: Low with proper testing
- Implementation: Phase 2 function
3. **Run VACUUM FULL**
- Impact: 6-7 MB reclaimed from dead rows
- Risk: Low but locks tables briefly
- Command: `VACUUM FULL telemetry_workflows;`
### Priority 3: MEDIUM (Week 2)
4. **Create Aggregation Tables**
- Impact: Preserves insights, enables longer-term pruning
- Risk: Low - additive only
- Implementation: Phase 3 tables and functions
5. **Implement Monitoring**
- Impact: Prevents future surprises
- Risk: None
- Implementation: Phase 4 monitoring functions
### Priority 4: LOW (Month 1)
6. **Optimize Properties Fields**
- Impact: 2-3 MB additional savings
- Risk: Medium - requires code changes
- Action: Truncate verbose error messages
7. **Investigate tool_sequence null properties**
- Impact: 10-15 MB potential savings
- Risk: Medium - requires application changes
- Action: Code review and optimization
---
## 7. Risk Assessment
### Strategy B (7-Day Retention): Risks and Mitigations
| Risk | Likelihood | Impact | Mitigation |
|------|-----------|---------|------------|
| Loss of debugging data for old issues | Medium | Medium | Keep error_occurred for 30 days; aggregate validation stats |
| Unable to analyze long-term trends | Low | Low | Implement aggregation tables before pruning |
| Accidental deletion of critical data | Low | High | Test on staging; implement backups; add rollback capability |
| Performance impact during deletion | Medium | Low | Run during off-peak hours (2 AM UTC) |
| VACUUM locks table briefly | Low | Low | Schedule during low-usage window |
### Strategy C (Hybrid Tiered): Risks and Mitigations
| Risk | Likelihood | Impact | Mitigation |
|------|-----------|---------|------------|
| Complex logic leads to bugs | Medium | Medium | Thorough testing; monitoring; gradual rollout |
| Different retention per event type confusing | Low | Low | Document clearly; add comments in code |
| Tiered approach still insufficient | Low | High | Monitor growth; adjust retention if needed |
---
## 8. Monitoring Metrics
### Key Metrics to Track Post-Implementation
1. **Database Size Trend**
```sql
SELECT * FROM check_database_size();
```
- Target: Stay under 300 MB (60% of limit)
- Alert threshold: 90% (450 MB)
2. **Daily Growth Rate**
```sql
SELECT
DATE(created_at) as date,
COUNT(*) as events,
pg_size_pretty(SUM(pg_column_size(properties))::bigint) as daily_size
FROM telemetry_events
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY DATE(created_at)
ORDER BY date DESC;
```
- Target: < 8 MB/day average
- Alert threshold: > 12 MB/day sustained
3. **Retention Policy Execution**
```sql
-- Add logging to retention policy function
CREATE TABLE retention_policy_log (
executed_at TIMESTAMPTZ DEFAULT NOW(),
events_deleted INTEGER,
workflows_deleted INTEGER,
space_reclaimed_mb NUMERIC
);
```
- Monitor: Daily successful execution
- Alert: If job fails or deletes 0 rows unexpectedly
4. **Data Availability Check**
```sql
-- Ensure sufficient data for analysis
SELECT
event,
COUNT(*) as available_records,
MIN(created_at) as oldest_record,
MAX(created_at) as newest_record
FROM telemetry_events
GROUP BY event;
```
- Target: 7 days of data always available
- Alert: If oldest_record > 8 days ago (retention policy failing)
---
## 9. Recommended Action Plan
### Immediate Actions (Today)
**Step 1:** Execute emergency pruning
```sql
-- Backup first (optional but recommended)
-- Create a copy of current stats
CREATE TABLE telemetry_events_stats_backup AS
SELECT event, COUNT(*), MIN(created_at), MAX(created_at)
FROM telemetry_events
GROUP BY event;
-- Execute pruning
DELETE FROM telemetry_events WHERE created_at < NOW() - INTERVAL '7 days';
DELETE FROM telemetry_workflows WHERE created_at < NOW() - INTERVAL '7 days' AND complexity = 'simple';
VACUUM FULL telemetry_events;
VACUUM FULL telemetry_workflows;
```
**Step 2:** Verify results
```sql
SELECT * FROM check_database_size();
```
**Expected outcome:** Database size ~210-220 MB (58-60% buffer remaining)
### Week 1 Actions
**Step 3:** Implement automated retention policy
- Create retention policy function (Phase 2 code)
- Test function on staging/development environment
- Schedule daily execution via pg_cron
**Step 4:** Set up monitoring
- Create monitoring functions (Phase 4 code)
- Configure alerts for size thresholds
- Document escalation procedures
### Week 2 Actions
**Step 5:** Create aggregation tables
- Implement summary tables (Phase 3 code)
- Backfill historical aggregations if needed
- Update retention policy to aggregate before pruning
**Step 6:** Optimize and tune
- Review query performance post-pruning
- Adjust retention periods if needed based on actual usage
- Document any issues or improvements
### Monthly Maintenance
**Step 7:** Regular review
- Monthly review of database growth trends
- Quarterly review of retention policy effectiveness
- Adjust retention periods based on product needs
---
## 10. SQL Execution Scripts
### Script 1: Emergency Pruning (Run First)
```sql
-- ============================================
-- EMERGENCY PRUNING SCRIPT
-- Expected savings: ~50 MB
-- Execution time: 2-5 minutes
-- ============================================
BEGIN;
-- Create backup of current state
CREATE TABLE IF NOT EXISTS pruning_audit (
executed_at TIMESTAMPTZ DEFAULT NOW(),
action TEXT,
records_affected INTEGER,
size_before_mb NUMERIC,
size_after_mb NUMERIC
);
-- Record size before
INSERT INTO pruning_audit (action, size_before_mb)
SELECT 'before_pruning',
pg_total_relation_size('telemetry_events')::numeric / 1024 / 1024;
-- Delete old events (keep last 7 days)
WITH deleted AS (
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '7 days'
RETURNING *
)
INSERT INTO pruning_audit (action, records_affected)
SELECT 'delete_events_7d', COUNT(*) FROM deleted;
-- Delete old simple workflows (keep last 7 days)
WITH deleted AS (
DELETE FROM telemetry_workflows
WHERE created_at < NOW() - INTERVAL '7 days'
AND complexity = 'simple'
RETURNING *
)
INSERT INTO pruning_audit (action, records_affected)
SELECT 'delete_workflows_simple_7d', COUNT(*) FROM deleted;
-- Record size after
UPDATE pruning_audit
SET size_after_mb = pg_total_relation_size('telemetry_events')::numeric / 1024 / 1024
WHERE action = 'before_pruning';
COMMIT;
-- Cleanup dead space
VACUUM FULL telemetry_events;
VACUUM FULL telemetry_workflows;
-- Verify results
SELECT * FROM pruning_audit ORDER BY executed_at DESC LIMIT 5;
SELECT * FROM check_database_size();
```
### Script 2: Create Retention Policy (Run After Testing)
```sql
-- ============================================
-- AUTOMATED RETENTION POLICY
-- Schedule: Daily at 2 AM UTC
-- ============================================
CREATE OR REPLACE FUNCTION apply_retention_policy()
RETURNS TABLE(
action TEXT,
records_deleted INTEGER,
execution_time_ms INTEGER
) AS $$
DECLARE
start_time TIMESTAMPTZ;
end_time TIMESTAMPTZ;
deleted_count INTEGER;
BEGIN
-- Tier 4: 7-day retention (high volume, low long-term value)
start_time := clock_timestamp();
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '7 days'
AND event IN ('tool_sequence', 'tool_used', 'session_start',
'workflow_validation_failed', 'search_query');
GET DIAGNOSTICS deleted_count = ROW_COUNT;
end_time := clock_timestamp();
action := 'delete_tier4_7d';
records_deleted := deleted_count;
execution_time_ms := EXTRACT(MILLISECONDS FROM (end_time - start_time))::INTEGER;
RETURN NEXT;
-- Tier 3: 14-day retention (medium value)
start_time := clock_timestamp();
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '14 days'
AND event IN ('validation_details', 'workflow_created');
GET DIAGNOSTICS deleted_count = ROW_COUNT;
end_time := clock_timestamp();
action := 'delete_tier3_14d';
records_deleted := deleted_count;
execution_time_ms := EXTRACT(MILLISECONDS FROM (end_time - start_time))::INTEGER;
RETURN NEXT;
-- Tier 1: 30-day retention (errors - keep longer)
start_time := clock_timestamp();
DELETE FROM telemetry_events
WHERE created_at < NOW() - INTERVAL '30 days'
AND event = 'error_occurred';
GET DIAGNOSTICS deleted_count = ROW_COUNT;
end_time := clock_timestamp();
action := 'delete_errors_30d';
records_deleted := deleted_count;
execution_time_ms := EXTRACT(MILLISECONDS FROM (end_time - start_time))::INTEGER;
RETURN NEXT;
-- Workflow pruning by complexity
start_time := clock_timestamp();
DELETE FROM telemetry_workflows
WHERE created_at < NOW() - INTERVAL '7 days'
AND complexity = 'simple';
GET DIAGNOSTICS deleted_count = ROW_COUNT;
end_time := clock_timestamp();
action := 'delete_workflows_simple_7d';
records_deleted := deleted_count;
execution_time_ms := EXTRACT(MILLISECONDS FROM (end_time - start_time))::INTEGER;
RETURN NEXT;
start_time := clock_timestamp();
DELETE FROM telemetry_workflows
WHERE created_at < NOW() - INTERVAL '14 days'
AND complexity = 'medium';
GET DIAGNOSTICS deleted_count = ROW_COUNT;
end_time := clock_timestamp();
action := 'delete_workflows_medium_14d';
records_deleted := deleted_count;
execution_time_ms := EXTRACT(MILLISECONDS FROM (end_time - start_time))::INTEGER;
RETURN NEXT;
start_time := clock_timestamp();
DELETE FROM telemetry_workflows
WHERE created_at < NOW() - INTERVAL '30 days'
AND complexity = 'complex';
GET DIAGNOSTICS deleted_count = ROW_COUNT;
end_time := clock_timestamp();
action := 'delete_workflows_complex_30d';
records_deleted := deleted_count;
execution_time_ms := EXTRACT(MILLISECONDS FROM (end_time - start_time))::INTEGER;
RETURN NEXT;
-- Vacuum to reclaim space
start_time := clock_timestamp();
VACUUM telemetry_events;
VACUUM telemetry_workflows;
end_time := clock_timestamp();
action := 'vacuum_tables';
records_deleted := 0;
execution_time_ms := EXTRACT(MILLISECONDS FROM (end_time - start_time))::INTEGER;
RETURN NEXT;
END;
$$ LANGUAGE plpgsql;
-- Test the function (dry run - won't schedule yet)
SELECT * FROM apply_retention_policy();
-- After testing, schedule with pg_cron
-- Requires pg_cron extension: CREATE EXTENSION IF NOT EXISTS pg_cron;
-- SELECT cron.schedule('retention-policy', '0 2 * * *', 'SELECT apply_retention_policy()');
```
### Script 3: Create Monitoring Dashboard
```sql
-- ============================================
-- MONITORING QUERIES
-- Run these regularly to track database health
-- ============================================
-- Query 1: Current database size and projections
SELECT
'Current Size' as metric,
pg_size_pretty(SUM(pg_total_relation_size(schemaname||'.'||relname))) as value
FROM pg_stat_user_tables
WHERE schemaname = 'public'
UNION ALL
SELECT
'Free Tier Limit' as metric,
'500 MB' as value
UNION ALL
SELECT
'Percent Used' as metric,
CONCAT(
ROUND(
(SUM(pg_total_relation_size(schemaname||'.'||relname))::numeric /
(500.0 * 1024 * 1024) * 100),
2
),
'%'
) as value
FROM pg_stat_user_tables
WHERE schemaname = 'public';
-- Query 2: Data age distribution
SELECT
event,
COUNT(*) as total_records,
MIN(created_at) as oldest_record,
MAX(created_at) as newest_record,
ROUND(EXTRACT(EPOCH FROM (MAX(created_at) - MIN(created_at))) / 86400, 2) as age_days
FROM telemetry_events
GROUP BY event
ORDER BY total_records DESC;
-- Query 3: Daily growth tracking (last 7 days)
SELECT
DATE(created_at) as date,
COUNT(*) as daily_events,
pg_size_pretty(SUM(pg_column_size(properties))::bigint) as daily_data_size,
COUNT(DISTINCT user_id) as active_users
FROM telemetry_events
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY DATE(created_at)
ORDER BY date DESC;
-- Query 4: Retention policy effectiveness
SELECT
DATE(executed_at) as execution_date,
action,
records_deleted,
execution_time_ms
FROM (
SELECT * FROM apply_retention_policy()
) AS policy_run
ORDER BY execution_date DESC;
```
---
## Conclusion
**Immediate Action Required:** Implement Strategy B (7-day retention) immediately to avoid database overflow within 2 weeks.
**Long-Term Strategy:** Transition to Strategy C (Hybrid Tiered Retention) with automated aggregation to balance data preservation with storage constraints.
**Expected Outcomes:**
- Immediate: 50+ MB saved (26% reduction)
- Ongoing: Database stabilized at 200-220 MB (40-44% of limit)
- Buffer: 30-40 days before limit with current growth rate
- Risk: Low with proper testing and monitoring
**Success Metrics:**
1. Database size < 300 MB consistently
2. 7+ days of detailed event data always available
3. No impact on product analytics capabilities
4. Automated retention policy runs daily without errors
---
**Analysis completed:** 2025-10-10
**Next review date:** 2025-11-10 (monthly check)
**Escalation:** If database exceeds 400 MB, consider upgrading to paid tier or implementing more aggressive pruning

View File

@@ -0,0 +1,600 @@
/**
* Integration tests for session persistence (Phase 1)
*
* Tests the complete session restoration flow end-to-end,
* simulating real-world scenarios like container restarts and multi-tenant usage.
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { N8NMCPEngine } from '../../src/mcp-engine';
import { SingleSessionHTTPServer } from '../../src/http-server-single-session';
import { InstanceContext } from '../../src/types/instance-context';
import { SessionRestoreHook, SessionState } from '../../src/types/session-restoration';
import type { Request, Response } from 'express';
// In-memory session storage for testing
const sessionStorage: Map<string, SessionState> = new Map();
/**
* Simulates a backend database for session persistence
*/
class MockSessionStore {
async saveSession(sessionState: SessionState): Promise<void> {
sessionStorage.set(sessionState.sessionId, {
...sessionState,
// Only update lastAccess and expiresAt if not provided
lastAccess: sessionState.lastAccess || new Date(),
expiresAt: sessionState.expiresAt || new Date(Date.now() + 30 * 60 * 1000) // 30 minutes
});
}
async loadSession(sessionId: string): Promise<SessionState | null> {
const session = sessionStorage.get(sessionId);
if (!session) return null;
// Check if expired
if (session.expiresAt < new Date()) {
sessionStorage.delete(sessionId);
return null;
}
// Update last access
session.lastAccess = new Date();
session.expiresAt = new Date(Date.now() + 30 * 60 * 1000);
sessionStorage.set(sessionId, session);
return session;
}
async deleteSession(sessionId: string): Promise<void> {
sessionStorage.delete(sessionId);
}
async cleanExpired(): Promise<number> {
const now = new Date();
let count = 0;
for (const [sessionId, session] of sessionStorage.entries()) {
if (session.expiresAt < now) {
sessionStorage.delete(sessionId);
count++;
}
}
return count;
}
getAllSessions(): Map<string, SessionState> {
return new Map(sessionStorage);
}
clear(): void {
sessionStorage.clear();
}
}
describe('Session Persistence Integration Tests', () => {
const TEST_AUTH_TOKEN = 'integration-test-token-with-32-chars-min-length';
let mockStore: MockSessionStore;
let originalEnv: NodeJS.ProcessEnv;
beforeEach(() => {
// Save and set environment
originalEnv = { ...process.env };
process.env.AUTH_TOKEN = TEST_AUTH_TOKEN;
process.env.PORT = '0';
process.env.NODE_ENV = 'test';
// Clear session storage
mockStore = new MockSessionStore();
mockStore.clear();
});
afterEach(() => {
// Restore environment
process.env = originalEnv;
mockStore.clear();
});
// Helper to create properly mocked Request and Response objects
function createMockReqRes(sessionId?: string, body?: any) {
const req = {
method: 'POST',
path: '/mcp',
url: '/mcp',
originalUrl: '/mcp',
headers: {
'authorization': `Bearer ${TEST_AUTH_TOKEN}`,
...(sessionId && { 'mcp-session-id': sessionId })
} as Record<string, string>,
body: body || {
jsonrpc: '2.0',
method: 'tools/list',
params: {},
id: 1
},
ip: '127.0.0.1',
readable: true,
readableEnded: false,
complete: true,
get: vi.fn((header: string) => req.headers[header.toLowerCase()]),
on: vi.fn((event: string, handler: Function) => {}),
removeListener: vi.fn((event: string, handler: Function) => {})
} as any as Request;
const res = {
status: vi.fn().mockReturnThis(),
json: vi.fn().mockReturnThis(),
setHeader: vi.fn(),
send: vi.fn().mockReturnThis(),
headersSent: false,
finished: false
} as any as Response;
return { req, res };
}
describe('Container Restart Simulation', () => {
it('should restore session after simulated container restart', async () => {
// PHASE 1: Initial session creation
const context: InstanceContext = {
n8nApiUrl: 'https://tenant1.n8n.cloud',
n8nApiKey: 'tenant1-api-key',
instanceId: 'tenant-1'
};
const sessionId = 'instance-tenant-1-abc-550e8400-e29b-41d4-a716-446655440000';
// Simulate session being persisted by the backend
await mockStore.saveSession({
sessionId,
instanceContext: context,
createdAt: new Date(),
lastAccess: new Date(),
expiresAt: new Date(Date.now() + 30 * 60 * 1000)
});
// PHASE 2: Simulate container restart (create new engine)
const restorationHook: SessionRestoreHook = async (sid) => {
const session = await mockStore.loadSession(sid);
return session ? session.instanceContext : null;
};
const engine = new N8NMCPEngine({
onSessionNotFound: restorationHook,
sessionRestorationTimeout: 5000
});
// PHASE 3: Client tries to use old session ID
const { req: mockReq, res: mockRes } = createMockReqRes(sessionId);
// Should successfully restore and process request
await engine.processRequest(mockReq, mockRes, context);
// Session should be restored (not return 400 for unknown session)
expect(mockRes.status).not.toHaveBeenCalledWith(400);
expect(mockRes.status).not.toHaveBeenCalledWith(404);
await engine.shutdown();
});
it('should reject expired sessions after container restart', async () => {
const context: InstanceContext = {
n8nApiUrl: 'https://tenant1.n8n.cloud',
n8nApiKey: 'tenant1-api-key',
instanceId: 'tenant-1'
};
const sessionId = '550e8400-e29b-41d4-a716-446655440000';
// Save session with past expiration
await mockStore.saveSession({
sessionId,
instanceContext: context,
createdAt: new Date(Date.now() - 60 * 60 * 1000), // 1 hour ago
lastAccess: new Date(Date.now() - 45 * 60 * 1000), // 45 minutes ago
expiresAt: new Date(Date.now() - 15 * 60 * 1000) // Expired 15 minutes ago
});
const restorationHook: SessionRestoreHook = async (sid) => {
const session = await mockStore.loadSession(sid);
return session ? session.instanceContext : null;
};
const engine = new N8NMCPEngine({
onSessionNotFound: restorationHook,
sessionRestorationTimeout: 5000
});
const { req: mockReq, res: mockRes } = createMockReqRes(sessionId);
await engine.processRequest(mockReq, mockRes);
// Should reject expired session
expect(mockRes.status).toHaveBeenCalledWith(400);
expect(mockRes.json).toHaveBeenCalledWith(
expect.objectContaining({
error: expect.objectContaining({
message: expect.stringMatching(/session|not found/i)
})
})
);
await engine.shutdown();
});
});
describe('Multi-Tenant Session Restoration', () => {
it('should restore correct instance context for each tenant', async () => {
// Create sessions for multiple tenants
const tenant1Context: InstanceContext = {
n8nApiUrl: 'https://tenant1.n8n.cloud',
n8nApiKey: 'tenant1-key',
instanceId: 'tenant-1'
};
const tenant2Context: InstanceContext = {
n8nApiUrl: 'https://tenant2.n8n.cloud',
n8nApiKey: 'tenant2-key',
instanceId: 'tenant-2'
};
const sessionId1 = 'instance-tenant-1-abc-550e8400-e29b-41d4-a716-446655440000';
const sessionId2 = 'instance-tenant-2-xyz-f47ac10b-58cc-4372-a567-0e02b2c3d479';
await mockStore.saveSession({
sessionId: sessionId1,
instanceContext: tenant1Context,
createdAt: new Date(),
lastAccess: new Date(),
expiresAt: new Date(Date.now() + 30 * 60 * 1000)
});
await mockStore.saveSession({
sessionId: sessionId2,
instanceContext: tenant2Context,
createdAt: new Date(),
lastAccess: new Date(),
expiresAt: new Date(Date.now() + 30 * 60 * 1000)
});
const restorationHook: SessionRestoreHook = async (sid) => {
const session = await mockStore.loadSession(sid);
return session ? session.instanceContext : null;
};
const engine = new N8NMCPEngine({
onSessionNotFound: restorationHook,
sessionRestorationTimeout: 5000
});
// Verify each tenant gets their own context
const session1 = await mockStore.loadSession(sessionId1);
const session2 = await mockStore.loadSession(sessionId2);
expect(session1?.instanceContext.instanceId).toBe('tenant-1');
expect(session1?.instanceContext.n8nApiUrl).toBe('https://tenant1.n8n.cloud');
expect(session2?.instanceContext.instanceId).toBe('tenant-2');
expect(session2?.instanceContext.n8nApiUrl).toBe('https://tenant2.n8n.cloud');
await engine.shutdown();
});
it('should isolate sessions between tenants', async () => {
const tenant1Context: InstanceContext = {
n8nApiUrl: 'https://tenant1.n8n.cloud',
n8nApiKey: 'tenant1-key',
instanceId: 'tenant-1'
};
const sessionId = 'instance-tenant-1-abc-550e8400-e29b-41d4-a716-446655440000';
await mockStore.saveSession({
sessionId,
instanceContext: tenant1Context,
createdAt: new Date(),
lastAccess: new Date(),
expiresAt: new Date(Date.now() + 30 * 60 * 1000)
});
const restorationHook: SessionRestoreHook = async (sid) => {
const session = await mockStore.loadSession(sid);
return session ? session.instanceContext : null;
};
const engine = new N8NMCPEngine({
onSessionNotFound: restorationHook
});
// Tenant 2 tries to use tenant 1's session ID
const wrongSessionId = sessionId; // Tenant 1's ID
const { req: tenant2Request, res: mockRes } = createMockReqRes(wrongSessionId);
// The restoration will succeed (session exists), but the backend
// should implement authorization checks to prevent cross-tenant access
await engine.processRequest(tenant2Request, mockRes);
// Restoration should work (this test verifies the session CAN be restored)
// Authorization is the backend's responsibility
expect(mockRes.status).not.toHaveBeenCalledWith(404);
await engine.shutdown();
});
});
describe('Concurrent Restoration Requests', () => {
it('should handle multiple concurrent restoration requests for same session', async () => {
const context: InstanceContext = {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-key',
instanceId: 'test-instance'
};
const sessionId = '550e8400-e29b-41d4-a716-446655440000';
await mockStore.saveSession({
sessionId,
instanceContext: context,
createdAt: new Date(),
lastAccess: new Date(),
expiresAt: new Date(Date.now() + 30 * 60 * 1000)
});
let hookCallCount = 0;
const restorationHook: SessionRestoreHook = async (sid) => {
hookCallCount++;
// Simulate slow database query
await new Promise(resolve => setTimeout(resolve, 50));
const session = await mockStore.loadSession(sid);
return session ? session.instanceContext : null;
};
const engine = new N8NMCPEngine({
onSessionNotFound: restorationHook,
sessionRestorationTimeout: 5000
});
// Simulate 5 concurrent requests with same unknown session ID
const requests = Array.from({ length: 5 }, (_, i) => {
const { req: mockReq, res: mockRes } = createMockReqRes(sessionId, {
jsonrpc: '2.0',
method: 'tools/list',
params: {},
id: i + 1
});
return engine.processRequest(mockReq, mockRes, context);
});
// All should complete without error
await Promise.all(requests);
// Hook should be called multiple times (no built-in deduplication)
// This is expected - the idempotent session creation prevents duplicates
expect(hookCallCount).toBeGreaterThan(0);
await engine.shutdown();
});
});
describe('Database Failure Scenarios', () => {
it('should handle database connection failures gracefully', async () => {
const failingHook: SessionRestoreHook = async () => {
throw new Error('Database connection failed');
};
const engine = new N8NMCPEngine({
onSessionNotFound: failingHook,
sessionRestorationTimeout: 5000
});
const { req: mockReq, res: mockRes } = createMockReqRes('550e8400-e29b-41d4-a716-446655440000');
await engine.processRequest(mockReq, mockRes);
// Should return 500 for database errors
expect(mockRes.status).toHaveBeenCalledWith(500);
expect(mockRes.json).toHaveBeenCalledWith(
expect.objectContaining({
error: expect.objectContaining({
message: expect.stringMatching(/restoration failed|error/i)
})
})
);
await engine.shutdown();
});
it('should timeout on slow database queries', async () => {
const slowHook: SessionRestoreHook = async () => {
// Simulate very slow database query
await new Promise(resolve => setTimeout(resolve, 10000));
return {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-key',
instanceId: 'test'
};
};
const engine = new N8NMCPEngine({
onSessionNotFound: slowHook,
sessionRestorationTimeout: 100 // 100ms timeout
});
const { req: mockReq, res: mockRes } = createMockReqRes('550e8400-e29b-41d4-a716-446655440000');
await engine.processRequest(mockReq, mockRes);
// Should return 408 for timeout
expect(mockRes.status).toHaveBeenCalledWith(408);
expect(mockRes.json).toHaveBeenCalledWith(
expect.objectContaining({
error: expect.objectContaining({
message: expect.stringMatching(/timeout|timed out/i)
})
})
);
await engine.shutdown();
});
});
describe('Session Metadata Tracking', () => {
it('should track session metadata correctly', async () => {
const context: InstanceContext = {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-key',
instanceId: 'test-instance',
metadata: {
userId: 'user-123',
plan: 'premium'
}
};
const sessionId = '550e8400-e29b-41d4-a716-446655440000';
await mockStore.saveSession({
sessionId,
instanceContext: context,
createdAt: new Date(),
lastAccess: new Date(),
expiresAt: new Date(Date.now() + 30 * 60 * 1000),
metadata: {
userAgent: 'test-client/1.0',
ip: '192.168.1.1'
}
});
const session = await mockStore.loadSession(sessionId);
expect(session).toBeDefined();
expect(session?.instanceContext.metadata).toEqual({
userId: 'user-123',
plan: 'premium'
});
expect(session?.metadata).toEqual({
userAgent: 'test-client/1.0',
ip: '192.168.1.1'
});
});
it('should update last access time on restoration', async () => {
const context: InstanceContext = {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-key',
instanceId: 'test-instance'
};
const sessionId = '550e8400-e29b-41d4-a716-446655440000';
const originalLastAccess = new Date(Date.now() - 10 * 60 * 1000); // 10 minutes ago
await mockStore.saveSession({
sessionId,
instanceContext: context,
createdAt: new Date(Date.now() - 20 * 60 * 1000),
lastAccess: originalLastAccess,
expiresAt: new Date(Date.now() + 20 * 60 * 1000)
});
// Wait a bit
await new Promise(resolve => setTimeout(resolve, 100));
// Load session (simulates restoration)
const session = await mockStore.loadSession(sessionId);
expect(session).toBeDefined();
expect(session!.lastAccess.getTime()).toBeGreaterThan(originalLastAccess.getTime());
});
});
describe('Session Cleanup', () => {
it('should clean up expired sessions', async () => {
// Add multiple sessions with different expiration times
await mockStore.saveSession({
sessionId: 'session-1',
instanceContext: {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'key1',
instanceId: 'instance-1'
},
createdAt: new Date(Date.now() - 60 * 60 * 1000),
lastAccess: new Date(Date.now() - 45 * 60 * 1000),
expiresAt: new Date(Date.now() - 15 * 60 * 1000) // Expired
});
await mockStore.saveSession({
sessionId: 'session-2',
instanceContext: {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'key2',
instanceId: 'instance-2'
},
createdAt: new Date(),
lastAccess: new Date(),
expiresAt: new Date(Date.now() + 30 * 60 * 1000) // Valid
});
const cleanedCount = await mockStore.cleanExpired();
expect(cleanedCount).toBe(1);
expect(mockStore.getAllSessions().size).toBe(1);
expect(mockStore.getAllSessions().has('session-2')).toBe(true);
expect(mockStore.getAllSessions().has('session-1')).toBe(false);
});
});
describe('Backwards Compatibility', () => {
it('should work without restoration hook (legacy behavior)', async () => {
// Engine without restoration hook should work normally
const engine = new N8NMCPEngine();
const sessionInfo = engine.getSessionInfo();
expect(sessionInfo).toBeDefined();
expect(sessionInfo.active).toBeDefined();
await engine.shutdown();
});
it('should not break existing session creation flow', async () => {
const engine = new N8NMCPEngine({
onSessionNotFound: async () => null
});
// Creating sessions should work normally
const sessionInfo = engine.getSessionInfo();
expect(sessionInfo).toBeDefined();
await engine.shutdown();
});
});
describe('Security Validation', () => {
it('should validate restored context before using it', async () => {
const invalidHook: SessionRestoreHook = async () => {
// Return context with malformed URL (truly invalid)
return {
n8nApiUrl: 'not-a-valid-url',
n8nApiKey: 'test-key',
instanceId: 'test'
} as any;
};
const engine = new N8NMCPEngine({
onSessionNotFound: invalidHook,
sessionRestorationTimeout: 5000
});
const { req: mockReq, res: mockRes } = createMockReqRes('550e8400-e29b-41d4-a716-446655440000');
await engine.processRequest(mockReq, mockRes);
// Should reject invalid context
expect(mockRes.status).toHaveBeenCalledWith(400);
await engine.shutdown();
});
});
});

View File

@@ -0,0 +1,333 @@
/**
* Unit tests for Session Management API (Phase 2 - REQ-5)
* Tests the public API methods for session management in v2.19.0
*/
import { describe, it, expect, beforeEach } from 'vitest';
import { N8NMCPEngine } from '../../src/mcp-engine';
import { InstanceContext } from '../../src/types/instance-context';
describe('Session Management API (Phase 2 - REQ-5)', () => {
let engine: N8NMCPEngine;
const testContext: InstanceContext = {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-api-key',
instanceId: 'test-instance'
};
beforeEach(() => {
// Set required AUTH_TOKEN environment variable for testing
process.env.AUTH_TOKEN = 'test-token-for-session-management-testing-32chars';
// Create engine with session restoration disabled for these tests
engine = new N8NMCPEngine({
sessionTimeout: 30 * 60 * 1000 // 30 minutes
});
});
describe('getActiveSessions()', () => {
it('should return empty array when no sessions exist', () => {
const sessionIds = engine.getActiveSessions();
expect(sessionIds).toEqual([]);
});
it('should return session IDs after session creation via restoreSession', () => {
// Create session using direct API (not through HTTP request)
const sessionId = 'instance-test-abc123-uuid-session-test-1';
engine.restoreSession(sessionId, testContext);
const sessionIds = engine.getActiveSessions();
expect(sessionIds.length).toBe(1);
expect(sessionIds).toContain(sessionId);
});
it('should return multiple session IDs when multiple sessions exist', () => {
// Create multiple sessions using direct API
const sessions = [
{ id: 'instance-test1-abc123-uuid-session-1', context: { ...testContext, instanceId: 'instance-1' } },
{ id: 'instance-test2-abc123-uuid-session-2', context: { ...testContext, instanceId: 'instance-2' } }
];
sessions.forEach(({ id, context }) => {
engine.restoreSession(id, context);
});
const sessionIds = engine.getActiveSessions();
expect(sessionIds.length).toBe(2);
expect(sessionIds).toContain(sessions[0].id);
expect(sessionIds).toContain(sessions[1].id);
});
});
describe('getSessionState()', () => {
it('should return null for non-existent session', () => {
const state = engine.getSessionState('non-existent-session-id');
expect(state).toBeNull();
});
it('should return session state for existing session', () => {
// Create a session using direct API
const sessionId = 'instance-test-abc123-uuid-session-state-test';
engine.restoreSession(sessionId, testContext);
const state = engine.getSessionState(sessionId);
expect(state).not.toBeNull();
expect(state).toMatchObject({
sessionId: sessionId,
instanceContext: expect.objectContaining({
n8nApiUrl: testContext.n8nApiUrl,
n8nApiKey: testContext.n8nApiKey,
instanceId: testContext.instanceId
}),
createdAt: expect.any(Date),
lastAccess: expect.any(Date),
expiresAt: expect.any(Date)
});
});
it('should include metadata in session state if available', () => {
const contextWithMetadata: InstanceContext = {
...testContext,
metadata: { userId: 'user-123', tier: 'premium' }
};
const sessionId = 'instance-test-abc123-uuid-metadata-test';
engine.restoreSession(sessionId, contextWithMetadata);
const state = engine.getSessionState(sessionId);
expect(state?.metadata).toEqual({ userId: 'user-123', tier: 'premium' });
});
it('should calculate correct expiration time', () => {
const sessionId = 'instance-test-abc123-uuid-expiry-test';
engine.restoreSession(sessionId, testContext);
const state = engine.getSessionState(sessionId);
expect(state).not.toBeNull();
if (state) {
const expectedExpiry = new Date(state.lastAccess.getTime() + 30 * 60 * 1000);
const actualExpiry = state.expiresAt;
// Allow 1 second difference for test timing
expect(Math.abs(actualExpiry.getTime() - expectedExpiry.getTime())).toBeLessThan(1000);
}
});
});
describe('getAllSessionStates()', () => {
it('should return empty array when no sessions exist', () => {
const states = engine.getAllSessionStates();
expect(states).toEqual([]);
});
it('should return all session states', () => {
// Create two sessions using direct API
const session1Id = 'instance-test1-abc123-uuid-all-states-1';
const session2Id = 'instance-test2-abc123-uuid-all-states-2';
engine.restoreSession(session1Id, {
...testContext,
instanceId: 'instance-1'
});
engine.restoreSession(session2Id, {
...testContext,
instanceId: 'instance-2'
});
const states = engine.getAllSessionStates();
expect(states.length).toBe(2);
expect(states[0]).toMatchObject({
sessionId: expect.any(String),
instanceContext: expect.objectContaining({
n8nApiUrl: testContext.n8nApiUrl
}),
createdAt: expect.any(Date),
lastAccess: expect.any(Date),
expiresAt: expect.any(Date)
});
});
it('should filter out sessions without state', () => {
// Create session using direct API
const sessionId = 'instance-test-abc123-uuid-filter-test';
engine.restoreSession(sessionId, testContext);
// Get states
const states = engine.getAllSessionStates();
expect(states.length).toBe(1);
// All returned states should be non-null
states.forEach(state => {
expect(state).not.toBeNull();
});
});
});
describe('restoreSession()', () => {
it('should create a new session with provided ID and context', () => {
const sessionId = 'instance-test-abc123-uuid-test-session-id';
const result = engine.restoreSession(sessionId, testContext);
expect(result).toBe(true);
expect(engine.getActiveSessions()).toContain(sessionId);
});
it('should be idempotent - return true for existing session', () => {
const sessionId = 'instance-test-abc123-uuid-test-session-id2';
// First restoration
const result1 = engine.restoreSession(sessionId, testContext);
expect(result1).toBe(true);
// Second restoration with same ID
const result2 = engine.restoreSession(sessionId, testContext);
expect(result2).toBe(true);
// Should still only have one session
const sessionIds = engine.getActiveSessions();
expect(sessionIds.filter(id => id === sessionId).length).toBe(1);
});
it('should return false for invalid session ID format', () => {
const invalidSessionIds = [
'short', // Too short (5 chars)
'a'.repeat(101), // Too long (101 chars)
"'; DROP TABLE sessions--", // SQL injection attempt (invalid characters)
'../../../etc/passwd', // Path traversal attempt (invalid characters)
'only-nineteen-chars' // Too short (19 chars, need 20+)
];
invalidSessionIds.forEach(sessionId => {
const result = engine.restoreSession(sessionId, testContext);
expect(result).toBe(false);
});
});
it('should return false for invalid instance context', () => {
const sessionId = 'instance-test-abc123-uuid-test-session-id3';
const invalidContext = {
n8nApiUrl: 'not-a-valid-url', // Invalid URL
n8nApiKey: 'test-key',
instanceId: 'test'
} as any;
const result = engine.restoreSession(sessionId, invalidContext);
expect(result).toBe(false);
});
it('should create session that can be retrieved with getSessionState', () => {
const sessionId = 'instance-test-abc123-uuid-test-session-id4';
engine.restoreSession(sessionId, testContext);
const state = engine.getSessionState(sessionId);
expect(state).not.toBeNull();
expect(state?.sessionId).toBe(sessionId);
expect(state?.instanceContext).toEqual(testContext);
});
});
describe('deleteSession()', () => {
it('should return false for non-existent session', () => {
const result = engine.deleteSession('non-existent-session-id');
expect(result).toBe(false);
});
it('should delete existing session and return true', () => {
// Create a session using direct API
const sessionId = 'instance-test-abc123-uuid-delete-test';
engine.restoreSession(sessionId, testContext);
// Delete the session
const result = engine.deleteSession(sessionId);
expect(result).toBe(true);
// Session should no longer exist
expect(engine.getActiveSessions()).not.toContain(sessionId);
expect(engine.getSessionState(sessionId)).toBeNull();
});
it('should return false when trying to delete already deleted session', () => {
// Create and delete session using direct API
const sessionId = 'instance-test-abc123-uuid-double-delete-test';
engine.restoreSession(sessionId, testContext);
engine.deleteSession(sessionId);
// Try to delete again
const result = engine.deleteSession(sessionId);
expect(result).toBe(false);
});
});
describe('Integration workflows', () => {
it('should support periodic backup workflow', () => {
// Create multiple sessions using direct API
for (let i = 0; i < 3; i++) {
const sessionId = `instance-test${i}-abc123-uuid-backup-${i}`;
engine.restoreSession(sessionId, {
...testContext,
instanceId: `instance-${i}`
});
}
// Simulate periodic backup
const states = engine.getAllSessionStates();
expect(states.length).toBe(3);
// Each state should be serializable
states.forEach(state => {
const serialized = JSON.stringify(state);
expect(serialized).toBeTruthy();
const deserialized = JSON.parse(serialized);
expect(deserialized.sessionId).toBe(state.sessionId);
});
});
it('should support bulk restore workflow', () => {
const sessionData = [
{ sessionId: 'instance-test1-abc123-uuid-bulk-session-1', context: { ...testContext, instanceId: 'user-1' } },
{ sessionId: 'instance-test2-abc123-uuid-bulk-session-2', context: { ...testContext, instanceId: 'user-2' } },
{ sessionId: 'instance-test3-abc123-uuid-bulk-session-3', context: { ...testContext, instanceId: 'user-3' } }
];
// Restore all sessions
for (const { sessionId, context } of sessionData) {
const restored = engine.restoreSession(sessionId, context);
expect(restored).toBe(true);
}
// Verify all sessions exist
const sessionIds = engine.getActiveSessions();
expect(sessionIds.length).toBe(3);
sessionData.forEach(({ sessionId }) => {
expect(sessionIds).toContain(sessionId);
});
});
it('should support session lifecycle workflow (create → get → delete)', () => {
// 1. Create session using direct API
const sessionId = 'instance-test-abc123-uuid-lifecycle-test';
engine.restoreSession(sessionId, testContext);
// 2. Get session state
const state = engine.getSessionState(sessionId);
expect(state).not.toBeNull();
// 3. Simulate saving to database (serialization test)
const serialized = JSON.stringify(state);
expect(serialized).toBeTruthy();
// 4. Delete session
const deleted = engine.deleteSession(sessionId);
expect(deleted).toBe(true);
// 5. Verify deletion
expect(engine.getSessionState(sessionId)).toBeNull();
expect(engine.getActiveSessions()).not.toContain(sessionId);
});
});
});

View File

@@ -0,0 +1,545 @@
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { SingleSessionHTTPServer } from '../../src/http-server-single-session';
import { InstanceContext } from '../../src/types/instance-context';
import { SessionRestoreHook } from '../../src/types/session-restoration';
// Mock dependencies
vi.mock('../../src/utils/logger', () => ({
logger: {
info: vi.fn(),
error: vi.fn(),
warn: vi.fn(),
debug: vi.fn()
}
}));
vi.mock('dotenv');
// Mock UUID generation to make tests predictable
vi.mock('uuid', () => ({
v4: vi.fn(() => 'test-session-id-1234-5678-9012-345678901234')
}));
// Mock transport
vi.mock('@modelcontextprotocol/sdk/server/streamableHttp.js', () => ({
StreamableHTTPServerTransport: vi.fn().mockImplementation((options: any) => {
const mockTransport = {
handleRequest: vi.fn().mockImplementation(async (req: any, res: any, body?: any) => {
if (body && body.method === 'initialize') {
res.setHeader('Mcp-Session-Id', mockTransport.sessionId || 'test-session-id');
}
res.status(200).json({
jsonrpc: '2.0',
result: { success: true },
id: body?.id || 1
});
}),
close: vi.fn().mockResolvedValue(undefined),
sessionId: null as string | null,
onclose: null as (() => void) | null
};
if (options?.sessionIdGenerator) {
const sessionId = options.sessionIdGenerator();
mockTransport.sessionId = sessionId;
if (options.onsessioninitialized) {
setTimeout(() => {
options.onsessioninitialized(sessionId);
}, 0);
}
}
return mockTransport;
})
}));
vi.mock('@modelcontextprotocol/sdk/server/sse.js', () => ({
SSEServerTransport: vi.fn().mockImplementation(() => ({
close: vi.fn().mockResolvedValue(undefined)
}))
}));
vi.mock('../../src/mcp/server', () => ({
N8NDocumentationMCPServer: vi.fn().mockImplementation(() => ({
connect: vi.fn().mockResolvedValue(undefined)
}))
}));
const mockConsoleManager = {
wrapOperation: vi.fn().mockImplementation(async (fn: () => Promise<any>) => {
return await fn();
})
};
vi.mock('../../src/utils/console-manager', () => ({
ConsoleManager: vi.fn(() => mockConsoleManager)
}));
vi.mock('../../src/utils/url-detector', () => ({
getStartupBaseUrl: vi.fn((host: string, port: number) => `http://localhost:${port || 3000}`),
formatEndpointUrls: vi.fn((baseUrl: string) => ({
health: `${baseUrl}/health`,
mcp: `${baseUrl}/mcp`
})),
detectBaseUrl: vi.fn((req: any, host: string, port: number) => `http://localhost:${port || 3000}`)
}));
vi.mock('../../src/utils/version', () => ({
PROJECT_VERSION: '2.19.0'
}));
vi.mock('@modelcontextprotocol/sdk/types.js', () => ({
isInitializeRequest: vi.fn((request: any) => {
return request && request.method === 'initialize';
})
}));
// Create handlers storage for Express mock
const mockHandlers: { [key: string]: any[] } = {
get: [],
post: [],
delete: [],
use: []
};
// Mock Express
vi.mock('express', () => {
const mockExpressApp = {
get: vi.fn((path: string, ...handlers: any[]) => {
mockHandlers.get.push({ path, handlers });
return mockExpressApp;
}),
post: vi.fn((path: string, ...handlers: any[]) => {
mockHandlers.post.push({ path, handlers });
return mockExpressApp;
}),
delete: vi.fn((path: string, ...handlers: any[]) => {
mockHandlers.delete.push({ path, handlers });
return mockExpressApp;
}),
use: vi.fn((handler: any) => {
mockHandlers.use.push(handler);
return mockExpressApp;
}),
set: vi.fn(),
listen: vi.fn((port: number, host: string, callback?: () => void) => {
if (callback) callback();
return {
on: vi.fn(),
close: vi.fn((cb: () => void) => cb()),
address: () => ({ port: 3000 })
};
})
};
interface ExpressMock {
(): typeof mockExpressApp;
json(): (req: any, res: any, next: any) => void;
}
const expressMock = vi.fn(() => mockExpressApp) as unknown as ExpressMock;
expressMock.json = vi.fn(() => (req: any, res: any, next: any) => {
req.body = req.body || {};
next();
});
return {
default: expressMock,
Request: {},
Response: {},
NextFunction: {}
};
});
describe('Session Restoration (Phase 1 - REQ-1, REQ-2, REQ-8)', () => {
const originalEnv = process.env;
const TEST_AUTH_TOKEN = 'test-auth-token-with-more-than-32-characters';
let server: SingleSessionHTTPServer;
let consoleLogSpy: any;
let consoleWarnSpy: any;
let consoleErrorSpy: any;
beforeEach(() => {
// Reset environment
process.env = { ...originalEnv };
process.env.AUTH_TOKEN = TEST_AUTH_TOKEN;
process.env.PORT = '0';
process.env.NODE_ENV = 'test';
// Mock console methods
consoleLogSpy = vi.spyOn(console, 'log').mockImplementation(() => {});
consoleWarnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
// Clear all mocks and handlers
vi.clearAllMocks();
mockHandlers.get = [];
mockHandlers.post = [];
mockHandlers.delete = [];
mockHandlers.use = [];
});
afterEach(async () => {
// Restore environment
process.env = originalEnv;
// Restore console methods
consoleLogSpy.mockRestore();
consoleWarnSpy.mockRestore();
consoleErrorSpy.mockRestore();
// Shutdown server if running
if (server) {
await server.shutdown();
server = null as any;
}
});
// Helper functions
function findHandler(method: 'get' | 'post' | 'delete', path: string) {
const routes = mockHandlers[method];
const route = routes.find(r => r.path === path);
return route ? route.handlers[route.handlers.length - 1] : null;
}
function createMockReqRes() {
const headers: { [key: string]: string } = {};
const res = {
status: vi.fn().mockReturnThis(),
json: vi.fn().mockReturnThis(),
send: vi.fn().mockReturnThis(),
setHeader: vi.fn((key: string, value: string) => {
headers[key.toLowerCase()] = value;
}),
sendStatus: vi.fn().mockReturnThis(),
headersSent: false,
finished: false,
statusCode: 200,
getHeader: (key: string) => headers[key.toLowerCase()],
headers
};
const req = {
method: 'POST',
path: '/mcp',
url: '/mcp',
originalUrl: '/mcp',
headers: {} as Record<string, string>,
body: {},
ip: '127.0.0.1',
readable: true,
readableEnded: false,
complete: true,
get: vi.fn((header: string) => (req.headers as Record<string, string>)[header.toLowerCase()])
};
return { req, res };
}
describe('REQ-8: Security-Hardened Session ID Validation', () => {
it('should accept valid UUIDv4 session IDs', () => {
server = new SingleSessionHTTPServer();
const validUUIDs = [
'550e8400-e29b-41d4-a716-446655440000',
'f47ac10b-58cc-4372-a567-0e02b2c3d479',
'a1b2c3d4-e5f6-4789-abcd-1234567890ab'
];
for (const sessionId of validUUIDs) {
expect((server as any).isValidSessionId(sessionId)).toBe(true);
}
});
it('should accept multi-tenant instance session IDs', () => {
server = new SingleSessionHTTPServer();
const multiTenantIds = [
'instance-user123-abc-550e8400-e29b-41d4-a716-446655440000',
'instance-tenant456-xyz-f47ac10b-58cc-4372-a567-0e02b2c3d479'
];
for (const sessionId of multiTenantIds) {
expect((server as any).isValidSessionId(sessionId)).toBe(true);
}
});
it('should reject session IDs with SQL injection patterns', () => {
server = new SingleSessionHTTPServer();
const sqlInjectionIds = [
"'; DROP TABLE sessions; --",
"1' OR '1'='1",
"admin'--",
"1'; DELETE FROM sessions WHERE '1'='1"
];
for (const sessionId of sqlInjectionIds) {
expect((server as any).isValidSessionId(sessionId)).toBe(false);
}
});
it('should reject session IDs with NoSQL injection patterns', () => {
server = new SingleSessionHTTPServer();
const nosqlInjectionIds = [
'{"$ne": null}',
'{"$gt": ""}',
'{$where: "1==1"}',
'[$regex]'
];
for (const sessionId of nosqlInjectionIds) {
expect((server as any).isValidSessionId(sessionId)).toBe(false);
}
});
it('should reject session IDs with path traversal attempts', () => {
server = new SingleSessionHTTPServer();
const pathTraversalIds = [
'../../../etc/passwd',
'..\\..\\..\\windows\\system32',
'session/../admin',
'session/./../../config'
];
for (const sessionId of pathTraversalIds) {
expect((server as any).isValidSessionId(sessionId)).toBe(false);
}
});
it('should reject session IDs that are too short (DoS protection)', () => {
server = new SingleSessionHTTPServer();
const tooShortIds = [
'a',
'ab',
'123',
'12345678901234567' // 17 chars (minimum is 20)
];
for (const sessionId of tooShortIds) {
expect((server as any).isValidSessionId(sessionId)).toBe(false);
}
});
it('should reject session IDs that are too long (DoS protection)', () => {
server = new SingleSessionHTTPServer();
const tooLongId = 'a'.repeat(101); // Maximum is 100 chars
expect((server as any).isValidSessionId(tooLongId)).toBe(false);
});
it('should reject empty or null session IDs', () => {
server = new SingleSessionHTTPServer();
expect((server as any).isValidSessionId('')).toBe(false);
expect((server as any).isValidSessionId(null)).toBe(false);
expect((server as any).isValidSessionId(undefined)).toBe(false);
});
it('should reject session IDs with special characters', () => {
server = new SingleSessionHTTPServer();
const specialCharIds = [
'session<script>alert(1)</script>',
'session!@#$%^&*()',
'session\x00null-byte',
'session\r\nnewline'
];
for (const sessionId of specialCharIds) {
expect((server as any).isValidSessionId(sessionId)).toBe(false);
}
});
});
describe('REQ-2: Idempotent Session Creation', () => {
it('should return same session ID for multiple concurrent createSession calls', async () => {
const mockContext: InstanceContext = {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-api-key',
instanceId: 'tenant-123'
};
server = new SingleSessionHTTPServer();
const sessionId = 'instance-tenant123-abc-550e8400-e29b-41d4-a716-446655440000';
// Call createSession multiple times with same session ID
const id1 = (server as any).createSession(mockContext, sessionId);
const id2 = (server as any).createSession(mockContext, sessionId);
const id3 = (server as any).createSession(mockContext, sessionId);
// All calls should return the same session ID (idempotent)
expect(id1).toBe(sessionId);
expect(id2).toBe(sessionId);
expect(id3).toBe(sessionId);
// NOTE: Transport creation is async via callback - tested in integration tests
});
it('should skip session creation if session already exists', async () => {
const mockContext: InstanceContext = {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-api-key',
instanceId: 'tenant-123'
};
server = new SingleSessionHTTPServer();
const sessionId = '550e8400-e29b-41d4-a716-446655440000';
// Create session first time
(server as any).createSession(mockContext, sessionId);
const transport1 = (server as any).transports[sessionId];
// Try to create again
(server as any).createSession(mockContext, sessionId);
const transport2 = (server as any).transports[sessionId];
// Should be the same transport instance
expect(transport1).toBe(transport2);
});
it('should validate session ID format when provided externally', async () => {
const mockContext: InstanceContext = {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-api-key',
instanceId: 'tenant-123'
};
server = new SingleSessionHTTPServer();
const invalidSessionId = "'; DROP TABLE sessions; --";
expect(() => {
(server as any).createSession(mockContext, invalidSessionId);
}).toThrow('Invalid session ID format');
});
});
describe('REQ-1: Session Restoration Hook Configuration', () => {
it('should store restoration hook when provided', () => {
const mockHook: SessionRestoreHook = vi.fn().mockResolvedValue({
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-api-key',
instanceId: 'tenant-123'
});
server = new SingleSessionHTTPServer({
onSessionNotFound: mockHook,
sessionRestorationTimeout: 5000
});
// Verify hook is stored
expect((server as any).onSessionNotFound).toBe(mockHook);
expect((server as any).sessionRestorationTimeout).toBe(5000);
});
it('should work without restoration hook (backward compatible)', () => {
server = new SingleSessionHTTPServer();
// Verify hook is not configured
expect((server as any).onSessionNotFound).toBeUndefined();
});
// NOTE: Full restoration flow tests (success, failure, timeout, validation)
// are in tests/integration/session-persistence.test.ts which tests the complete
// end-to-end flow with real HTTP requests
});
describe('Backwards Compatibility', () => {
it('should use default timeout when not specified', () => {
server = new SingleSessionHTTPServer({
onSessionNotFound: vi.fn()
});
expect((server as any).sessionRestorationTimeout).toBe(5000);
});
it('should use custom timeout when specified', () => {
server = new SingleSessionHTTPServer({
onSessionNotFound: vi.fn(),
sessionRestorationTimeout: 10000
});
expect((server as any).sessionRestorationTimeout).toBe(10000);
});
it('should work without any restoration options', () => {
server = new SingleSessionHTTPServer();
expect((server as any).onSessionNotFound).toBeUndefined();
expect((server as any).sessionRestorationTimeout).toBe(5000);
});
});
describe('Timeout Utility Method', () => {
it('should reject after specified timeout', async () => {
server = new SingleSessionHTTPServer();
const timeoutPromise = (server as any).timeout(100);
await expect(timeoutPromise).rejects.toThrow('Operation timed out after 100ms');
});
it('should create TimeoutError', async () => {
server = new SingleSessionHTTPServer();
try {
await (server as any).timeout(50);
expect.fail('Should have thrown TimeoutError');
} catch (error: any) {
expect(error.name).toBe('TimeoutError');
expect(error.message).toContain('timed out');
}
});
});
describe('Session ID Generation', () => {
it('should generate valid session IDs', () => {
// Set environment for multi-tenant mode
process.env.ENABLE_MULTI_TENANT = 'true';
process.env.MULTI_TENANT_SESSION_STRATEGY = 'instance';
server = new SingleSessionHTTPServer();
const context: InstanceContext = {
n8nApiUrl: 'https://test.n8n.cloud',
n8nApiKey: 'test-api-key',
instanceId: 'tenant-123'
};
const sessionId = (server as any).generateSessionId(context);
// Should generate instance-prefixed ID in multi-tenant mode
expect(sessionId).toContain('instance-');
expect((server as any).isValidSessionId(sessionId)).toBe(true);
// Clean up env
delete process.env.ENABLE_MULTI_TENANT;
delete process.env.MULTI_TENANT_SESSION_STRATEGY;
});
it('should generate standard UUIDs when not in multi-tenant mode', () => {
// Ensure multi-tenant mode is disabled
delete process.env.ENABLE_MULTI_TENANT;
server = new SingleSessionHTTPServer();
const sessionId = (server as any).generateSessionId();
// Should be a UUID format (mocked in tests but should be non-empty string with hyphens)
expect(sessionId).toBeTruthy();
expect(typeof sessionId).toBe('string');
expect(sessionId.length).toBeGreaterThan(20); // At minimum should be longer than minimum session ID length
expect(sessionId).toContain('-');
// NOTE: In tests, UUID is mocked so it may not pass strict validation
// In production, generateSessionId uses real uuid.v4() which generates valid UUIDs
});
});
});