feat: enhance workflow mutation telemetry for better AI responses (#419)

* feat: add comprehensive telemetry for partial workflow updates

Implement telemetry infrastructure to track workflow mutations from
partial update operations. This enables data-driven improvements to
partial update tooling by capturing:

- Workflow state before and after mutations
- User intent and operation patterns
- Validation results and improvements
- Change metrics (nodes/connections modified)
- Success/failure rates and error patterns

New Components:
- Intent classifier: Categorizes mutation patterns
- Intent sanitizer: Removes PII from user instructions
- Mutation validator: Ensures data quality before tracking
- Mutation tracker: Coordinates validation and metric calculation

Extended Components:
- TelemetryManager: New trackWorkflowMutation() method
- EventTracker: Mutation queue management
- BatchProcessor: Mutation data flushing to Supabase

MCP Tool Enhancements:
- n8n_update_partial_workflow: Added optional 'intent' parameter
- n8n_update_full_workflow: Added optional 'intent' parameter
- Both tools now track mutations asynchronously

Database Schema:
- New workflow_mutations table with 20+ fields
- Comprehensive indexes for efficient querying
- Supports deduplication and data analysis

This telemetry system is:
- Privacy-focused (PII sanitization, anonymized users)
- Non-blocking (async tracking, silent failures)
- Production-ready (batching, retries, circuit breaker)
- Backward compatible (all parameters optional)

Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* fix: correct SQL syntax for expression index in workflow_mutations schema

The expression index for significant changes needs double parentheses
around the arithmetic expression to be valid PostgreSQL syntax.

Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* fix: enable RLS policies for workflow_mutations table

Enable Row-Level Security and add policies:
- Allow anonymous (anon) inserts for telemetry data collection
- Allow authenticated reads for data analysis and querying

These policies are required for the telemetry system to function
correctly with Supabase, as the MCP server uses the anon key to
insert mutation data.

Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* fix: reduce mutation auto-flush threshold from 5 to 2

Lower the auto-flush threshold for workflow mutations from 5 to 2 to ensure
more timely data persistence. Since mutations are less frequent than regular
telemetry events, a lower threshold provides:

- Faster data persistence (don't wait for 5 mutations)
- Better testing experience (easier to verify with fewer operations)
- Reduced risk of data loss if process exits before threshold
- More responsive telemetry for low-volume mutation scenarios

This complements the existing 5-second periodic flush and process exit
handlers, ensuring mutations are persisted promptly.

Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* fix: improve mutation telemetry error logging and diagnostics

Changes:
- Upgrade error logging from debug to warn level for better visibility
- Add diagnostic logging to track mutation processing
- Log telemetry disabled state explicitly
- Add context info (sessionId, intent, operationCount) to error logs
- Remove 'await' from telemetry calls to make them truly non-blocking

This will help identify why mutations aren't being persisted to the
workflow_mutations table despite successful workflow operations.

Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

* feat: enhance workflow mutation telemetry for better AI responses

Improve workflow mutation tracking to capture comprehensive data that helps provide better responses when users update workflows. This enhancement collects workflow state, user intent, and operation details to enable more context-aware assistance.

Key improvements:
- Reduce auto-flush threshold from 5 to 2 for more reliable mutation tracking
- Add comprehensive workflow and credential sanitization to mutation tracker
- Document intent parameter in workflow update tools for better UX
- Fix mutation queue handling in telemetry manager (flush now handles 3 queues)
- Add extensive unit tests for mutation tracking and validation (35 new tests)

Technical changes:
- mutation-tracker.ts: Multi-layer sanitization (workflow, node, parameter levels)
- batch-processor.ts: Support mutation data flushing to Supabase
- telemetry-manager.ts: Auto-flush mutations at threshold 2, track mutations queue
- handlers-workflow-diff.ts: Track workflow mutations with sanitized data
- Tests: 13 tests for mutation-tracker, 22 tests for mutation-validator

The intent parameter messaging emphasizes user benefit ("helps to return better response") rather than technical implementation details.

Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* chore: bump version to 2.22.16 with telemetry changelog

Updated package.json and package.runtime.json to version 2.22.16.
Added comprehensive CHANGELOG entry documenting workflow mutation
telemetry enhancements for better AI-powered workflow assistance.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: resolve TypeScript lint errors in telemetry tests

Fixed type issues in mutation-tracker and mutation-validator tests:
- Import and use MutationToolName enum instead of string literals
- Fix ValidationResult.errors to use proper object structure
- Add UpdateNodeOperation type assertion for operation with nodeName

All TypeScript errors resolved, lint now passes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Romuald Członkowski
2025-11-13 14:21:51 +01:00
committed by GitHub
parent 77151e013e
commit 99c5907b71
26 changed files with 5900 additions and 25 deletions

View File

@@ -4,14 +4,36 @@
*/
import { SupabaseClient } from '@supabase/supabase-js';
import { TelemetryEvent, WorkflowTelemetry, TELEMETRY_CONFIG, TelemetryMetrics } from './telemetry-types';
import { TelemetryEvent, WorkflowTelemetry, WorkflowMutationRecord, TELEMETRY_CONFIG, TelemetryMetrics } from './telemetry-types';
import { TelemetryError, TelemetryErrorType, TelemetryCircuitBreaker } from './telemetry-error';
import { logger } from '../utils/logger';
/**
* Convert camelCase object keys to snake_case
* Needed because Supabase PostgREST doesn't auto-convert
*/
function toSnakeCase(obj: any): any {
if (obj === null || obj === undefined) return obj;
if (Array.isArray(obj)) return obj.map(toSnakeCase);
if (typeof obj !== 'object') return obj;
const result: any = {};
for (const key in obj) {
if (obj.hasOwnProperty(key)) {
// Convert camelCase to snake_case
const snakeKey = key.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
// Recursively convert nested objects
result[snakeKey] = toSnakeCase(obj[key]);
}
}
return result;
}
export class TelemetryBatchProcessor {
private flushTimer?: NodeJS.Timeout;
private isFlushingEvents: boolean = false;
private isFlushingWorkflows: boolean = false;
private isFlushingMutations: boolean = false;
private circuitBreaker: TelemetryCircuitBreaker;
private metrics: TelemetryMetrics = {
eventsTracked: 0,
@@ -23,7 +45,7 @@ export class TelemetryBatchProcessor {
rateLimitHits: 0
};
private flushTimes: number[] = [];
private deadLetterQueue: (TelemetryEvent | WorkflowTelemetry)[] = [];
private deadLetterQueue: (TelemetryEvent | WorkflowTelemetry | WorkflowMutationRecord)[] = [];
private readonly maxDeadLetterSize = 100;
constructor(
@@ -76,15 +98,15 @@ export class TelemetryBatchProcessor {
}
/**
* Flush events and workflows to Supabase
* Flush events, workflows, and mutations to Supabase
*/
async flush(events?: TelemetryEvent[], workflows?: WorkflowTelemetry[]): Promise<void> {
async flush(events?: TelemetryEvent[], workflows?: WorkflowTelemetry[], mutations?: WorkflowMutationRecord[]): Promise<void> {
if (!this.isEnabled() || !this.supabase) return;
// Check circuit breaker
if (!this.circuitBreaker.shouldAllow()) {
logger.debug('Circuit breaker open - skipping flush');
this.metrics.eventsDropped += (events?.length || 0) + (workflows?.length || 0);
this.metrics.eventsDropped += (events?.length || 0) + (workflows?.length || 0) + (mutations?.length || 0);
return;
}
@@ -101,6 +123,11 @@ export class TelemetryBatchProcessor {
hasErrors = !(await this.flushWorkflows(workflows)) || hasErrors;
}
// Flush mutations if provided
if (mutations && mutations.length > 0) {
hasErrors = !(await this.flushMutations(mutations)) || hasErrors;
}
// Record flush time
const flushTime = Date.now() - startTime;
this.recordFlushTime(flushTime);
@@ -224,6 +251,71 @@ export class TelemetryBatchProcessor {
}
}
/**
* Flush workflow mutations with batching
*/
private async flushMutations(mutations: WorkflowMutationRecord[]): Promise<boolean> {
if (this.isFlushingMutations || mutations.length === 0) return true;
this.isFlushingMutations = true;
try {
// Batch mutations
const batches = this.createBatches(mutations, TELEMETRY_CONFIG.MAX_BATCH_SIZE);
for (const batch of batches) {
const result = await this.executeWithRetry(async () => {
// Convert camelCase to snake_case for Supabase
const snakeCaseBatch = batch.map(mutation => toSnakeCase(mutation));
const { error } = await this.supabase!
.from('workflow_mutations')
.insert(snakeCaseBatch);
if (error) {
// Enhanced error logging for mutation flushes
logger.error('Mutation insert error details:', {
code: (error as any).code,
message: (error as any).message,
details: (error as any).details,
hint: (error as any).hint,
fullError: String(error)
});
throw error;
}
logger.debug(`Flushed batch of ${batch.length} workflow mutations`);
return true;
}, 'Flush workflow mutations');
if (result) {
this.metrics.eventsTracked += batch.length;
this.metrics.batchesSent++;
} else {
this.metrics.eventsFailed += batch.length;
this.metrics.batchesFailed++;
this.addToDeadLetterQueue(batch);
return false;
}
}
return true;
} catch (error) {
logger.error('Failed to flush mutations with details:', {
errorMsg: error instanceof Error ? error.message : String(error),
errorType: error instanceof Error ? error.constructor.name : typeof error
});
throw new TelemetryError(
TelemetryErrorType.NETWORK_ERROR,
'Failed to flush workflow mutations',
{ error: error instanceof Error ? error.message : String(error) },
true
);
} finally {
this.isFlushingMutations = false;
}
}
/**
* Execute operation with exponential backoff retry
*/
@@ -305,7 +397,7 @@ export class TelemetryBatchProcessor {
/**
* Add failed items to dead letter queue
*/
private addToDeadLetterQueue(items: (TelemetryEvent | WorkflowTelemetry)[]): void {
private addToDeadLetterQueue(items: (TelemetryEvent | WorkflowTelemetry | WorkflowMutationRecord)[]): void {
for (const item of items) {
this.deadLetterQueue.push(item);

View File

@@ -4,7 +4,7 @@
* Now uses shared sanitization utilities to avoid code duplication
*/
import { TelemetryEvent, WorkflowTelemetry } from './telemetry-types';
import { TelemetryEvent, WorkflowTelemetry, WorkflowMutationRecord } from './telemetry-types';
import { WorkflowSanitizer } from './workflow-sanitizer';
import { TelemetryRateLimiter } from './rate-limiter';
import { TelemetryEventValidator } from './event-validator';
@@ -19,6 +19,7 @@ export class TelemetryEventTracker {
private validator: TelemetryEventValidator;
private eventQueue: TelemetryEvent[] = [];
private workflowQueue: WorkflowTelemetry[] = [];
private mutationQueue: WorkflowMutationRecord[] = [];
private previousTool?: string;
private previousToolTimestamp: number = 0;
private performanceMetrics: Map<string, number[]> = new Map();
@@ -325,6 +326,13 @@ export class TelemetryEventTracker {
return [...this.workflowQueue];
}
/**
* Get queued mutations
*/
getMutationQueue(): WorkflowMutationRecord[] {
return [...this.mutationQueue];
}
/**
* Clear event queue
*/
@@ -339,6 +347,28 @@ export class TelemetryEventTracker {
this.workflowQueue = [];
}
/**
* Clear mutation queue
*/
clearMutationQueue(): void {
this.mutationQueue = [];
}
/**
* Enqueue mutation for batch processing
*/
enqueueMutation(mutation: WorkflowMutationRecord): void {
if (!this.isEnabled()) return;
this.mutationQueue.push(mutation);
}
/**
* Get mutation queue size
*/
getMutationQueueSize(): number {
return this.mutationQueue.length;
}
/**
* Get tracking statistics
*/
@@ -348,6 +378,7 @@ export class TelemetryEventTracker {
validator: this.validator.getStats(),
eventQueueSize: this.eventQueue.length,
workflowQueueSize: this.workflowQueue.length,
mutationQueueSize: this.mutationQueue.length,
performanceMetrics: this.getPerformanceStats()
};
}

View File

@@ -0,0 +1,243 @@
/**
* Intent classifier for workflow mutations
* Analyzes operations to determine the intent/pattern of the mutation
*/
import { DiffOperation } from '../types/workflow-diff.js';
import { IntentClassification } from './mutation-types.js';
/**
* Classifies the intent of a workflow mutation based on operations performed
*/
export class IntentClassifier {
/**
* Classify mutation intent from operations and optional user intent text
*/
classify(operations: DiffOperation[], userIntent?: string): IntentClassification {
if (operations.length === 0) {
return IntentClassification.UNKNOWN;
}
// First, try to classify from user intent text if provided
if (userIntent) {
const textClassification = this.classifyFromText(userIntent);
if (textClassification !== IntentClassification.UNKNOWN) {
return textClassification;
}
}
// Fall back to operation pattern analysis
return this.classifyFromOperations(operations);
}
/**
* Classify from user intent text using keyword matching
*/
private classifyFromText(intent: string): IntentClassification {
const lowerIntent = intent.toLowerCase();
// Fix validation errors
if (
lowerIntent.includes('fix') ||
lowerIntent.includes('resolve') ||
lowerIntent.includes('correct') ||
lowerIntent.includes('repair') ||
lowerIntent.includes('error')
) {
return IntentClassification.FIX_VALIDATION;
}
// Add new functionality
if (
lowerIntent.includes('add') ||
lowerIntent.includes('create') ||
lowerIntent.includes('insert') ||
lowerIntent.includes('new node')
) {
return IntentClassification.ADD_FUNCTIONALITY;
}
// Modify configuration
if (
lowerIntent.includes('update') ||
lowerIntent.includes('change') ||
lowerIntent.includes('modify') ||
lowerIntent.includes('configure') ||
lowerIntent.includes('set')
) {
return IntentClassification.MODIFY_CONFIGURATION;
}
// Rewire logic
if (
lowerIntent.includes('connect') ||
lowerIntent.includes('reconnect') ||
lowerIntent.includes('rewire') ||
lowerIntent.includes('reroute') ||
lowerIntent.includes('link')
) {
return IntentClassification.REWIRE_LOGIC;
}
// Cleanup
if (
lowerIntent.includes('remove') ||
lowerIntent.includes('delete') ||
lowerIntent.includes('clean') ||
lowerIntent.includes('disable')
) {
return IntentClassification.CLEANUP;
}
return IntentClassification.UNKNOWN;
}
/**
* Classify from operation patterns
*/
private classifyFromOperations(operations: DiffOperation[]): IntentClassification {
const opTypes = operations.map((op) => op.type);
const opTypeSet = new Set(opTypes);
// Pattern: Adding nodes and connections (add functionality)
if (opTypeSet.has('addNode') && opTypeSet.has('addConnection')) {
return IntentClassification.ADD_FUNCTIONALITY;
}
// Pattern: Only adding nodes (add functionality)
if (opTypeSet.has('addNode') && !opTypeSet.has('removeNode')) {
return IntentClassification.ADD_FUNCTIONALITY;
}
// Pattern: Removing nodes or connections (cleanup)
if (opTypeSet.has('removeNode') || opTypeSet.has('removeConnection')) {
return IntentClassification.CLEANUP;
}
// Pattern: Disabling nodes (cleanup)
if (opTypeSet.has('disableNode')) {
return IntentClassification.CLEANUP;
}
// Pattern: Rewiring connections
if (
opTypeSet.has('rewireConnection') ||
opTypeSet.has('replaceConnections') ||
(opTypeSet.has('addConnection') && opTypeSet.has('removeConnection'))
) {
return IntentClassification.REWIRE_LOGIC;
}
// Pattern: Only updating nodes (modify configuration)
if (opTypeSet.has('updateNode') && opTypes.every((t) => t === 'updateNode')) {
return IntentClassification.MODIFY_CONFIGURATION;
}
// Pattern: Updating settings or metadata (modify configuration)
if (
opTypeSet.has('updateSettings') ||
opTypeSet.has('updateName') ||
opTypeSet.has('addTag') ||
opTypeSet.has('removeTag')
) {
return IntentClassification.MODIFY_CONFIGURATION;
}
// Pattern: Mix of updates with some additions/removals (modify configuration)
if (opTypeSet.has('updateNode')) {
return IntentClassification.MODIFY_CONFIGURATION;
}
// Pattern: Moving nodes (modify configuration)
if (opTypeSet.has('moveNode')) {
return IntentClassification.MODIFY_CONFIGURATION;
}
// Pattern: Enabling nodes (could be fixing)
if (opTypeSet.has('enableNode')) {
return IntentClassification.FIX_VALIDATION;
}
// Pattern: Clean stale connections (cleanup)
if (opTypeSet.has('cleanStaleConnections')) {
return IntentClassification.CLEANUP;
}
return IntentClassification.UNKNOWN;
}
/**
* Get confidence score for classification (0-1)
* Higher score means more confident in the classification
*/
getConfidence(
classification: IntentClassification,
operations: DiffOperation[],
userIntent?: string
): number {
// High confidence if user intent matches operation pattern
if (userIntent && this.classifyFromText(userIntent) === classification) {
return 0.9;
}
// Medium-high confidence for clear operation patterns
if (classification !== IntentClassification.UNKNOWN) {
const opTypes = new Set(operations.map((op) => op.type));
// Very clear patterns get high confidence
if (
classification === IntentClassification.ADD_FUNCTIONALITY &&
opTypes.has('addNode')
) {
return 0.8;
}
if (
classification === IntentClassification.CLEANUP &&
(opTypes.has('removeNode') || opTypes.has('removeConnection'))
) {
return 0.8;
}
if (
classification === IntentClassification.REWIRE_LOGIC &&
opTypes.has('rewireConnection')
) {
return 0.8;
}
// Other patterns get medium confidence
return 0.6;
}
// Low confidence for unknown classification
return 0.3;
}
/**
* Get human-readable description of the classification
*/
getDescription(classification: IntentClassification): string {
switch (classification) {
case IntentClassification.ADD_FUNCTIONALITY:
return 'Adding new nodes or functionality to the workflow';
case IntentClassification.MODIFY_CONFIGURATION:
return 'Modifying configuration of existing nodes';
case IntentClassification.REWIRE_LOGIC:
return 'Changing workflow execution flow by rewiring connections';
case IntentClassification.FIX_VALIDATION:
return 'Fixing validation errors or issues';
case IntentClassification.CLEANUP:
return 'Removing or disabling nodes and connections';
case IntentClassification.UNKNOWN:
return 'Unknown or complex mutation pattern';
default:
return 'Unclassified mutation';
}
}
}
/**
* Singleton instance for easy access
*/
export const intentClassifier = new IntentClassifier();

View File

@@ -0,0 +1,187 @@
/**
* Intent sanitizer for removing PII from user intent strings
* Ensures privacy by masking sensitive information
*/
/**
* Patterns for detecting and removing PII
*/
const PII_PATTERNS = {
// Email addresses
email: /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b/gi,
// URLs with domains
url: /https?:\/\/[^\s]+/gi,
// IP addresses
ip: /\b(?:\d{1,3}\.){3}\d{1,3}\b/g,
// Phone numbers (various formats)
phone: /\b(?:\+?\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b/g,
// Credit card-like numbers (groups of 4 digits)
creditCard: /\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b/g,
// API keys and tokens (long alphanumeric strings)
apiKey: /\b[A-Za-z0-9_-]{32,}\b/g,
// UUIDs
uuid: /\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b/gi,
// File paths (Unix and Windows)
filePath: /(?:\/[\w.-]+)+\/?|(?:[A-Z]:\\(?:[\w.-]+\\)*[\w.-]+)/g,
// Potential passwords or secrets (common patterns)
secret: /\b(?:password|passwd|pwd|secret|token|key)[:=\s]+[^\s]+/gi,
};
/**
* Company/organization name patterns to anonymize
* These are common patterns that might appear in workflow intents
*/
const COMPANY_PATTERNS = {
// Company suffixes
companySuffix: /\b\w+(?:\s+(?:Inc|LLC|Corp|Corporation|Ltd|Limited|GmbH|AG)\.?)\b/gi,
// Common business terms that might indicate company names
businessContext: /\b(?:company|organization|client|customer)\s+(?:named?|called)\s+\w+/gi,
};
/**
* Sanitizes user intent by removing PII and sensitive information
*/
export class IntentSanitizer {
/**
* Sanitize user intent string
*/
sanitize(intent: string): string {
if (!intent) {
return intent;
}
let sanitized = intent;
// Remove email addresses
sanitized = sanitized.replace(PII_PATTERNS.email, '[EMAIL]');
// Remove URLs
sanitized = sanitized.replace(PII_PATTERNS.url, '[URL]');
// Remove IP addresses
sanitized = sanitized.replace(PII_PATTERNS.ip, '[IP_ADDRESS]');
// Remove phone numbers
sanitized = sanitized.replace(PII_PATTERNS.phone, '[PHONE]');
// Remove credit card numbers
sanitized = sanitized.replace(PII_PATTERNS.creditCard, '[CARD_NUMBER]');
// Remove API keys and long tokens
sanitized = sanitized.replace(PII_PATTERNS.apiKey, '[API_KEY]');
// Remove UUIDs
sanitized = sanitized.replace(PII_PATTERNS.uuid, '[UUID]');
// Remove file paths
sanitized = sanitized.replace(PII_PATTERNS.filePath, '[FILE_PATH]');
// Remove secrets/passwords
sanitized = sanitized.replace(PII_PATTERNS.secret, '[SECRET]');
// Anonymize company names
sanitized = sanitized.replace(COMPANY_PATTERNS.companySuffix, '[COMPANY]');
sanitized = sanitized.replace(COMPANY_PATTERNS.businessContext, '[COMPANY_CONTEXT]');
// Clean up multiple spaces
sanitized = sanitized.replace(/\s{2,}/g, ' ').trim();
return sanitized;
}
/**
* Check if intent contains potential PII
*/
containsPII(intent: string): boolean {
if (!intent) {
return false;
}
return Object.values(PII_PATTERNS).some((pattern) => pattern.test(intent));
}
/**
* Get list of PII types detected in the intent
*/
detectPIITypes(intent: string): string[] {
if (!intent) {
return [];
}
const detected: string[] = [];
if (PII_PATTERNS.email.test(intent)) detected.push('email');
if (PII_PATTERNS.url.test(intent)) detected.push('url');
if (PII_PATTERNS.ip.test(intent)) detected.push('ip_address');
if (PII_PATTERNS.phone.test(intent)) detected.push('phone');
if (PII_PATTERNS.creditCard.test(intent)) detected.push('credit_card');
if (PII_PATTERNS.apiKey.test(intent)) detected.push('api_key');
if (PII_PATTERNS.uuid.test(intent)) detected.push('uuid');
if (PII_PATTERNS.filePath.test(intent)) detected.push('file_path');
if (PII_PATTERNS.secret.test(intent)) detected.push('secret');
// Reset lastIndex for global regexes
Object.values(PII_PATTERNS).forEach((pattern) => {
pattern.lastIndex = 0;
});
return detected;
}
/**
* Truncate intent to maximum length while preserving meaning
*/
truncate(intent: string, maxLength: number = 1000): string {
if (!intent || intent.length <= maxLength) {
return intent;
}
// Try to truncate at sentence boundary
const truncated = intent.substring(0, maxLength);
const lastSentence = truncated.lastIndexOf('.');
const lastSpace = truncated.lastIndexOf(' ');
if (lastSentence > maxLength * 0.8) {
return truncated.substring(0, lastSentence + 1);
} else if (lastSpace > maxLength * 0.9) {
return truncated.substring(0, lastSpace) + '...';
}
return truncated + '...';
}
/**
* Validate intent is safe for telemetry
*/
isSafeForTelemetry(intent: string): boolean {
if (!intent) {
return true;
}
// Check length
if (intent.length > 5000) {
return false;
}
// Check for null bytes or control characters
if (/[\x00-\x08\x0B\x0C\x0E-\x1F]/.test(intent)) {
return false;
}
return true;
}
}
/**
* Singleton instance for easy access
*/
export const intentSanitizer = new IntentSanitizer();

View File

@@ -0,0 +1,369 @@
/**
* Core mutation tracker for workflow transformations
* Coordinates validation, classification, and metric calculation
*/
import { DiffOperation } from '../types/workflow-diff.js';
import {
WorkflowMutationData,
WorkflowMutationRecord,
MutationChangeMetrics,
MutationValidationMetrics,
IntentClassification,
} from './mutation-types.js';
import { intentClassifier } from './intent-classifier.js';
import { mutationValidator } from './mutation-validator.js';
import { intentSanitizer } from './intent-sanitizer.js';
import { WorkflowSanitizer } from './workflow-sanitizer.js';
import { logger } from '../utils/logger.js';
/**
* Tracks workflow mutations and prepares data for telemetry
*/
export class MutationTracker {
private recentMutations: Array<{
hashBefore: string;
hashAfter: string;
operations: DiffOperation[];
}> = [];
private readonly RECENT_MUTATIONS_LIMIT = 100;
/**
* Process and prepare mutation data for tracking
*/
async processMutation(data: WorkflowMutationData, userId: string): Promise<WorkflowMutationRecord | null> {
try {
// Validate data quality
if (!this.validateMutationData(data)) {
logger.debug('Mutation data validation failed');
return null;
}
// Sanitize workflows to remove credentials and sensitive data
const workflowBefore = this.sanitizeFullWorkflow(data.workflowBefore);
const workflowAfter = this.sanitizeFullWorkflow(data.workflowAfter);
// Sanitize user intent
const sanitizedIntent = intentSanitizer.sanitize(data.userIntent);
// Check if should be excluded
if (mutationValidator.shouldExclude(data)) {
logger.debug('Mutation excluded from tracking based on quality criteria');
return null;
}
// Check for duplicates
if (
mutationValidator.isDuplicate(
workflowBefore,
workflowAfter,
data.operations,
this.recentMutations
)
) {
logger.debug('Duplicate mutation detected, skipping tracking');
return null;
}
// Generate hashes
const hashBefore = mutationValidator.hashWorkflow(workflowBefore);
const hashAfter = mutationValidator.hashWorkflow(workflowAfter);
// Classify intent
const intentClassification = intentClassifier.classify(data.operations, sanitizedIntent);
// Calculate metrics
const changeMetrics = this.calculateChangeMetrics(data.operations);
const validationMetrics = this.calculateValidationMetrics(
data.validationBefore,
data.validationAfter
);
// Create mutation record
const record: WorkflowMutationRecord = {
userId,
sessionId: data.sessionId,
workflowBefore,
workflowAfter,
workflowHashBefore: hashBefore,
workflowHashAfter: hashAfter,
userIntent: sanitizedIntent,
intentClassification,
toolName: data.toolName,
operations: data.operations,
operationCount: data.operations.length,
operationTypes: this.extractOperationTypes(data.operations),
validationBefore: data.validationBefore,
validationAfter: data.validationAfter,
...validationMetrics,
...changeMetrics,
mutationSuccess: data.mutationSuccess,
mutationError: data.mutationError,
durationMs: data.durationMs,
};
// Store in recent mutations for deduplication
this.addToRecentMutations(hashBefore, hashAfter, data.operations);
return record;
} catch (error) {
logger.error('Error processing mutation:', error);
return null;
}
}
/**
* Validate mutation data
*/
private validateMutationData(data: WorkflowMutationData): boolean {
const validationResult = mutationValidator.validate(data);
if (!validationResult.valid) {
logger.warn('Mutation data validation failed:', validationResult.errors);
return false;
}
if (validationResult.warnings.length > 0) {
logger.debug('Mutation data validation warnings:', validationResult.warnings);
}
return true;
}
/**
* Calculate change metrics from operations
*/
private calculateChangeMetrics(operations: DiffOperation[]): MutationChangeMetrics {
const metrics: MutationChangeMetrics = {
nodesAdded: 0,
nodesRemoved: 0,
nodesModified: 0,
connectionsAdded: 0,
connectionsRemoved: 0,
propertiesChanged: 0,
};
for (const op of operations) {
switch (op.type) {
case 'addNode':
metrics.nodesAdded++;
break;
case 'removeNode':
metrics.nodesRemoved++;
break;
case 'updateNode':
metrics.nodesModified++;
if ('updates' in op && op.updates) {
metrics.propertiesChanged += Object.keys(op.updates as any).length;
}
break;
case 'addConnection':
metrics.connectionsAdded++;
break;
case 'removeConnection':
metrics.connectionsRemoved++;
break;
case 'rewireConnection':
// Rewiring is effectively removing + adding
metrics.connectionsRemoved++;
metrics.connectionsAdded++;
break;
case 'replaceConnections':
// Count how many connections are being replaced
if ('connections' in op && op.connections) {
metrics.connectionsRemoved++;
metrics.connectionsAdded++;
}
break;
case 'updateSettings':
if ('settings' in op && op.settings) {
metrics.propertiesChanged += Object.keys(op.settings as any).length;
}
break;
case 'moveNode':
case 'enableNode':
case 'disableNode':
case 'updateName':
case 'addTag':
case 'removeTag':
case 'activateWorkflow':
case 'deactivateWorkflow':
case 'cleanStaleConnections':
// These don't directly affect node/connection counts
// but count as property changes
metrics.propertiesChanged++;
break;
}
}
return metrics;
}
/**
* Sanitize a full workflow while preserving structure
* Removes credentials and sensitive data but keeps all nodes, connections, parameters
*/
private sanitizeFullWorkflow(workflow: any): any {
if (!workflow) return workflow;
// Deep clone to avoid modifying original
const sanitized = JSON.parse(JSON.stringify(workflow));
// Remove sensitive workflow-level fields
delete sanitized.credentials;
delete sanitized.sharedWorkflows;
delete sanitized.ownedBy;
delete sanitized.createdBy;
delete sanitized.updatedBy;
// Sanitize each node
if (sanitized.nodes && Array.isArray(sanitized.nodes)) {
sanitized.nodes = sanitized.nodes.map((node: any) => {
const sanitizedNode = { ...node };
// Remove credentials field
delete sanitizedNode.credentials;
// Sanitize parameters if present
if (sanitizedNode.parameters && typeof sanitizedNode.parameters === 'object') {
sanitizedNode.parameters = this.sanitizeParameters(sanitizedNode.parameters);
}
return sanitizedNode;
});
}
return sanitized;
}
/**
* Recursively sanitize parameters object
*/
private sanitizeParameters(params: any): any {
if (!params || typeof params !== 'object') return params;
const sensitiveKeys = [
'apiKey', 'api_key', 'token', 'secret', 'password', 'credential',
'auth', 'authorization', 'privateKey', 'accessToken', 'refreshToken'
];
const sanitized: any = Array.isArray(params) ? [] : {};
for (const [key, value] of Object.entries(params)) {
const lowerKey = key.toLowerCase();
// Check if key is sensitive
if (sensitiveKeys.some(sk => lowerKey.includes(sk.toLowerCase()))) {
sanitized[key] = '[REDACTED]';
} else if (typeof value === 'object' && value !== null) {
// Recursively sanitize nested objects
sanitized[key] = this.sanitizeParameters(value);
} else if (typeof value === 'string') {
// Sanitize string values that might contain sensitive data
sanitized[key] = this.sanitizeStringValue(value);
} else {
sanitized[key] = value;
}
}
return sanitized;
}
/**
* Sanitize string values that might contain sensitive data
*/
private sanitizeStringValue(value: string): string {
if (!value || typeof value !== 'string') return value;
let sanitized = value;
// Redact URLs with authentication
sanitized = sanitized.replace(/https?:\/\/[^:]+:[^@]+@[^\s/]+/g, '[REDACTED_URL_WITH_AUTH]');
// Redact long API keys/tokens (20+ alphanumeric chars)
sanitized = sanitized.replace(/\b[A-Za-z0-9_-]{32,}\b/g, '[REDACTED_TOKEN]');
// Redact OpenAI-style keys
sanitized = sanitized.replace(/\bsk-[A-Za-z0-9]{32,}\b/g, '[REDACTED_APIKEY]');
// Redact Bearer tokens
sanitized = sanitized.replace(/Bearer\s+[^\s]+/gi, 'Bearer [REDACTED]');
return sanitized;
}
/**
* Calculate validation improvement metrics
*/
private calculateValidationMetrics(
validationBefore: any,
validationAfter: any
): MutationValidationMetrics {
// If validation data is missing, return nulls
if (!validationBefore || !validationAfter) {
return {
validationImproved: null,
errorsResolved: 0,
errorsIntroduced: 0,
};
}
const errorsBefore = validationBefore.errors?.length || 0;
const errorsAfter = validationAfter.errors?.length || 0;
const errorsResolved = Math.max(0, errorsBefore - errorsAfter);
const errorsIntroduced = Math.max(0, errorsAfter - errorsBefore);
const validationImproved = errorsBefore > errorsAfter;
return {
validationImproved,
errorsResolved,
errorsIntroduced,
};
}
/**
* Extract unique operation types from operations
*/
private extractOperationTypes(operations: DiffOperation[]): string[] {
const types = new Set(operations.map((op) => op.type));
return Array.from(types);
}
/**
* Add mutation to recent list for deduplication
*/
private addToRecentMutations(
hashBefore: string,
hashAfter: string,
operations: DiffOperation[]
): void {
this.recentMutations.push({ hashBefore, hashAfter, operations });
// Keep only recent mutations
if (this.recentMutations.length > this.RECENT_MUTATIONS_LIMIT) {
this.recentMutations.shift();
}
}
/**
* Clear recent mutations (useful for testing)
*/
clearRecentMutations(): void {
this.recentMutations = [];
}
/**
* Get statistics about tracked mutations
*/
getRecentMutationsCount(): number {
return this.recentMutations.length;
}
}
/**
* Singleton instance for easy access
*/
export const mutationTracker = new MutationTracker();

View File

@@ -0,0 +1,154 @@
/**
* Types and interfaces for workflow mutation tracking
* Purpose: Track workflow transformations to improve partial updates tooling
*/
import { DiffOperation } from '../types/workflow-diff.js';
/**
* Intent classification for workflow mutations
*/
export enum IntentClassification {
ADD_FUNCTIONALITY = 'add_functionality',
MODIFY_CONFIGURATION = 'modify_configuration',
REWIRE_LOGIC = 'rewire_logic',
FIX_VALIDATION = 'fix_validation',
CLEANUP = 'cleanup',
UNKNOWN = 'unknown',
}
/**
* Tool names that perform workflow mutations
*/
export enum MutationToolName {
UPDATE_PARTIAL = 'n8n_update_partial_workflow',
UPDATE_FULL = 'n8n_update_full_workflow',
}
/**
* Validation result structure
*/
export interface ValidationResult {
valid: boolean;
errors: Array<{
type: string;
message: string;
severity?: string;
location?: string;
}>;
warnings?: Array<{
type: string;
message: string;
}>;
}
/**
* Change metrics calculated from workflow mutation
*/
export interface MutationChangeMetrics {
nodesAdded: number;
nodesRemoved: number;
nodesModified: number;
connectionsAdded: number;
connectionsRemoved: number;
propertiesChanged: number;
}
/**
* Validation improvement metrics
*/
export interface MutationValidationMetrics {
validationImproved: boolean | null;
errorsResolved: number;
errorsIntroduced: number;
}
/**
* Input data for tracking a workflow mutation
*/
export interface WorkflowMutationData {
sessionId: string;
toolName: MutationToolName;
userIntent: string;
operations: DiffOperation[];
workflowBefore: any;
workflowAfter: any;
validationBefore?: ValidationResult;
validationAfter?: ValidationResult;
mutationSuccess: boolean;
mutationError?: string;
durationMs: number;
}
/**
* Complete mutation record for database storage
*/
export interface WorkflowMutationRecord {
id?: string;
userId: string;
sessionId: string;
workflowBefore: any;
workflowAfter: any;
workflowHashBefore: string;
workflowHashAfter: string;
userIntent: string;
intentClassification: IntentClassification;
toolName: MutationToolName;
operations: DiffOperation[];
operationCount: number;
operationTypes: string[];
validationBefore?: ValidationResult;
validationAfter?: ValidationResult;
validationImproved: boolean | null;
errorsResolved: number;
errorsIntroduced: number;
nodesAdded: number;
nodesRemoved: number;
nodesModified: number;
connectionsAdded: number;
connectionsRemoved: number;
propertiesChanged: number;
mutationSuccess: boolean;
mutationError?: string;
durationMs: number;
createdAt?: Date;
}
/**
* Options for mutation tracking
*/
export interface MutationTrackingOptions {
/** Whether to track this mutation (default: true) */
enabled?: boolean;
/** Maximum workflow size in KB to track (default: 500) */
maxWorkflowSizeKb?: number;
/** Whether to validate data quality before tracking (default: true) */
validateQuality?: boolean;
/** Whether to sanitize workflows for PII (default: true) */
sanitize?: boolean;
}
/**
* Mutation tracking statistics for monitoring
*/
export interface MutationTrackingStats {
totalMutationsTracked: number;
successfulMutations: number;
failedMutations: number;
mutationsWithValidationImprovement: number;
averageDurationMs: number;
intentClassificationBreakdown: Record<IntentClassification, number>;
operationTypeBreakdown: Record<string, number>;
}
/**
* Data quality validation result
*/
export interface MutationDataQualityResult {
valid: boolean;
errors: string[];
warnings: string[];
}

View File

@@ -0,0 +1,237 @@
/**
* Data quality validator for workflow mutations
* Ensures mutation data meets quality standards before tracking
*/
import { createHash } from 'crypto';
import {
WorkflowMutationData,
MutationDataQualityResult,
MutationTrackingOptions,
} from './mutation-types.js';
/**
* Default options for mutation tracking
*/
export const DEFAULT_MUTATION_TRACKING_OPTIONS: Required<MutationTrackingOptions> = {
enabled: true,
maxWorkflowSizeKb: 500,
validateQuality: true,
sanitize: true,
};
/**
* Validates workflow mutation data quality
*/
export class MutationValidator {
private options: Required<MutationTrackingOptions>;
constructor(options: MutationTrackingOptions = {}) {
this.options = { ...DEFAULT_MUTATION_TRACKING_OPTIONS, ...options };
}
/**
* Validate mutation data quality
*/
validate(data: WorkflowMutationData): MutationDataQualityResult {
const errors: string[] = [];
const warnings: string[] = [];
// Check workflow structure
if (!this.isValidWorkflow(data.workflowBefore)) {
errors.push('Invalid workflow_before structure');
}
if (!this.isValidWorkflow(data.workflowAfter)) {
errors.push('Invalid workflow_after structure');
}
// Check workflow size
const beforeSizeKb = this.getWorkflowSizeKb(data.workflowBefore);
const afterSizeKb = this.getWorkflowSizeKb(data.workflowAfter);
if (beforeSizeKb > this.options.maxWorkflowSizeKb) {
errors.push(
`workflow_before size (${beforeSizeKb}KB) exceeds maximum (${this.options.maxWorkflowSizeKb}KB)`
);
}
if (afterSizeKb > this.options.maxWorkflowSizeKb) {
errors.push(
`workflow_after size (${afterSizeKb}KB) exceeds maximum (${this.options.maxWorkflowSizeKb}KB)`
);
}
// Check for meaningful change
if (!this.hasMeaningfulChange(data.workflowBefore, data.workflowAfter)) {
warnings.push('No meaningful change detected between before and after workflows');
}
// Check intent quality
if (!data.userIntent || data.userIntent.trim().length === 0) {
warnings.push('User intent is empty');
} else if (data.userIntent.trim().length < 5) {
warnings.push('User intent is too short (less than 5 characters)');
} else if (data.userIntent.length > 1000) {
warnings.push('User intent is very long (over 1000 characters)');
}
// Check operations
if (!data.operations || data.operations.length === 0) {
errors.push('No operations provided');
}
// Check validation data consistency
if (data.validationBefore && data.validationAfter) {
if (typeof data.validationBefore.valid !== 'boolean') {
warnings.push('Invalid validation_before structure');
}
if (typeof data.validationAfter.valid !== 'boolean') {
warnings.push('Invalid validation_after structure');
}
}
// Check duration sanity
if (data.durationMs !== undefined) {
if (data.durationMs < 0) {
errors.push('Duration cannot be negative');
}
if (data.durationMs > 300000) {
// 5 minutes
warnings.push('Duration is very long (over 5 minutes)');
}
}
return {
valid: errors.length === 0,
errors,
warnings,
};
}
/**
* Check if workflow has valid structure
*/
private isValidWorkflow(workflow: any): boolean {
if (!workflow || typeof workflow !== 'object') {
return false;
}
// Must have nodes array
if (!Array.isArray(workflow.nodes)) {
return false;
}
// Must have connections object
if (!workflow.connections || typeof workflow.connections !== 'object') {
return false;
}
return true;
}
/**
* Get workflow size in KB
*/
private getWorkflowSizeKb(workflow: any): number {
try {
const json = JSON.stringify(workflow);
return json.length / 1024;
} catch {
return 0;
}
}
/**
* Check if there's meaningful change between workflows
*/
private hasMeaningfulChange(workflowBefore: any, workflowAfter: any): boolean {
try {
// Compare hashes
const hashBefore = this.hashWorkflow(workflowBefore);
const hashAfter = this.hashWorkflow(workflowAfter);
return hashBefore !== hashAfter;
} catch {
return false;
}
}
/**
* Hash workflow for comparison
*/
hashWorkflow(workflow: any): string {
try {
const json = JSON.stringify(workflow);
return createHash('sha256').update(json).digest('hex').substring(0, 16);
} catch {
return '';
}
}
/**
* Check if mutation should be excluded from tracking
*/
shouldExclude(data: WorkflowMutationData): boolean {
// Exclude if not successful and no error message
if (!data.mutationSuccess && !data.mutationError) {
return true;
}
// Exclude if workflows are identical
if (!this.hasMeaningfulChange(data.workflowBefore, data.workflowAfter)) {
return true;
}
// Exclude if workflow size exceeds limits
const beforeSizeKb = this.getWorkflowSizeKb(data.workflowBefore);
const afterSizeKb = this.getWorkflowSizeKb(data.workflowAfter);
if (
beforeSizeKb > this.options.maxWorkflowSizeKb ||
afterSizeKb > this.options.maxWorkflowSizeKb
) {
return true;
}
return false;
}
/**
* Check for duplicate mutation (same hash + operations)
*/
isDuplicate(
workflowBefore: any,
workflowAfter: any,
operations: any[],
recentMutations: Array<{ hashBefore: string; hashAfter: string; operations: any[] }>
): boolean {
const hashBefore = this.hashWorkflow(workflowBefore);
const hashAfter = this.hashWorkflow(workflowAfter);
const operationsHash = this.hashOperations(operations);
return recentMutations.some(
(m) =>
m.hashBefore === hashBefore &&
m.hashAfter === hashAfter &&
this.hashOperations(m.operations) === operationsHash
);
}
/**
* Hash operations for deduplication
*/
private hashOperations(operations: any[]): string {
try {
const json = JSON.stringify(operations);
return createHash('sha256').update(json).digest('hex').substring(0, 16);
} catch {
return '';
}
}
}
/**
* Singleton instance for easy access
*/
export const mutationValidator = new MutationValidator();

View File

@@ -148,6 +148,50 @@ export class TelemetryManager {
}
}
/**
* Track workflow mutation from partial updates
*/
async trackWorkflowMutation(data: any): Promise<void> {
this.ensureInitialized();
if (!this.isEnabled()) {
logger.debug('Telemetry disabled, skipping mutation tracking');
return;
}
this.performanceMonitor.startOperation('trackWorkflowMutation');
try {
const { mutationTracker } = await import('./mutation-tracker.js');
const userId = this.configManager.getUserId();
const mutationRecord = await mutationTracker.processMutation(data, userId);
if (mutationRecord) {
// Queue for batch processing
this.eventTracker.enqueueMutation(mutationRecord);
// Auto-flush if queue reaches threshold
// Lower threshold (2) for mutations since they're less frequent than regular events
const queueSize = this.eventTracker.getMutationQueueSize();
if (queueSize >= 2) {
await this.flushMutations();
}
}
} catch (error) {
const telemetryError = error instanceof TelemetryError
? error
: new TelemetryError(
TelemetryErrorType.UNKNOWN_ERROR,
'Failed to track workflow mutation',
{ error: String(error) }
);
this.errorAggregator.record(telemetryError);
logger.debug('Error tracking workflow mutation:', error);
} finally {
this.performanceMonitor.endOperation('trackWorkflowMutation');
}
}
/**
* Track an error event
@@ -221,14 +265,16 @@ export class TelemetryManager {
// Get queued data from event tracker
const events = this.eventTracker.getEventQueue();
const workflows = this.eventTracker.getWorkflowQueue();
const mutations = this.eventTracker.getMutationQueue();
// Clear queues immediately to prevent duplicate processing
this.eventTracker.clearEventQueue();
this.eventTracker.clearWorkflowQueue();
this.eventTracker.clearMutationQueue();
try {
// Use batch processor to flush
await this.batchProcessor.flush(events, workflows);
await this.batchProcessor.flush(events, workflows, mutations);
} catch (error) {
const telemetryError = error instanceof TelemetryError
? error
@@ -248,6 +294,21 @@ export class TelemetryManager {
}
}
/**
* Flush queued mutations only
*/
async flushMutations(): Promise<void> {
this.ensureInitialized();
if (!this.isEnabled() || !this.supabase) return;
const mutations = this.eventTracker.getMutationQueue();
this.eventTracker.clearMutationQueue();
if (mutations.length > 0) {
await this.batchProcessor.flush([], [], mutations);
}
}
/**
* Check if telemetry is enabled

View File

@@ -131,4 +131,9 @@ export interface TelemetryErrorContext {
context?: Record<string, any>;
timestamp: number;
retryable: boolean;
}
}
/**
* Re-export workflow mutation types
*/
export type { WorkflowMutationRecord, WorkflowMutationData } from './mutation-types.js';