Mirror of https://github.com/czlonkowski/n8n-mcp.git
Synced 2026-01-30 22:42:04 +00:00
* feat: add comprehensive telemetry for partial workflow updates

  Implement telemetry infrastructure to track workflow mutations from partial update operations. This enables data-driven improvements to partial update tooling by capturing:
  - Workflow state before and after mutations
  - User intent and operation patterns
  - Validation results and improvements
  - Change metrics (nodes/connections modified)
  - Success/failure rates and error patterns

  New components:
  - Intent classifier: categorizes mutation patterns
  - Intent sanitizer: removes PII from user instructions
  - Mutation validator: ensures data quality before tracking
  - Mutation tracker: coordinates validation and metric calculation

  Extended components:
  - TelemetryManager: new trackWorkflowMutation() method
  - EventTracker: mutation queue management
  - BatchProcessor: mutation data flushing to Supabase

  MCP tool enhancements:
  - n8n_update_partial_workflow: added optional 'intent' parameter
  - n8n_update_full_workflow: added optional 'intent' parameter
  - Both tools now track mutations asynchronously

  Database schema:
  - New workflow_mutations table with 20+ fields
  - Comprehensive indexes for efficient querying
  - Supports deduplication and data analysis

  This telemetry system is:
  - Privacy-focused (PII sanitization, anonymized users)
  - Non-blocking (async tracking, silent failures)
  - Production-ready (batching, retries, circuit breaker)
  - Backward compatible (all parameters optional)

  Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* fix: correct SQL syntax for expression index in workflow_mutations schema

  The expression index for significant changes needs double parentheses around the arithmetic expression to be valid PostgreSQL syntax.

  Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* fix: enable RLS policies for workflow_mutations table

  Enable Row-Level Security and add policies:
  - Allow anonymous (anon) inserts for telemetry data collection
  - Allow authenticated reads for data analysis and querying

  These policies are required for the telemetry system to function correctly with Supabase, as the MCP server uses the anon key to insert mutation data.

  Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* fix: reduce mutation auto-flush threshold from 5 to 2

  Lower the auto-flush threshold for workflow mutations from 5 to 2 to ensure more timely data persistence. Since mutations are less frequent than regular telemetry events, a lower threshold provides:
  - Faster data persistence (no waiting for 5 mutations to accumulate)
  - A better testing experience (easier to verify with fewer operations)
  - Reduced risk of data loss if the process exits before the threshold is reached
  - More responsive telemetry for low-volume mutation scenarios

  This complements the existing 5-second periodic flush and process exit handlers, ensuring mutations are persisted promptly.

  Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* fix: improve mutation telemetry error logging and diagnostics

  Changes:
  - Upgrade error logging from debug to warn level for better visibility
  - Add diagnostic logging to track mutation processing
  - Log the telemetry-disabled state explicitly
  - Add context info (sessionId, intent, operationCount) to error logs
  - Remove 'await' from telemetry calls to make them truly non-blocking

  This will help identify why mutations aren't being persisted to the workflow_mutations table despite successful workflow operations.

  Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* feat: enhance workflow mutation telemetry for better AI responses

  Improve workflow mutation tracking to capture comprehensive data that helps provide better responses when users update workflows. This enhancement collects workflow state, user intent, and operation details to enable more context-aware assistance.

  Key improvements:
  - Reduce the auto-flush threshold from 5 to 2 for more reliable mutation tracking
  - Add comprehensive workflow and credential sanitization to the mutation tracker
  - Document the intent parameter in the workflow update tools for better UX
  - Fix mutation queue handling in the telemetry manager (flush now handles 3 queues)
  - Add extensive unit tests for mutation tracking and validation (35 new tests)

  Technical changes:
  - mutation-tracker.ts: multi-layer sanitization (workflow, node, parameter levels)
  - batch-processor.ts: support mutation data flushing to Supabase
  - telemetry-manager.ts: auto-flush mutations at threshold 2, track the mutations queue
  - handlers-workflow-diff.ts: track workflow mutations with sanitized data
  - Tests: 13 tests for mutation-tracker, 22 tests for mutation-validator

  The intent parameter messaging emphasizes user benefit ("helps to return better response") rather than technical implementation details.

  Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en
  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude <noreply@anthropic.com>

* chore: bump version to 2.22.16 with telemetry changelog

  Updated package.json and package.runtime.json to version 2.22.16. Added a comprehensive CHANGELOG entry documenting the workflow mutation telemetry enhancements for better AI-powered workflow assistance.

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en
  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: resolve TypeScript lint errors in telemetry tests

  Fixed type issues in the mutation-tracker and mutation-validator tests:
  - Import and use the MutationToolName enum instead of string literals
  - Fix ValidationResult.errors to use the proper object structure
  - Add an UpdateNodeOperation type assertion for the operation with nodeName

  All TypeScript errors resolved; lint now passes.

  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en
  Co-Authored-By: Claude <noreply@anthropic.com>

---------
Co-authored-by: Claude <noreply@anthropic.com>
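
Both update tools accept the new optional intent parameter described in the commits above. As a rough illustration only, the tool arguments might look like the sketch below; the shape of the operations array and the field names other than intent are assumptions for illustration, not taken from the repository:

  // Hypothetical arguments for n8n_update_partial_workflow (illustrative shape):
  const updateArgs = {
    id: 'wf_123',                                              // placeholder workflow id
    operations: [
      { type: 'updateNode', nodeName: 'HTTP Request', changes: { notes: 'retry on 5xx' } }
    ],
    intent: 'Add retry handling to the HTTP Request node'     // optional; "helps to return better response"
  };
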
492 lines
14 KiB
TypeScript
/**
 * Batch Processor for Telemetry
 * Handles batching, queuing, and sending telemetry data to Supabase
 */
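
// Illustrative usage (a minimal sketch, not part of the original file; in the real project
// the wiring lives in TelemetryManager, and the variable names below are placeholders):
//
//   const processor = new TelemetryBatchProcessor(supabaseClient, () => telemetryEnabled);
//   processor.start();                                            // periodic + exit-time flushing
//   await processor.flush(eventQueue, workflowQueue, mutationQueue);
//   processor.stop();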

import { SupabaseClient } from '@supabase/supabase-js';
import { TelemetryEvent, WorkflowTelemetry, WorkflowMutationRecord, TELEMETRY_CONFIG, TelemetryMetrics } from './telemetry-types';
import { TelemetryError, TelemetryErrorType, TelemetryCircuitBreaker } from './telemetry-error';
import { logger } from '../utils/logger';

/**
 * Convert camelCase object keys to snake_case
 * Needed because Supabase PostgREST doesn't auto-convert
 */
function toSnakeCase(obj: any): any {
  if (obj === null || obj === undefined) return obj;
  if (Array.isArray(obj)) return obj.map(toSnakeCase);
  if (typeof obj !== 'object') return obj;

  const result: any = {};
  for (const key in obj) {
    if (obj.hasOwnProperty(key)) {
      // Convert camelCase to snake_case
      const snakeKey = key.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
      // Recursively convert nested objects
      result[snakeKey] = toSnakeCase(obj[key]);
    }
  }
  return result;
}
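
// toSnakeCase example (field names are illustrative, not the actual mutation schema):
//   toSnakeCase({ sessionId: 'abc', operationCount: 3, nodesAdded: ['Webhook'] })
//   // => { session_id: 'abc', operation_count: 3, nodes_added: ['Webhook'] }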

export class TelemetryBatchProcessor {
  private flushTimer?: NodeJS.Timeout;
  private isFlushingEvents: boolean = false;
  private isFlushingWorkflows: boolean = false;
  private isFlushingMutations: boolean = false;
  private circuitBreaker: TelemetryCircuitBreaker;
  private metrics: TelemetryMetrics = {
    eventsTracked: 0,
    eventsDropped: 0,
    eventsFailed: 0,
    batchesSent: 0,
    batchesFailed: 0,
    averageFlushTime: 0,
    rateLimitHits: 0
  };
  private flushTimes: number[] = [];
  private deadLetterQueue: (TelemetryEvent | WorkflowTelemetry | WorkflowMutationRecord)[] = [];
  private readonly maxDeadLetterSize = 100;

  constructor(
    private supabase: SupabaseClient | null,
    private isEnabled: () => boolean
  ) {
    this.circuitBreaker = new TelemetryCircuitBreaker();
  }

  /**
   * Start the batch processor
   */
  start(): void {
    if (!this.isEnabled() || !this.supabase) return;

    // Set up periodic flushing
    this.flushTimer = setInterval(() => {
      this.flush();
    }, TELEMETRY_CONFIG.BATCH_FLUSH_INTERVAL);

    // Prevent timer from keeping process alive
    // In tests, flushTimer might be a number instead of a Timer object
    if (typeof this.flushTimer === 'object' && 'unref' in this.flushTimer) {
      this.flushTimer.unref();
    }

    // Set up process exit handlers
    process.on('beforeExit', () => this.flush());
    process.on('SIGINT', () => {
      this.flush();
      process.exit(0);
    });
    process.on('SIGTERM', () => {
      this.flush();
      process.exit(0);
    });

    logger.debug('Telemetry batch processor started');
  }
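
  // Note on start() above: BATCH_FLUSH_INTERVAL is defined in TELEMETRY_CONFIG; per the
  // project changelog it is a roughly 5-second periodic flush that complements the
  // count-based auto-flush in TelemetryManager and the process exit handlers registered here.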

  /**
   * Stop the batch processor
   */
  stop(): void {
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
      this.flushTimer = undefined;
    }
    logger.debug('Telemetry batch processor stopped');
  }

  /**
   * Flush events, workflows, and mutations to Supabase
   */
  async flush(events?: TelemetryEvent[], workflows?: WorkflowTelemetry[], mutations?: WorkflowMutationRecord[]): Promise<void> {
    if (!this.isEnabled() || !this.supabase) return;

    // Check circuit breaker
    if (!this.circuitBreaker.shouldAllow()) {
      logger.debug('Circuit breaker open - skipping flush');
      this.metrics.eventsDropped += (events?.length || 0) + (workflows?.length || 0) + (mutations?.length || 0);
      return;
    }

    const startTime = Date.now();
    let hasErrors = false;

    // Flush events if provided
    if (events && events.length > 0) {
      hasErrors = !(await this.flushEvents(events)) || hasErrors;
    }

    // Flush workflows if provided
    if (workflows && workflows.length > 0) {
      hasErrors = !(await this.flushWorkflows(workflows)) || hasErrors;
    }

    // Flush mutations if provided
    if (mutations && mutations.length > 0) {
      hasErrors = !(await this.flushMutations(mutations)) || hasErrors;
    }

    // Record flush time
    const flushTime = Date.now() - startTime;
    this.recordFlushTime(flushTime);

    // Update circuit breaker
    if (hasErrors) {
      this.circuitBreaker.recordFailure();
    } else {
      this.circuitBreaker.recordSuccess();
    }

    // Process dead letter queue if circuit is healthy
    if (!hasErrors && this.deadLetterQueue.length > 0) {
      await this.processDeadLetterQueue();
    }
  }
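
  // Note on flush() above: all three arguments are optional. The caller (TelemetryManager)
  // passes its pending event, workflow, and mutation queues - the "3 queues" referenced in
  // the changelog - and each non-empty queue is flushed independently.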

  /**
   * Flush events with batching
   */
  private async flushEvents(events: TelemetryEvent[]): Promise<boolean> {
    if (this.isFlushingEvents || events.length === 0) return true;

    this.isFlushingEvents = true;

    try {
      // Batch events
      const batches = this.createBatches(events, TELEMETRY_CONFIG.MAX_BATCH_SIZE);

      for (const batch of batches) {
        const result = await this.executeWithRetry(async () => {
          const { error } = await this.supabase!
            .from('telemetry_events')
            .insert(batch);

          if (error) {
            throw error;
          }

          logger.debug(`Flushed batch of ${batch.length} telemetry events`);
          return true;
        }, 'Flush telemetry events');

        if (result) {
          this.metrics.eventsTracked += batch.length;
          this.metrics.batchesSent++;
        } else {
          this.metrics.eventsFailed += batch.length;
          this.metrics.batchesFailed++;
          this.addToDeadLetterQueue(batch);
          return false;
        }
      }

      return true;
    } catch (error) {
      logger.debug('Failed to flush events:', error);
      throw new TelemetryError(
        TelemetryErrorType.NETWORK_ERROR,
        'Failed to flush events',
        { error: error instanceof Error ? error.message : String(error) },
        true
      );
    } finally {
      this.isFlushingEvents = false;
    }
  }

  /**
   * Flush workflows with deduplication
   */
  private async flushWorkflows(workflows: WorkflowTelemetry[]): Promise<boolean> {
    if (this.isFlushingWorkflows || workflows.length === 0) return true;

    this.isFlushingWorkflows = true;

    try {
      // Deduplicate workflows by hash
      const uniqueWorkflows = this.deduplicateWorkflows(workflows);
      logger.debug(`Deduplicating workflows: ${workflows.length} -> ${uniqueWorkflows.length}`);

      // Batch workflows
      const batches = this.createBatches(uniqueWorkflows, TELEMETRY_CONFIG.MAX_BATCH_SIZE);

      for (const batch of batches) {
        const result = await this.executeWithRetry(async () => {
          const { error } = await this.supabase!
            .from('telemetry_workflows')
            .insert(batch);

          if (error) {
            throw error;
          }

          logger.debug(`Flushed batch of ${batch.length} telemetry workflows`);
          return true;
        }, 'Flush telemetry workflows');

        if (result) {
          this.metrics.eventsTracked += batch.length;
          this.metrics.batchesSent++;
        } else {
          this.metrics.eventsFailed += batch.length;
          this.metrics.batchesFailed++;
          this.addToDeadLetterQueue(batch);
          return false;
        }
      }

      return true;
    } catch (error) {
      logger.debug('Failed to flush workflows:', error);
      throw new TelemetryError(
        TelemetryErrorType.NETWORK_ERROR,
        'Failed to flush workflows',
        { error: error instanceof Error ? error.message : String(error) },
        true
      );
    } finally {
      this.isFlushingWorkflows = false;
    }
  }

  /**
   * Flush workflow mutations with batching
   */
  private async flushMutations(mutations: WorkflowMutationRecord[]): Promise<boolean> {
    if (this.isFlushingMutations || mutations.length === 0) return true;

    this.isFlushingMutations = true;

    try {
      // Batch mutations
      const batches = this.createBatches(mutations, TELEMETRY_CONFIG.MAX_BATCH_SIZE);

      for (const batch of batches) {
        const result = await this.executeWithRetry(async () => {
          // Convert camelCase to snake_case for Supabase
          const snakeCaseBatch = batch.map(mutation => toSnakeCase(mutation));

          const { error } = await this.supabase!
            .from('workflow_mutations')
            .insert(snakeCaseBatch);

          if (error) {
            // Enhanced error logging for mutation flushes
            logger.error('Mutation insert error details:', {
              code: (error as any).code,
              message: (error as any).message,
              details: (error as any).details,
              hint: (error as any).hint,
              fullError: String(error)
            });
            throw error;
          }

          logger.debug(`Flushed batch of ${batch.length} workflow mutations`);
          return true;
        }, 'Flush workflow mutations');

        if (result) {
          this.metrics.eventsTracked += batch.length;
          this.metrics.batchesSent++;
        } else {
          this.metrics.eventsFailed += batch.length;
          this.metrics.batchesFailed++;
          this.addToDeadLetterQueue(batch);
          return false;
        }
      }

      return true;
    } catch (error) {
      logger.error('Failed to flush mutations with details:', {
        errorMsg: error instanceof Error ? error.message : String(error),
        errorType: error instanceof Error ? error.constructor.name : typeof error
      });
      throw new TelemetryError(
        TelemetryErrorType.NETWORK_ERROR,
        'Failed to flush workflow mutations',
        { error: error instanceof Error ? error.message : String(error) },
        true
      );
    } finally {
      this.isFlushingMutations = false;
    }
  }
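
  // Note on flushMutations() above: unlike the other flush paths, mutation records are run
  // through toSnakeCase() before insert (the workflow_mutations columns are snake_case), and
  // failures are logged at error level rather than debug - a diagnostic change from the
  // commit history to surface batches that never reach the workflow_mutations table.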

  /**
   * Execute operation with exponential backoff retry
   */
  private async executeWithRetry<T>(
    operation: () => Promise<T>,
    operationName: string
  ): Promise<T | null> {
    let lastError: Error | null = null;
    let delay = TELEMETRY_CONFIG.RETRY_DELAY;

    for (let attempt = 1; attempt <= TELEMETRY_CONFIG.MAX_RETRIES; attempt++) {
      try {
        // In test environment, execute without timeout but still handle errors
        if (process.env.NODE_ENV === 'test' && process.env.VITEST) {
          const result = await operation();
          return result;
        }

        // Create a timeout promise
        const timeoutPromise = new Promise<never>((_, reject) => {
          setTimeout(() => reject(new Error('Operation timed out')), TELEMETRY_CONFIG.OPERATION_TIMEOUT);
        });

        // Race between operation and timeout
        const result = await Promise.race([operation(), timeoutPromise]) as T;
        return result;
      } catch (error) {
        lastError = error as Error;
        logger.debug(`${operationName} attempt ${attempt} failed:`, error);

        if (attempt < TELEMETRY_CONFIG.MAX_RETRIES) {
          // Skip delay in test environment when using fake timers
          if (!(process.env.NODE_ENV === 'test' && process.env.VITEST)) {
            // Exponential backoff with jitter
            const jitter = Math.random() * 0.3 * delay; // 30% jitter
            const waitTime = delay + jitter;
            await new Promise(resolve => setTimeout(resolve, waitTime));
            delay *= 2; // Double the delay for next attempt
          }
          // In test mode, continue to next retry attempt without delay
        }
      }
    }

    logger.debug(`${operationName} failed after ${TELEMETRY_CONFIG.MAX_RETRIES} attempts:`, lastError);
    return null;
  }
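
  // executeWithRetry backoff example (the numbers are hypothetical; the real values live in
  // TELEMETRY_CONFIG): with RETRY_DELAY = 1000 ms and MAX_RETRIES = 3, a persistently failing
  // operation waits ~1s and then ~2s between attempts, each wait padded with up to 30% jitter,
  // before the helper gives up and returns null.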

  /**
   * Create batches from array
   */
  private createBatches<T>(items: T[], batchSize: number): T[][] {
    const batches: T[][] = [];

    for (let i = 0; i < items.length; i += batchSize) {
      batches.push(items.slice(i, i + batchSize));
    }

    return batches;
  }
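
  // createBatches example: createBatches([1, 2, 3, 4, 5], 2) => [[1, 2], [3, 4], [5]]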

  /**
   * Deduplicate workflows by hash
   */
  private deduplicateWorkflows(workflows: WorkflowTelemetry[]): WorkflowTelemetry[] {
    const seen = new Set<string>();
    const unique: WorkflowTelemetry[] = [];

    for (const workflow of workflows) {
      if (!seen.has(workflow.workflow_hash)) {
        seen.add(workflow.workflow_hash);
        unique.push(workflow);
      }
    }

    return unique;
  }

  /**
   * Add failed items to dead letter queue
   */
  private addToDeadLetterQueue(items: (TelemetryEvent | WorkflowTelemetry | WorkflowMutationRecord)[]): void {
    for (const item of items) {
      this.deadLetterQueue.push(item);

      // Maintain max size
      if (this.deadLetterQueue.length > this.maxDeadLetterSize) {
        const dropped = this.deadLetterQueue.shift();
        if (dropped) {
          this.metrics.eventsDropped++;
        }
      }
    }

    logger.debug(`Added ${items.length} items to dead letter queue`);
  }

  /**
   * Process dead letter queue when circuit is healthy
   */
  private async processDeadLetterQueue(): Promise<void> {
    if (this.deadLetterQueue.length === 0) return;

    logger.debug(`Processing ${this.deadLetterQueue.length} items from dead letter queue`);

    const events: TelemetryEvent[] = [];
    const workflows: WorkflowTelemetry[] = [];

    // Separate events and workflows
    for (const item of this.deadLetterQueue) {
      if ('workflow_hash' in item) {
        workflows.push(item as WorkflowTelemetry);
      } else {
        events.push(item as TelemetryEvent);
      }
    }

    // Clear dead letter queue
    this.deadLetterQueue = [];

    // Try to flush
    if (events.length > 0) {
      await this.flushEvents(events);
    }
    if (workflows.length > 0) {
      await this.flushWorkflows(workflows);
    }
  }
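
  // Note on processDeadLetterQueue() above: there is no separate bucket for
  // WorkflowMutationRecord items. Anything without a workflow_hash property, including
  // re-queued mutation batches, is retried through flushEvents() and the telemetry_events
  // table rather than workflow_mutations.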

  /**
   * Record flush time for metrics
   */
  private recordFlushTime(time: number): void {
    this.flushTimes.push(time);

    // Keep last 100 flush times
    if (this.flushTimes.length > 100) {
      this.flushTimes.shift();
    }

    // Update average
    const sum = this.flushTimes.reduce((a, b) => a + b, 0);
    this.metrics.averageFlushTime = Math.round(sum / this.flushTimes.length);
    this.metrics.lastFlushTime = time;
  }

  /**
   * Get processor metrics
   */
  getMetrics(): TelemetryMetrics & { circuitBreakerState: any; deadLetterQueueSize: number } {
    return {
      ...this.metrics,
      circuitBreakerState: this.circuitBreaker.getState(),
      deadLetterQueueSize: this.deadLetterQueue.length
    };
  }

  /**
   * Reset metrics
   */
  resetMetrics(): void {
    this.metrics = {
      eventsTracked: 0,
      eventsDropped: 0,
      eventsFailed: 0,
      batchesSent: 0,
      batchesFailed: 0,
      averageFlushTime: 0,
      rateLimitHits: 0
    };
    this.flushTimes = [];
    this.circuitBreaker.reset();
  }
}