# Telemetry Workflow Mutation Tracking Specification

**Purpose:** Define the technical requirements for capturing workflow mutation data to build the n8n-fixer dataset

**Status:** Specification Document (Pre-Implementation)

---

## 1. Overview

This specification details how to extend the n8n-mcp telemetry system to capture:

- **Before State:** complete workflow JSON before modification
- **Instruction:** the transformation instruction/prompt
- **After State:** complete workflow JSON after modification
- **Metadata:** timestamps, user ID, success metrics, validation states

---

## 2. Schema Design

### 2.1 New Database Table: `workflow_mutations`

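Concretely, each record pairs a before/after snapshot with the instruction that connects them. A minimal illustration with hypothetical node data (not the full schema, which follows below):

```typescript
// Hypothetical example of the captured triple; field names follow the
// workflow_mutations schema defined in this section.
const beforeWorkflow = {
  nodes: [{
    id: '1', name: 'HTTP Request', type: 'n8n-nodes-base.httpRequest',
    parameters: { url: 'http://example.com' }
  }],
  connections: {}
};

const instruction = 'Change the HTTP Request URL to use HTTPS';

const afterWorkflow = {
  nodes: [{
    id: '1', name: 'HTTP Request', type: 'n8n-nodes-base.httpRequest',
    parameters: { url: 'https://example.com' }
  }],
  connections: {}
};

// Derived metadata (a subset of what the schema stores)
const record = {
  instruction,
  instruction_type: 'user_provided',
  properties_modified: ['nodes[0].parameters.url'],
  nodes_modified_count: 1
};
```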
```sql
CREATE TABLE IF NOT EXISTS workflow_mutations (
  -- Primary Key & Identifiers
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id TEXT NOT NULL,
  workflow_id TEXT,                      -- n8n workflow ID (nullable for new workflows)

  -- Source Workflow Snapshot (Before)
  before_workflow_json JSONB NOT NULL,   -- Complete workflow definition
  before_workflow_hash TEXT NOT NULL,    -- SHA-256(before_workflow_json)
  before_validation_status TEXT NOT NULL CHECK(before_validation_status IN (
    'valid',    -- Workflow passes validation
    'invalid',  -- Has validation errors
    'unknown'   -- Unknown state (not tested)
  )),
  before_error_count INTEGER,            -- Number of validation errors
  before_error_types TEXT[],             -- Array: ['type_error', 'missing_field', ...]

  -- Mutation Details
  instruction TEXT NOT NULL,             -- The modification instruction/prompt
  instruction_type TEXT NOT NULL CHECK(instruction_type IN (
    'ai_generated',           -- Generated by AI/LLM
    'user_provided',          -- User input/request
    'auto_fix',               -- System auto-correction
    'validation_correction'   -- Validation rule fix
  )),
  mutation_source TEXT,                  -- Which tool/service created the mutation,
                                         -- e.g., 'n8n_autofix_workflow', 'validation_engine'
  mutation_tool_version TEXT,            -- Version of tool that performed mutation

  -- Target Workflow Snapshot (After)
  after_workflow_json JSONB NOT NULL,    -- Complete modified workflow
  after_workflow_hash TEXT NOT NULL,     -- SHA-256(after_workflow_json)
  after_validation_status TEXT NOT NULL CHECK(after_validation_status IN (
    'valid',
    'invalid',
    'unknown'
  )),
  after_error_count INTEGER,             -- Validation errors after mutation
  after_error_types TEXT[],              -- Remaining error types

  -- Mutation Analysis (Pre-calculated for Performance)
  nodes_modified TEXT[],                 -- Array of modified node IDs/names
  nodes_added TEXT[],                    -- New nodes in after state
  nodes_removed TEXT[],                  -- Removed nodes
  nodes_modified_count INTEGER,          -- Count of modified nodes
  nodes_added_count INTEGER,
  nodes_removed_count INTEGER,

  connections_modified BOOLEAN,          -- Were connections/edges changed?
  connections_before_count INTEGER,      -- Number of connections before
  connections_after_count INTEGER,       -- Number after

  properties_modified TEXT[],            -- Changed property paths,
                                         -- e.g., ['nodes[0].parameters.url', ...]
  properties_modified_count INTEGER,
  expressions_modified BOOLEAN,          -- Were expressions/formulas changed?

  -- Complexity Metrics
  complexity_before TEXT CHECK(complexity_before IN (
    'simple',
    'medium',
    'complex'
  )),
  complexity_after TEXT CHECK(complexity_after IN (
    'simple',
    'medium',
    'complex'
  )),
  node_count_before INTEGER,
  node_count_after INTEGER,
  node_types_before TEXT[],
  node_types_after TEXT[],

  -- Outcome Metrics
  mutation_success BOOLEAN,              -- Did mutation achieve intended goal?
  validation_improved BOOLEAN,           -- true if after_error_count < before_error_count
  validation_errors_fixed INTEGER,       -- Count of errors fixed
  new_errors_introduced INTEGER,         -- Errors created by mutation

  -- Optional: User Feedback
  user_approved BOOLEAN,                 -- User accepted the mutation?
  user_feedback TEXT,                    -- User comment (truncated)

  -- Data Quality & Compression
  workflow_size_before INTEGER,          -- Byte size of before_workflow_json
  workflow_size_after INTEGER,           -- Byte size of after_workflow_json
  is_compressed BOOLEAN DEFAULT false,   -- True if workflows are gzip-compressed

  -- Timing
  execution_duration_ms INTEGER,         -- Time taken to apply mutation
  created_at TIMESTAMP DEFAULT NOW(),

  -- Metadata
  tags TEXT[],                           -- Custom tags for filtering
  metadata JSONB                         -- Flexible metadata storage
);
```

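The table stores SHA-256 digests of the serialized workflows (`before_workflow_hash`, `after_workflow_hash`). One caveat worth encoding: `JSON.stringify` is key-order sensitive, so hashing benefits from a canonicalization pass. A sketch (the `sortKeys` helper is illustrative, not part of the codebase):

```typescript
import { createHash } from 'crypto';

// Recursively sort object keys so that logically identical workflows
// serialize to the same string regardless of property order.
function sortKeys(value: any): any {
  if (Array.isArray(value)) return value.map(sortKeys);
  if (value !== null && typeof value === 'object') {
    return Object.keys(value).sort().reduce((acc: any, k) => {
      acc[k] = sortKeys(value[k]);
      return acc;
    }, {});
  }
  return value;
}

export function workflowHash(workflow: any): string {
  return createHash('sha256')
    .update(JSON.stringify(sortKeys(workflow)))
    .digest('hex');
}
```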
### 2.2 Indexes for Performance

```sql
-- User Analysis (user's mutation history)
CREATE INDEX idx_mutations_user_id
  ON workflow_mutations(user_id, created_at DESC);

-- Workflow Analysis (mutations to a specific workflow)
CREATE INDEX idx_mutations_workflow_id
  ON workflow_mutations(workflow_id, created_at DESC);

-- Mutation Success Rate
CREATE INDEX idx_mutations_success
  ON workflow_mutations(mutation_success, created_at DESC);

-- Validation Improvement Analysis
CREATE INDEX idx_mutations_validation_improved
  ON workflow_mutations(validation_improved, created_at DESC);

-- Time-series Analysis
CREATE INDEX idx_mutations_created_at
  ON workflow_mutations(created_at DESC);

-- Source Analysis
CREATE INDEX idx_mutations_source
  ON workflow_mutations(mutation_source, created_at DESC);

-- Instruction Type Analysis
CREATE INDEX idx_mutations_instruction_type
  ON workflow_mutations(instruction_type, created_at DESC);

-- Composite: common query patterns
CREATE INDEX idx_mutations_user_success_time
  ON workflow_mutations(user_id, mutation_success, created_at DESC);

CREATE INDEX idx_mutations_source_validation
  ON workflow_mutations(mutation_source, validation_improved, created_at DESC);
```

### 2.3 Optional: Materialized View for Analytics

```sql
-- Pre-calculate common metrics for fast dashboarding
CREATE MATERIALIZED VIEW vw_mutation_analytics AS
SELECT
  DATE(created_at) as mutation_date,
  instruction_type,
  mutation_source,

  COUNT(*) as total_mutations,
  SUM(CASE WHEN mutation_success THEN 1 ELSE 0 END) as successful_mutations,
  SUM(CASE WHEN validation_improved THEN 1 ELSE 0 END) as validation_improved_count,

  ROUND(100.0 * COUNT(*) FILTER(WHERE mutation_success = true)
    / NULLIF(COUNT(*), 0), 2) as success_rate,

  AVG(nodes_modified_count) as avg_nodes_modified,
  AVG(properties_modified_count) as avg_properties_modified,
  AVG(execution_duration_ms) as avg_duration_ms,

  AVG(before_error_count) as avg_errors_before,
  AVG(after_error_count) as avg_errors_after,
  AVG(validation_errors_fixed) as avg_errors_fixed
FROM workflow_mutations
GROUP BY DATE(created_at), instruction_type, mutation_source;

CREATE INDEX idx_mutation_analytics_date
  ON vw_mutation_analytics(mutation_date DESC);
```

---

## 3. TypeScript Interfaces

### 3.1 Core Mutation Interface

```typescript
// In src/telemetry/telemetry-types.ts

export interface WorkflowMutationEvent extends TelemetryEvent {
  event: 'workflow_mutation';
  properties: {
    // Identification
    workflowId?: string;

    // Hashes for deduplication & integrity
    beforeHash: string;          // SHA-256 of before state
    afterHash: string;           // SHA-256 of after state

    // Instruction
    instruction: string;         // The modification prompt/request
    instructionType: 'ai_generated' | 'user_provided' | 'auto_fix' | 'validation_correction';
    mutationSource?: string;     // Tool that created the instruction

    // Change Summary
    nodesModified: number;
    propertiesChanged: number;
    connectionsModified: boolean;
    expressionsModified: boolean;

    // Outcome
    mutationSuccess: boolean;
    validationImproved: boolean;
    errorsBefore: number;
    errorsAfter: number;

    // Performance
    executionDurationMs?: number;
    workflowSizeBefore?: number;
    workflowSizeAfter?: number;
  };
}

export interface WorkflowMutation {
  // Primary Key
  id: string;                    // UUID
  user_id: string;               // Anonymized user
  workflow_id?: string;          // n8n workflow ID

  // Before State
  before_workflow_json: any;     // Complete workflow
  before_workflow_hash: string;
  before_validation_status: 'valid' | 'invalid' | 'unknown';
  before_error_count?: number;
  before_error_types?: string[];

  // Mutation
  instruction: string;
  instruction_type: 'ai_generated' | 'user_provided' | 'auto_fix' | 'validation_correction';
  mutation_source?: string;
  mutation_tool_version?: string;

  // After State
  after_workflow_json: any;
  after_workflow_hash: string;
  after_validation_status: 'valid' | 'invalid' | 'unknown';
  after_error_count?: number;
  after_error_types?: string[];

  // Analysis
  nodes_modified?: string[];
  nodes_added?: string[];
  nodes_removed?: string[];
  nodes_modified_count?: number;
  connections_modified?: boolean;
  properties_modified?: string[];
  properties_modified_count?: number;

  // Complexity
  complexity_before?: 'simple' | 'medium' | 'complex';
  complexity_after?: 'simple' | 'medium' | 'complex';
  node_count_before?: number;
  node_count_after?: number;

  // Outcome
  mutation_success: boolean;
  validation_improved: boolean;
  validation_errors_fixed?: number;
  new_errors_introduced?: number;
  user_approved?: boolean;

  // Timing
  created_at: string;            // ISO 8601
  execution_duration_ms?: number;
}
```

### 3.2 Mutation Analysis Service

```typescript
// New file: src/telemetry/mutation-analyzer.ts

export interface MutationDiff {
  nodesAdded: string[];
  nodesRemoved: string[];
  nodesModified: Map<string, PropertyDiff[]>;
  connectionsChanged: boolean;
  expressionsChanged: boolean;
}

export interface PropertyDiff {
  path: string;            // e.g., "parameters.url"
  beforeValue: any;
  afterValue: any;
  isExpression: boolean;   // Contains {{}} or $json?
}

export class WorkflowMutationAnalyzer {
  /**
   * Analyze differences between before/after workflows
   */
  static analyzeDifferences(
    beforeWorkflow: any,
    afterWorkflow: any
  ): MutationDiff {
    // Implementation: deep comparison of workflow structures,
    // returning detailed diff information
  }

  /**
   * Extract changed property paths
   */
  static getChangedProperties(diff: MutationDiff): string[] {
    // Implementation
  }

  /**
   * Determine if an expression/formula was modified
   */
  static hasExpressionChanges(diff: MutationDiff): boolean {
    // Implementation
  }

  /**
   * Validate workflow structure
   */
  static validateWorkflowStructure(workflow: any): {
    isValid: boolean;
    errors: string[];
    errorTypes: string[];
  } {
    // Implementation
  }
}
```

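The node-level portion of `analyzeDifferences` amounts to a set comparison over node IDs. A simplified standalone sketch, assuming nodes carry stable `id` fields (property-level diffing omitted):

```typescript
interface SimpleNode { id: string; [key: string]: any; }
interface SimpleWorkflow { nodes: SimpleNode[]; }

// Compare node sets by id: added, removed, and modified (same id,
// different serialized content).
export function diffNodes(before: SimpleWorkflow, after: SimpleWorkflow): {
  added: string[]; removed: string[]; modified: string[];
} {
  const beforeMap = new Map(before.nodes.map(n => [n.id, JSON.stringify(n)]));
  const afterMap = new Map(after.nodes.map(n => [n.id, JSON.stringify(n)]));

  const added = [...afterMap.keys()].filter(id => !beforeMap.has(id));
  const removed = [...beforeMap.keys()].filter(id => !afterMap.has(id));
  const modified = [...afterMap.keys()].filter(
    id => beforeMap.has(id) && beforeMap.get(id) !== afterMap.get(id)
  );
  return { added, removed, modified };
}
```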
---

## 4. Integration Points

### 4.1 TelemetryManager Extension

```typescript
// In src/telemetry/telemetry-manager.ts

export class TelemetryManager {
  // ... existing code ...

  /**
   * Track workflow mutation (new method)
   */
  async trackWorkflowMutation(
    beforeWorkflow: any,
    instruction: string,
    afterWorkflow: any,
    options?: {
      instructionType?: 'ai_generated' | 'user_provided' | 'auto_fix' | 'validation_correction';
      mutationSource?: string;
      workflowId?: string;
      success?: boolean;
      executionDurationMs?: number;
      userApproved?: boolean;
    }
  ): Promise<void> {
    this.ensureInitialized();
    this.performanceMonitor.startOperation('trackWorkflowMutation');

    try {
      await this.eventTracker.trackWorkflowMutation(
        beforeWorkflow,
        instruction,
        afterWorkflow,
        options
      );
      // Auto-flush mutations to prevent data loss
      await this.flush();
    } catch (error) {
      const telemetryError = error instanceof TelemetryError
        ? error
        : new TelemetryError(
            TelemetryErrorType.UNKNOWN_ERROR,
            'Failed to track workflow mutation',
            { error: String(error) }
          );
      this.errorAggregator.record(telemetryError);
    } finally {
      this.performanceMonitor.endOperation('trackWorkflowMutation');
    }
  }
}
```

### 4.2 EventTracker Extension

```typescript
// In src/telemetry/event-tracker.ts
import { createHash } from 'crypto';

export class TelemetryEventTracker {
  // ... existing code ...

  private mutationQueue: WorkflowMutation[] = [];

  /**
   * Track a workflow mutation
   */
  async trackWorkflowMutation(
    beforeWorkflow: any,
    instruction: string,
    afterWorkflow: any,
    options?: MutationTrackingOptions
  ): Promise<void> {
    if (!this.isEnabled()) return;

    try {
      // 1. Analyze differences
      const diff = WorkflowMutationAnalyzer.analyzeDifferences(
        beforeWorkflow,
        afterWorkflow
      );

      // 2. Calculate hashes
      const beforeHash = this.calculateHash(beforeWorkflow);
      const afterHash = this.calculateHash(afterWorkflow);

      // 3. Detect validation changes
      const beforeValidation = WorkflowMutationAnalyzer.validateWorkflowStructure(
        beforeWorkflow
      );
      const afterValidation = WorkflowMutationAnalyzer.validateWorkflowStructure(
        afterWorkflow
      );

      // 4. Create mutation record
      const mutation: WorkflowMutation = {
        id: generateUUID(),
        user_id: this.getUserId(),
        workflow_id: options?.workflowId,

        before_workflow_json: beforeWorkflow,
        before_workflow_hash: beforeHash,
        before_validation_status: beforeValidation.isValid ? 'valid' : 'invalid',
        before_error_count: beforeValidation.errors.length,
        before_error_types: beforeValidation.errorTypes,

        instruction,
        instruction_type: options?.instructionType || 'user_provided',
        mutation_source: options?.mutationSource,

        after_workflow_json: afterWorkflow,
        after_workflow_hash: afterHash,
        after_validation_status: afterValidation.isValid ? 'valid' : 'invalid',
        after_error_count: afterValidation.errors.length,
        after_error_types: afterValidation.errorTypes,

        nodes_modified: Array.from(diff.nodesModified.keys()),
        nodes_added: diff.nodesAdded,
        nodes_removed: diff.nodesRemoved,
        properties_modified: WorkflowMutationAnalyzer.getChangedProperties(diff),
        connections_modified: diff.connectionsChanged,

        mutation_success: options?.success !== false,
        validation_improved: afterValidation.errors.length
          < beforeValidation.errors.length,
        validation_errors_fixed: Math.max(
          0,
          beforeValidation.errors.length - afterValidation.errors.length
        ),

        created_at: new Date().toISOString(),
        execution_duration_ms: options?.executionDurationMs,
        user_approved: options?.userApproved
      };

      // 5. Validate and queue
      const { isValid } = WorkflowMutationValidator.validate(mutation);
      if (isValid) {
        this.mutationQueue.push(mutation);
      }

      // 6. Track as event for real-time monitoring
      this.trackEvent('workflow_mutation', {
        beforeHash,
        afterHash,
        instructionType: options?.instructionType || 'user_provided',
        nodesModified: diff.nodesModified.size,
        propertiesChanged: mutation.properties_modified?.length || 0,
        mutationSuccess: options?.success !== false,
        validationImproved: mutation.validation_improved,
        errorsBefore: beforeValidation.errors.length,
        errorsAfter: afterValidation.errors.length
      });

    } catch (error) {
      logger.debug('Failed to track workflow mutation:', error);
      throw new TelemetryError(
        TelemetryErrorType.VALIDATION_ERROR,
        'Failed to process workflow mutation',
        { error: error instanceof Error ? error.message : String(error) }
      );
    }
  }

  /**
   * Get queued mutations
   */
  getMutationQueue(): WorkflowMutation[] {
    return [...this.mutationQueue];
  }

  /**
   * Clear mutation queue
   */
  clearMutationQueue(): void {
    this.mutationQueue = [];
  }

  /**
   * Calculate SHA-256 hash of a workflow's JSON serialization
   */
  private calculateHash(workflow: any): string {
    const normalized = JSON.stringify(workflow);
    return createHash('sha256').update(normalized).digest('hex');
  }
}
```

### 4.3 BatchProcessor Extension

```typescript
// In src/telemetry/batch-processor.ts

export class TelemetryBatchProcessor {
  // ... existing code ...

  private isFlushingMutations = false;

  /**
   * Flush mutations to Supabase
   */
  private async flushMutations(
    mutations: WorkflowMutation[]
  ): Promise<boolean> {
    if (this.isFlushingMutations || mutations.length === 0) return true;

    this.isFlushingMutations = true;

    try {
      const batches = this.createBatches(
        mutations,
        TELEMETRY_CONFIG.MAX_BATCH_SIZE
      );

      for (const batch of batches) {
        const result = await this.executeWithRetry(async () => {
          const { error } = await this.supabase!
            .from('workflow_mutations')
            .insert(batch);

          if (error) throw error;

          logger.debug(`Flushed batch of ${batch.length} workflow mutations`);
          return true;
        }, 'Flush workflow mutations');

        if (!result) {
          this.addToDeadLetterQueue(batch);
          return false;
        }
      }

      return true;
    } catch (error) {
      logger.debug('Failed to flush mutations:', error);
      throw new TelemetryError(
        TelemetryErrorType.NETWORK_ERROR,
        'Failed to flush mutations',
        { error: error instanceof Error ? error.message : String(error) },
        true
      );
    } finally {
      this.isFlushingMutations = false;
    }
  }
}
```

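`createBatches` above is assumed to slice the queue into `MAX_BATCH_SIZE` chunks; a minimal version might look like:

```typescript
// Split an array into consecutive chunks of at most `size` items.
export function createBatches<T>(items: T[], size: number): T[][] {
  if (size <= 0) throw new Error('batch size must be positive');
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```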
---

## 5. Integration with Workflow Tools

### 5.1 n8n_autofix_workflow

```typescript
// Where n8n_autofix_workflow applies fixes
import { telemetry } from '../telemetry';

export async function n8n_autofix_workflow(
  workflow: any,
  options?: AutofixOptions
): Promise<WorkflowFixResult> {

  const beforeWorkflow = JSON.parse(JSON.stringify(workflow)); // Deep copy
  const startTime = Date.now();

  try {
    // Apply fixes
    const fixed = await applyFixes(workflow, options);

    // Track mutation
    await telemetry.trackWorkflowMutation(
      beforeWorkflow,
      'Auto-fix validation errors',
      fixed,
      {
        instructionType: 'auto_fix',
        mutationSource: 'n8n_autofix_workflow',
        success: true,
        executionDurationMs: Date.now() - startTime
      }
    );

    return fixed;
  } catch (error) {
    // Track failed mutation attempt
    await telemetry.trackWorkflowMutation(
      beforeWorkflow,
      'Auto-fix validation errors',
      beforeWorkflow, // No changes
      {
        instructionType: 'auto_fix',
        mutationSource: 'n8n_autofix_workflow',
        success: false
      }
    );
    throw error;
  }
}
```

### 5.2 n8n_update_partial_workflow

```typescript
// Partial workflow updates
export async function n8n_update_partial_workflow(
  workflow: any,
  operations: DiffOperation[]
): Promise<UpdateResult> {

  const beforeWorkflow = JSON.parse(JSON.stringify(workflow));
  const instructionText = formatOperationsAsInstruction(operations);

  try {
    const updated = applyOperations(workflow, operations);

    await telemetry.trackWorkflowMutation(
      beforeWorkflow,
      instructionText,
      updated,
      {
        instructionType: 'user_provided',
        mutationSource: 'n8n_update_partial_workflow'
      }
    );

    return updated;
  } catch (error) {
    await telemetry.trackWorkflowMutation(
      beforeWorkflow,
      instructionText,
      beforeWorkflow,
      {
        instructionType: 'user_provided',
        mutationSource: 'n8n_update_partial_workflow',
        success: false
      }
    );
    throw error;
  }
}
```

---

## 6. Data Quality & Validation

### 6.1 Mutation Validation Rules

```typescript
// In src/telemetry/mutation-validator.ts
// Assumes calculateHash is shared with the event tracker (see Section 4.2).

export interface ValidationResult {
  isValid: boolean;
  errors: string[];
}

export class WorkflowMutationValidator {
  /**
   * Validate mutation data before storage
   */
  static validate(mutation: WorkflowMutation): ValidationResult {
    const errors: string[] = [];

    // Required fields
    if (!mutation.user_id) errors.push('user_id is required');
    if (!mutation.before_workflow_json) errors.push('before_workflow_json required');
    if (!mutation.after_workflow_json) errors.push('after_workflow_json required');
    if (!mutation.before_workflow_hash) errors.push('before_workflow_hash required');
    if (!mutation.after_workflow_hash) errors.push('after_workflow_hash required');
    if (!mutation.instruction) errors.push('instruction is required');
    if (!mutation.instruction_type) errors.push('instruction_type is required');

    // Hash verification
    const beforeHash = calculateHash(mutation.before_workflow_json);
    const afterHash = calculateHash(mutation.after_workflow_json);

    if (beforeHash !== mutation.before_workflow_hash) {
      errors.push('before_workflow_hash mismatch');
    }
    if (afterHash !== mutation.after_workflow_hash) {
      errors.push('after_workflow_hash mismatch');
    }

    // Deduplication: skip if before == after
    if (beforeHash === afterHash) {
      errors.push('before and after states are identical (skipping)');
    }

    // Size validation
    const beforeSize = JSON.stringify(mutation.before_workflow_json).length;
    const afterSize = JSON.stringify(mutation.after_workflow_json).length;

    if (beforeSize > 10 * 1024 * 1024) {
      errors.push('before_workflow_json exceeds 10MB size limit');
    }
    if (afterSize > 10 * 1024 * 1024) {
      errors.push('after_workflow_json exceeds 10MB size limit');
    }

    // Instruction validation (overly long instructions are truncated in place)
    if (mutation.instruction.length > 5000) {
      mutation.instruction = mutation.instruction.substring(0, 5000);
    }
    if (mutation.instruction.length < 3) {
      errors.push('instruction too short (min 3 chars)');
    }

    // Error count validation
    if (mutation.before_error_count != null && mutation.before_error_count < 0) {
      errors.push('before_error_count cannot be negative');
    }
    if (mutation.after_error_count != null && mutation.after_error_count < 0) {
      errors.push('after_error_count cannot be negative');
    }

    return {
      isValid: errors.length === 0,
      errors
    };
  }
}
```

### 6.2 Data Compression Strategy

For large workflows (>1MB):

```typescript
import { gzipSync, gunzipSync } from 'zlib';

export function compressWorkflow(workflow: any): {
  compressed: string;      // base64
  originalSize: number;
  compressedSize: number;
} {
  const json = JSON.stringify(workflow);
  const buffer = Buffer.from(json, 'utf-8');
  const compressed = gzipSync(buffer);
  const base64 = compressed.toString('base64');

  return {
    compressed: base64,
    originalSize: buffer.length,
    compressedSize: compressed.length
  };
}

export function decompressWorkflow(compressed: string): any {
  const buffer = Buffer.from(compressed, 'base64');
  const decompressed = gunzipSync(buffer);
  const json = decompressed.toString('utf-8');
  return JSON.parse(json);
}
```

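A roundtrip sanity check for this strategy (the 50-node workflow below is a stand-in for a large real one; tiny payloads may not shrink under gzip):

```typescript
import { gzipSync, gunzipSync } from 'zlib';

// A repetitive 50-node workflow, standing in for a large real workflow.
const workflow = {
  nodes: Array.from({ length: 50 }, (_, i) => ({
    id: String(i), type: 'n8n-nodes-base.noOp', parameters: {}
  })),
  connections: {}
};

const json = JSON.stringify(workflow);
const gz = gzipSync(Buffer.from(json, 'utf-8'));
const base64 = gz.toString('base64');   // what would be stored
const restored = JSON.parse(
  gunzipSync(Buffer.from(base64, 'base64')).toString('utf-8')
);
```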
---

## 7. Query Examples for Analysis

### 7.1 Basic Mutation Statistics

```sql
-- Overall mutation metrics
SELECT
  COUNT(*) as total_mutations,
  COUNT(*) FILTER(WHERE mutation_success) as successful,
  COUNT(*) FILTER(WHERE validation_improved) as validation_improved,
  ROUND(100.0 * COUNT(*) FILTER(WHERE mutation_success) / NULLIF(COUNT(*), 0), 2) as success_rate,
  ROUND(100.0 * COUNT(*) FILTER(WHERE validation_improved) / NULLIF(COUNT(*), 0), 2) as improvement_rate,
  AVG(nodes_modified_count) as avg_nodes_modified,
  AVG(properties_modified_count) as avg_properties_modified,
  AVG(execution_duration_ms)::INTEGER as avg_duration_ms
FROM workflow_mutations
WHERE created_at >= NOW() - INTERVAL '7 days';
```

### 7.2 Success by Instruction Type

```sql
SELECT
  instruction_type,
  COUNT(*) as count,
  ROUND(100.0 * COUNT(*) FILTER(WHERE mutation_success) / COUNT(*), 2) as success_rate,
  ROUND(100.0 * COUNT(*) FILTER(WHERE validation_improved) / COUNT(*), 2) as improvement_rate,
  AVG(validation_errors_fixed) as avg_errors_fixed,
  AVG(new_errors_introduced) as avg_new_errors
FROM workflow_mutations
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY instruction_type
ORDER BY count DESC;
```

### 7.3 Most Common Mutations

```sql
SELECT
  properties_modified,
  COUNT(*) as frequency,
  ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM workflow_mutations
    WHERE created_at >= NOW() - INTERVAL '30 days'), 2) as percentage
FROM workflow_mutations
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY properties_modified
ORDER BY frequency DESC
LIMIT 20;
```

### 7.4 Complexity Impact

```sql
SELECT
  complexity_before,
  complexity_after,
  COUNT(*) as transitions,
  ROUND(100.0 * COUNT(*) FILTER(WHERE mutation_success) / COUNT(*), 2) as success_rate
FROM workflow_mutations
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY complexity_before, complexity_after
ORDER BY transitions DESC;
```

---

## 8. Implementation Roadmap

### Phase 1: Infrastructure (Week 1)

- [ ] Create `workflow_mutations` table in Supabase
- [ ] Add indexes for common query patterns
- [ ] Update TypeScript types
- [ ] Create mutation analyzer service
- [ ] Add mutation validator

### Phase 2: Integration (Week 2)

- [ ] Extend TelemetryManager with trackWorkflowMutation()
- [ ] Extend EventTracker with mutation queue
- [ ] Extend BatchProcessor with flush logic
- [ ] Add mutation event type

### Phase 3: Tool Integration (Week 3)

- [ ] Integrate with n8n_autofix_workflow
- [ ] Integrate with n8n_update_partial_workflow
- [ ] Add test cases
- [ ] Documentation

### Phase 4: Validation & Analysis (Week 4)

- [ ] Run sample queries
- [ ] Validate data quality
- [ ] Create analytics dashboard
- [ ] Begin dataset collection

## 9. Security & Privacy Considerations

- **No Credentials:** Sanitizer strips credentials before storage
- **No Secrets:** Workflow secret references removed
- **User Anonymity:** User ID is anonymized
- **Hash Verification:** All workflow hashes verified before storage
- **Size Limits:** 10MB max per workflow (with compression option)
- **Retention:** Define data retention policy separately
- **Encryption:** Enable Supabase encryption at rest
- **Access Control:** Restrict table access to application-level only
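The credential-stripping step can be sketched as a deep copy that drops each node's `credentials` block (a simplified illustration; the real sanitizer may cover additional fields):

```typescript
// Remove credential references from every node before the workflow
// snapshot is stored. Returns a sanitized deep copy; the input is untouched.
export function stripCredentials(workflow: any): any {
  const copy = JSON.parse(JSON.stringify(workflow));
  for (const node of copy.nodes ?? []) {
    delete node.credentials;
  }
  return copy;
}
```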

---

## 10. Performance Considerations

| Aspect | Target | Strategy |
|--------|--------|----------|
| **Batch Flush** | <5s latency | 5-second flush interval + auto-flush |
| **Large Workflows** | >1MB support | Gzip compression + base64 encoding |
| **Query Performance** | <100ms | Strategic indexing + materialized views |
| **Storage Growth** | <50GB/month | Compression + retention policies |
| **Network Throughput** | <1MB/batch | Compress before transmission |
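The "<5s latency" target implies a flush triggered by either batch size or elapsed time. The trigger condition might be a small pure helper (default values are illustrative, not the real `TELEMETRY_CONFIG`):

```typescript
// Flush when the batch is full, or when there is at least one queued item
// that has waited longer than the flush interval.
export function shouldFlush(
  queueLength: number,
  msSinceLastFlush: number,
  maxBatchSize = 50,       // assumed value
  flushIntervalMs = 5000   // matches the 5-second target above
): boolean {
  return queueLength >= maxBatchSize ||
    (queueLength > 0 && msSinceLastFlush >= flushIntervalMs);
}
```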

---

*End of Specification*