# Telemetry Workflow Mutation Tracking Specification

**Purpose:** Define the technical requirements for capturing workflow mutation data to build the n8n-fixer dataset

**Status:** Specification Document (Pre-Implementation)

---

## 1. Overview

This specification details how to extend the n8n-mcp telemetry system to capture:

- **Before State:** Complete workflow JSON before modification
- **Instruction:** The transformation instruction/prompt
- **After State:** Complete workflow JSON after modification
- **Metadata:** Timestamps, user ID, success metrics, validation states

---

## 2. Schema Design

### 2.1 New Database Table: `workflow_mutations`

```sql
CREATE TABLE IF NOT EXISTS workflow_mutations (
  -- Primary Key & Identifiers
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id TEXT NOT NULL,
  workflow_id TEXT,                      -- n8n workflow ID (nullable for new workflows)

  -- Source Workflow Snapshot (Before)
  before_workflow_json JSONB NOT NULL,   -- Complete workflow definition
  before_workflow_hash TEXT NOT NULL,    -- SHA-256(before_workflow_json)
  before_validation_status TEXT NOT NULL CHECK(before_validation_status IN (
    'valid',    -- Workflow passes validation
    'invalid',  -- Has validation errors
    'unknown'   -- Unknown state (not tested)
  )),
  before_error_count INTEGER,            -- Number of validation errors
  before_error_types TEXT[],             -- Array: ['type_error', 'missing_field', ...]
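  -- Illustrative note (not part of the schema): before_workflow_hash stores
  -- the hex-encoded SHA-256 of the serialized workflow JSON. If it ever needs
  -- to be recomputed inside Postgres, pgcrypto can do it, e.g.:
  --   encode(digest(before_workflow_json::text, 'sha256'), 'hex')
  -- Caveat: jsonb::text serialization may not match the application's
  -- JSON.stringify output byte-for-byte, so the application-side hash is
  -- authoritative.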
  -- Mutation Details
  instruction TEXT NOT NULL,             -- The modification instruction/prompt
  instruction_type TEXT NOT NULL CHECK(instruction_type IN (
    'ai_generated',          -- Generated by AI/LLM
    'user_provided',         -- User input/request
    'auto_fix',              -- System auto-correction
    'validation_correction'  -- Validation rule fix
  )),
  mutation_source TEXT,                  -- Which tool/service created the mutation,
                                         -- e.g., 'n8n_autofix_workflow', 'validation_engine'
  mutation_tool_version TEXT,            -- Version of tool that performed mutation

  -- Target Workflow Snapshot (After)
  after_workflow_json JSONB NOT NULL,    -- Complete modified workflow
  after_workflow_hash TEXT NOT NULL,     -- SHA-256(after_workflow_json)
  after_validation_status TEXT NOT NULL CHECK(after_validation_status IN (
    'valid', 'invalid', 'unknown'
  )),
  after_error_count INTEGER,             -- Validation errors after mutation
  after_error_types TEXT[],              -- Remaining error types

  -- Mutation Analysis (Pre-calculated for Performance)
  nodes_modified TEXT[],                 -- Array of modified node IDs/names
  nodes_added TEXT[],                    -- New nodes in after state
  nodes_removed TEXT[],                  -- Removed nodes
  nodes_modified_count INTEGER,          -- Count of modified nodes
  nodes_added_count INTEGER,
  nodes_removed_count INTEGER,
  connections_modified BOOLEAN,          -- Were connections/edges changed?
  connections_before_count INTEGER,      -- Number of connections before
  connections_after_count INTEGER,       -- Number after
  properties_modified TEXT[],            -- Changed property paths,
                                         -- e.g., ['nodes[0].parameters.url', ...]
  properties_modified_count INTEGER,
  expressions_modified BOOLEAN,          -- Were expressions/formulas changed?

  -- Complexity Metrics
  complexity_before TEXT CHECK(complexity_before IN (
    'simple', 'medium', 'complex'
  )),
  complexity_after TEXT CHECK(complexity_after IN (
    'simple', 'medium', 'complex'
  )),
  node_count_before INTEGER,
  node_count_after INTEGER,
  node_types_before TEXT[],
  node_types_after TEXT[],

  -- Outcome Metrics
  mutation_success BOOLEAN,              -- Did mutation achieve intended goal?
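  -- Worked example (illustrative, not part of the schema): for a mutation
  -- where before_error_count = 3 and after_error_count = 1, the outcome
  -- columns below would be populated as:
  --   validation_improved     = true   (1 < 3)
  --   validation_errors_fixed = 2      (3 - 1)
  --   new_errors_introduced   = 0      (no new error types appeared)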
  validation_improved BOOLEAN,           -- true if: error_count_after < error_count_before
  validation_errors_fixed INTEGER,       -- Count of errors fixed
  new_errors_introduced INTEGER,         -- Errors created by mutation

  -- Optional: User Feedback
  user_approved BOOLEAN,                 -- User accepted the mutation?
  user_feedback TEXT,                    -- User comment (truncated)

  -- Data Quality & Compression
  workflow_size_before INTEGER,          -- Byte size of before_workflow_json
  workflow_size_after INTEGER,           -- Byte size of after_workflow_json
  is_compressed BOOLEAN DEFAULT false,   -- True if workflows are gzip-compressed

  -- Timing
  execution_duration_ms INTEGER,         -- Time taken to apply mutation
  created_at TIMESTAMP DEFAULT NOW(),

  -- Metadata
  tags TEXT[],                           -- Custom tags for filtering
  metadata JSONB                         -- Flexible metadata storage
);
```

### 2.2 Indexes for Performance

```sql
-- User Analysis (User's mutation history)
CREATE INDEX idx_mutations_user_id
  ON workflow_mutations(user_id, created_at DESC);

-- Workflow Analysis (Mutations to specific workflow)
CREATE INDEX idx_mutations_workflow_id
  ON workflow_mutations(workflow_id, created_at DESC);

-- Mutation Success Rate
CREATE INDEX idx_mutations_success
  ON workflow_mutations(mutation_success, created_at DESC);

-- Validation Improvement Analysis
CREATE INDEX idx_mutations_validation_improved
  ON workflow_mutations(validation_improved, created_at DESC);

-- Time-series Analysis
CREATE INDEX idx_mutations_created_at
  ON workflow_mutations(created_at DESC);

-- Source Analysis
CREATE INDEX idx_mutations_source
  ON workflow_mutations(mutation_source, created_at DESC);

-- Instruction Type Analysis
CREATE INDEX idx_mutations_instruction_type
  ON workflow_mutations(instruction_type, created_at DESC);

-- Composite: For common query patterns
CREATE INDEX idx_mutations_user_success_time
  ON workflow_mutations(user_id, mutation_success, created_at DESC);
CREATE INDEX idx_mutations_source_validation
  ON workflow_mutations(mutation_source, validation_improved, created_at DESC);
```

### 2.3 Optional: Materialized View for Analytics

```sql
-- Pre-calculate common metrics for fast dashboarding
CREATE MATERIALIZED VIEW vw_mutation_analytics AS
SELECT
  DATE(created_at) as mutation_date,
  instruction_type,
  mutation_source,
  COUNT(*) as total_mutations,
  SUM(CASE WHEN mutation_success THEN 1 ELSE 0 END) as successful_mutations,
  SUM(CASE WHEN validation_improved THEN 1 ELSE 0 END) as validation_improved_count,
  ROUND(100.0 * COUNT(*) FILTER(WHERE mutation_success = true) /
    NULLIF(COUNT(*), 0), 2) as success_rate,
  AVG(nodes_modified_count) as avg_nodes_modified,
  AVG(properties_modified_count) as avg_properties_modified,
  AVG(execution_duration_ms) as avg_duration_ms,
  AVG(before_error_count) as avg_errors_before,
  AVG(after_error_count) as avg_errors_after,
  AVG(validation_errors_fixed) as avg_errors_fixed
FROM workflow_mutations
GROUP BY DATE(created_at), instruction_type, mutation_source;

CREATE INDEX idx_mutation_analytics_date
  ON vw_mutation_analytics(mutation_date DESC);
```

---

## 3. TypeScript Interfaces

### 3.1 Core Mutation Interface

```typescript
// In src/telemetry/telemetry-types.ts

export interface WorkflowMutationEvent extends TelemetryEvent {
  event: 'workflow_mutation';
  properties: {
    // Identification
    workflowId?: string;

    // Hashes for deduplication & integrity
    beforeHash: string;   // SHA-256 of before state
    afterHash: string;    // SHA-256 of after state

    // Instruction
    instruction: string;  // The modification prompt/request
    instructionType: 'ai_generated' | 'user_provided' | 'auto_fix' | 'validation_correction';
    mutationSource?: string;  // Tool that created the instruction

    // Change Summary
    nodesModified: number;
    propertiesChanged: number;
    connectionsModified: boolean;
    expressionsModified: boolean;

    // Outcome
    mutationSuccess: boolean;
    validationImproved: boolean;
    errorsBefore: number;
    errorsAfter: number;

    // Performance
    executionDurationMs?: number;
    workflowSizeBefore?: number;
    workflowSizeAfter?: number;
  };
}

export interface WorkflowMutation {
  // Primary Key
  id: string;                    // UUID
  user_id: string;               // Anonymized user
  workflow_id?: string;          // n8n workflow ID

  // Before State
  before_workflow_json: any;     // Complete workflow
  before_workflow_hash: string;
  before_validation_status: 'valid' | 'invalid' | 'unknown';
  before_error_count?: number;
  before_error_types?: string[];

  // Mutation
  instruction: string;
  instruction_type: 'ai_generated' | 'user_provided' | 'auto_fix' | 'validation_correction';
  mutation_source?: string;
  mutation_tool_version?: string;

  // After State
  after_workflow_json: any;
  after_workflow_hash: string;
  after_validation_status: 'valid' | 'invalid' | 'unknown';
  after_error_count?: number;
  after_error_types?: string[];

  // Analysis
  nodes_modified?: string[];
  nodes_added?: string[];
  nodes_removed?: string[];
  nodes_modified_count?: number;
  connections_modified?: boolean;
  properties_modified?: string[];
  properties_modified_count?: number;

  // Complexity
  complexity_before?: 'simple' | 'medium' | 'complex';
  complexity_after?: 'simple' | 'medium' | 'complex';
  node_count_before?: number;
  node_count_after?: number;

  // Outcome
  mutation_success: boolean;
  validation_improved: boolean;
  validation_errors_fixed?: number;
  new_errors_introduced?: number;
  user_approved?: boolean;

  // Timing
  created_at: string;            // ISO 8601
  execution_duration_ms?: number;
}
```

### 3.2 Mutation Analysis Service

```typescript
// New file: src/telemetry/mutation-analyzer.ts

export interface MutationDiff {
  nodesAdded: string[];
  nodesRemoved: string[];
  nodesModified: Map<string, PropertyDiff[]>;  // node ID -> property diffs
  connectionsChanged: boolean;
  expressionsChanged: boolean;
}

export interface PropertyDiff {
  path: string;          // e.g., "parameters.url"
  beforeValue: any;
  afterValue: any;
  isExpression: boolean; // Contains {{}} or $json?
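  // Example (illustrative only): a change that turns a static HTTP Request
  // URL into an expression might produce:
  //   {
  //     path: 'parameters.url',
  //     beforeValue: 'https://example.com',
  //     afterValue: '={{ $json.url }}',
  //     isExpression: true
  //   }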
}

export class WorkflowMutationAnalyzer {
  /**
   * Analyze differences between before/after workflows
   */
  static analyzeDifferences(
    beforeWorkflow: any,
    afterWorkflow: any
  ): MutationDiff {
    // Implementation: Deep comparison of workflow structures
    // Return detailed diff information
  }

  /**
   * Extract changed property paths
   */
  static getChangedProperties(diff: MutationDiff): string[] {
    // Implementation
  }

  /**
   * Determine if expression/formula was modified
   */
  static hasExpressionChanges(diff: MutationDiff): boolean {
    // Implementation
  }

  /**
   * Validate workflow structure
   */
  static validateWorkflowStructure(workflow: any): {
    isValid: boolean;
    errors: string[];
    errorTypes: string[];
  } {
    // Implementation
  }
}
```

---

## 4. Integration Points

### 4.1 TelemetryManager Extension

```typescript
// In src/telemetry/telemetry-manager.ts

export class TelemetryManager {
  // ... existing code ...

  /**
   * Track workflow mutation (new method)
   */
  async trackWorkflowMutation(
    beforeWorkflow: any,
    instruction: string,
    afterWorkflow: any,
    options?: {
      instructionType?: 'ai_generated' | 'user_provided' | 'auto_fix' | 'validation_correction';
      mutationSource?: string;
      workflowId?: string;
      success?: boolean;
      executionDurationMs?: number;
      userApproved?: boolean;
    }
  ): Promise<void> {
    this.ensureInitialized();
    this.performanceMonitor.startOperation('trackWorkflowMutation');

    try {
      await this.eventTracker.trackWorkflowMutation(
        beforeWorkflow,
        instruction,
        afterWorkflow,
        options
      );

      // Auto-flush mutations to prevent data loss
      await this.flush();
    } catch (error) {
      const telemetryError = error instanceof TelemetryError
        ? error
        : new TelemetryError(
            TelemetryErrorType.UNKNOWN_ERROR,
            'Failed to track workflow mutation',
            { error: String(error) }
          );
      this.errorAggregator.record(telemetryError);
    } finally {
      this.performanceMonitor.endOperation('trackWorkflowMutation');
    }
  }
}
```

### 4.2 EventTracker Extension

```typescript
// In src/telemetry/event-tracker.ts

// Options accepted by trackWorkflowMutation() (mirrors the options object in 4.1)
export interface MutationTrackingOptions {
  instructionType?: 'ai_generated' | 'user_provided' | 'auto_fix' | 'validation_correction';
  mutationSource?: string;
  workflowId?: string;
  success?: boolean;
  executionDurationMs?: number;
  userApproved?: boolean;
}

export class TelemetryEventTracker {
  // ... existing code ...
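  /**
   * Sketch (assumption, not part of the draft spec): strip credential
   * references from a snapshot before it is queued, per the "No Credentials"
   * rule in section 9. The method name and redaction marker are illustrative;
   * n8n node JSON keeps credential references under a `credentials` key.
   */
  private sanitizeWorkflow(workflow: any): any {
    const copy = JSON.parse(JSON.stringify(workflow)); // deep copy
    for (const node of copy.nodes ?? []) {
      if (node.credentials) {
        node.credentials = '[REDACTED]'; // keep the shape, drop the reference
      }
    }
    return copy;
  }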
  private mutationQueue: WorkflowMutation[] = [];

  /**
   * Track a workflow mutation
   */
  async trackWorkflowMutation(
    beforeWorkflow: any,
    instruction: string,
    afterWorkflow: any,
    options?: MutationTrackingOptions
  ): Promise<void> {
    if (!this.isEnabled()) return;

    try {
      // 1. Analyze differences (WorkflowMutationAnalyzer exposes static methods)
      const diff = WorkflowMutationAnalyzer.analyzeDifferences(
        beforeWorkflow,
        afterWorkflow
      );

      // 2. Calculate hashes
      const beforeHash = this.calculateHash(beforeWorkflow);
      const afterHash = this.calculateHash(afterWorkflow);

      // 3. Detect validation changes
      const beforeValidation = WorkflowMutationAnalyzer.validateWorkflowStructure(
        beforeWorkflow
      );
      const afterValidation = WorkflowMutationAnalyzer.validateWorkflowStructure(
        afterWorkflow
      );

      // 4. Create mutation record
      const mutation: WorkflowMutation = {
        id: generateUUID(),
        user_id: this.getUserId(),
        workflow_id: options?.workflowId,
        before_workflow_json: beforeWorkflow,
        before_workflow_hash: beforeHash,
        before_validation_status: beforeValidation.isValid ? 'valid' : 'invalid',
        before_error_count: beforeValidation.errors.length,
        before_error_types: beforeValidation.errorTypes,
        instruction,
        instruction_type: options?.instructionType || 'user_provided',
        mutation_source: options?.mutationSource,
        after_workflow_json: afterWorkflow,
        after_workflow_hash: afterHash,
        after_validation_status: afterValidation.isValid ? 'valid' : 'invalid',
        after_error_count: afterValidation.errors.length,
        after_error_types: afterValidation.errorTypes,
        nodes_modified: Array.from(diff.nodesModified.keys()),
        nodes_added: diff.nodesAdded,
        nodes_removed: diff.nodesRemoved,
        properties_modified: WorkflowMutationAnalyzer.getChangedProperties(diff),
        connections_modified: diff.connectionsChanged,
        mutation_success: options?.success !== false,
        validation_improved:
          afterValidation.errors.length < beforeValidation.errors.length,
        validation_errors_fixed: Math.max(
          0,
          beforeValidation.errors.length - afterValidation.errors.length
        ),
        created_at: new Date().toISOString(),
        execution_duration_ms: options?.executionDurationMs,
        user_approved: options?.userApproved
      };

      // 5. Validate and queue
      const result = WorkflowMutationValidator.validate(mutation);
      if (result.isValid) {
        this.mutationQueue.push(mutation);
      }

      // 6. Track as event for real-time monitoring
      this.trackEvent('workflow_mutation', {
        beforeHash,
        afterHash,
        instructionType: options?.instructionType || 'user_provided',
        nodesModified: diff.nodesModified.size,
        propertiesChanged: mutation.properties_modified?.length || 0,
        expressionsModified: diff.expressionsChanged,
        mutationSuccess: options?.success !== false,
        validationImproved: mutation.validation_improved,
        errorsBefore: beforeValidation.errors.length,
        errorsAfter: afterValidation.errors.length
      });
    } catch (error) {
      logger.debug('Failed to track workflow mutation:', error);
      throw new TelemetryError(
        TelemetryErrorType.VALIDATION_ERROR,
        'Failed to process workflow mutation',
        { error: error instanceof Error ? error.message : String(error) }
      );
    }
  }

  /**
   * Get queued mutations
   */
  getMutationQueue(): WorkflowMutation[] {
    return [...this.mutationQueue];
  }

  /**
   * Clear mutation queue
   */
  clearMutationQueue(): void {
    this.mutationQueue = [];
  }

  /**
   * Calculate SHA-256 hash of workflow
   */
  private calculateHash(workflow: any): string {
    const crypto = require('crypto');
    // Note: JSON.stringify is key-order sensitive; hash the same serialization
    // that is stored so hash verification in the validator stays consistent.
    const normalized = JSON.stringify(workflow, null, 0);
    return crypto.createHash('sha256').update(normalized).digest('hex');
  }
}
```

### 4.3 BatchProcessor Extension

```typescript
// In src/telemetry/batch-processor.ts

export class TelemetryBatchProcessor {
  // ... existing code ...

  /**
   * Flush mutations to Supabase
   */
  private async flushMutations(
    mutations: WorkflowMutation[]
  ): Promise<boolean> {
    if (this.isFlushingMutations || mutations.length === 0) return true;

    this.isFlushingMutations = true;
    try {
      const batches = this.createBatches(
        mutations,
        TELEMETRY_CONFIG.MAX_BATCH_SIZE
      );

      for (const batch of batches) {
        const result = await this.executeWithRetry(async () => {
          const { error } = await this.supabase!
            .from('workflow_mutations')
            .insert(batch);
          if (error) throw error;
          logger.debug(`Flushed batch of ${batch.length} workflow mutations`);
          return true;
        }, 'Flush workflow mutations');

        if (!result) {
          this.addToDeadLetterQueue(batch);
          return false;
        }
      }
      return true;
    } catch (error) {
      logger.debug('Failed to flush mutations:', error);
      throw new TelemetryError(
        TelemetryErrorType.NETWORK_ERROR,
        'Failed to flush mutations',
        { error: error instanceof Error ? error.message : String(error) },
        true
      );
    } finally {
      this.isFlushingMutations = false;
    }
  }
}
```

---

## 5. Integration with Workflow Tools

### 5.1 n8n_autofix_workflow

```typescript
// Where n8n_autofix_workflow applies fixes
import { telemetry } from '../telemetry';

export async function n8n_autofix_workflow(
  workflow: any,
  options?: AutofixOptions
): Promise<any> {
  const beforeWorkflow = JSON.parse(JSON.stringify(workflow)); // Deep copy
  const start = Date.now();

  try {
    // Apply fixes
    const fixed = await applyFixes(workflow, options);

    // Track mutation
    await telemetry.trackWorkflowMutation(
      beforeWorkflow,
      'Auto-fix validation errors',
      fixed,
      {
        instructionType: 'auto_fix',
        mutationSource: 'n8n_autofix_workflow',
        success: true,
        executionDurationMs: Date.now() - start
      }
    );

    return fixed;
  } catch (error) {
    // Track failed mutation attempt
    await telemetry.trackWorkflowMutation(
      beforeWorkflow,
      'Auto-fix validation errors',
      beforeWorkflow, // No changes
      {
        instructionType: 'auto_fix',
        mutationSource: 'n8n_autofix_workflow',
        success: false
      }
    );
    throw error;
  }
}
```

Note: because the validator skips records whose before/after hashes are identical (see 6.1), failed attempts tracked with an unchanged workflow will not reach the `workflow_mutations` table; they remain visible through the real-time `workflow_mutation` event.

### 5.2 n8n_update_partial_workflow

```typescript
// Partial workflow updates
export async function n8n_update_partial_workflow(
  workflow: any,
  operations: DiffOperation[]
): Promise<any> {
  const beforeWorkflow = JSON.parse(JSON.stringify(workflow));
  const instructionText = formatOperationsAsInstruction(operations);

  try {
    const updated = applyOperations(workflow, operations);

    await telemetry.trackWorkflowMutation(
      beforeWorkflow,
      instructionText,
      updated,
      {
        instructionType: 'user_provided',
        mutationSource: 'n8n_update_partial_workflow'
      }
    );

    return updated;
  } catch (error) {
    await telemetry.trackWorkflowMutation(
      beforeWorkflow,
      instructionText,
      beforeWorkflow,
      {
        instructionType: 'user_provided',
        mutationSource: 'n8n_update_partial_workflow',
        success: false
      }
    );
    throw error;
  }
}
```

---

## 6. Data Quality & Validation

### 6.1 Mutation Validation Rules

```typescript
// In src/telemetry/mutation-validator.ts

export class WorkflowMutationValidator {
  /**
   * Validate mutation data before storage
   */
  static validate(mutation: WorkflowMutation): ValidationResult {
    const errors: string[] = [];

    // Required fields
    if (!mutation.user_id) errors.push('user_id is required');
    if (!mutation.before_workflow_json) errors.push('before_workflow_json required');
    if (!mutation.after_workflow_json) errors.push('after_workflow_json required');
    if (!mutation.before_workflow_hash) errors.push('before_workflow_hash required');
    if (!mutation.after_workflow_hash) errors.push('after_workflow_hash required');
    if (!mutation.instruction) errors.push('instruction is required');
    if (!mutation.instruction_type) errors.push('instruction_type is required');

    // Hash verification
    const beforeHash = calculateHash(mutation.before_workflow_json);
    const afterHash = calculateHash(mutation.after_workflow_json);
    if (beforeHash !== mutation.before_workflow_hash) {
      errors.push('before_workflow_hash mismatch');
    }
    if (afterHash !== mutation.after_workflow_hash) {
      errors.push('after_workflow_hash mismatch');
    }

    // Deduplication: Skip if before == after
    if (beforeHash === afterHash) {
      errors.push('before and after states are identical (skipping)');
    }

    // Size validation
    const beforeSize = JSON.stringify(mutation.before_workflow_json).length;
    const afterSize = JSON.stringify(mutation.after_workflow_json).length;
    if (beforeSize > 10 * 1024 * 1024) {
      errors.push('before_workflow_json exceeds 10MB size limit');
    }
    if (afterSize > 10 * 1024 * 1024) {
      errors.push('after_workflow_json exceeds 10MB size limit');
    }

    // Instruction validation (over-long instructions are truncated, not rejected)
    if (mutation.instruction.length > 5000) {
      mutation.instruction = mutation.instruction.substring(0, 5000);
    }
    if (mutation.instruction.length < 3) {
      errors.push('instruction too short (min 3 chars)');
    }

    // Error count validation
    if (mutation.before_error_count != null && mutation.before_error_count < 0) {
      errors.push('before_error_count cannot be negative');
    }
    if (mutation.after_error_count != null && mutation.after_error_count < 0) {
      errors.push('after_error_count cannot be negative');
    }

    return {
      isValid: errors.length === 0,
      errors
    };
  }
}
```

### 6.2 Data Compression Strategy

For large workflows (>1MB):

```typescript
import { gzipSync, gunzipSync } from 'zlib';

export function compressWorkflow(workflow: any): {
  compressed: string;  // base64
  originalSize: number;
  compressedSize: number;
} {
  const json = JSON.stringify(workflow);
  const buffer = Buffer.from(json, 'utf-8');
  const compressed = gzipSync(buffer);
  const base64 = compressed.toString('base64');

  return {
    compressed: base64,
    originalSize: buffer.length,
    compressedSize: compressed.length
  };
}

export function decompressWorkflow(compressed: string): any {
  const buffer = Buffer.from(compressed, 'base64');
  const decompressed = gunzipSync(buffer);
  const json = decompressed.toString('utf-8');
  return JSON.parse(json);
}
```

---

## 7. Query Examples for Analysis

### 7.1 Basic Mutation Statistics

```sql
-- Overall mutation metrics
SELECT
  COUNT(*) as total_mutations,
  COUNT(*) FILTER(WHERE mutation_success) as successful,
  COUNT(*) FILTER(WHERE validation_improved) as validation_improved,
  ROUND(100.0 * COUNT(*) FILTER(WHERE mutation_success) / COUNT(*), 2) as success_rate,
  ROUND(100.0 * COUNT(*) FILTER(WHERE validation_improved) / COUNT(*), 2) as improvement_rate,
  AVG(nodes_modified_count) as avg_nodes_modified,
  AVG(properties_modified_count) as avg_properties_modified,
  AVG(execution_duration_ms)::INTEGER as avg_duration_ms
FROM workflow_mutations
WHERE created_at >= NOW() - INTERVAL '7 days';
```

### 7.2 Success by Instruction Type

```sql
SELECT
  instruction_type,
  COUNT(*) as count,
  ROUND(100.0 * COUNT(*) FILTER(WHERE mutation_success) / COUNT(*), 2) as success_rate,
  ROUND(100.0 * COUNT(*) FILTER(WHERE validation_improved) / COUNT(*), 2) as improvement_rate,
  AVG(validation_errors_fixed) as avg_errors_fixed,
  AVG(new_errors_introduced) as avg_new_errors
FROM workflow_mutations
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY instruction_type
ORDER BY count DESC;
```

### 7.3 Most Common Mutations

```sql
SELECT
  properties_modified,
  COUNT(*) as frequency,
  ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM workflow_mutations
    WHERE created_at >= NOW() - INTERVAL '30 days'), 2) as percentage
FROM workflow_mutations
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY properties_modified
ORDER BY frequency DESC
LIMIT 20;
```

### 7.4 Complexity Impact

```sql
SELECT
  complexity_before,
  complexity_after,
  COUNT(*) as transitions,
  ROUND(100.0 * COUNT(*) FILTER(WHERE mutation_success) / COUNT(*), 2) as success_rate
FROM workflow_mutations
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY complexity_before, complexity_after
ORDER BY transitions DESC;
```

---

## 8. Implementation Roadmap

### Phase 1: Infrastructure (Week 1)

- [ ] Create `workflow_mutations` table in Supabase
- [ ] Add indexes for common query patterns
- [ ] Update TypeScript types
- [ ] Create mutation analyzer service
- [ ] Add mutation validator

### Phase 2: Integration (Week 2)

- [ ] Extend TelemetryManager with trackWorkflowMutation()
- [ ] Extend EventTracker with mutation queue
- [ ] Extend BatchProcessor with flush logic
- [ ] Add mutation event type

### Phase 3: Tool Integration (Week 3)

- [ ] Integrate with n8n_autofix_workflow
- [ ] Integrate with n8n_update_partial_workflow
- [ ] Add test cases
- [ ] Documentation

### Phase 4: Validation & Analysis (Week 4)

- [ ] Run sample queries
- [ ] Validate data quality
- [ ] Create analytics dashboard
- [ ] Begin dataset collection

---

## 9. Security & Privacy Considerations

- **No Credentials:** Sanitizer strips credentials before storage
- **No Secrets:** Workflow secret references removed
- **User Anonymity:** User ID is anonymized
- **Hash Verification:** All workflow hashes verified before storage
- **Size Limits:** 10MB max per workflow (with compression option)
- **Retention:** Define data retention policy separately
- **Encryption:** Enable Supabase encryption at rest
- **Access Control:** Restrict table access to application-level only

---

## 10. Performance Considerations

| Aspect | Target | Strategy |
|--------|--------|----------|
| **Batch Flush** | <5s latency | 5-second flush interval + auto-flush |
| **Large Workflows** | >1MB support | Gzip compression + base64 encoding |
| **Query Performance** | <100ms | Strategic indexing + materialized views |
| **Storage Growth** | <50GB/month | Compression + retention policies |
| **Network Throughput** | <1MB/batch | Compress before transmission |

---

*End of Specification*