mirror of
https://github.com/czlonkowski/n8n-mcp.git
synced 2026-03-20 17:33:08 +00:00
feat: implement anonymous telemetry system with Supabase integration
Adds zero-configuration anonymous usage statistics to track: - Number of active users with deterministic user IDs - Which MCP tools AI agents use most - What workflows are built (sanitized to protect privacy) - Common errors and issues Key features: - Zero-configuration design with hardcoded write-only credentials - Privacy-first approach with comprehensive data sanitization - Opt-out support via config file and environment variables - Docker-friendly with environment variable support - Multi-process safe with immediate flush strategy - Row Level Security (RLS) policies for write-only access Technical implementation: - Supabase backend with anon key for INSERT-only operations - Workflow sanitization removes all sensitive data - Environment variables checked for opt-out (TELEMETRY_DISABLED, etc.) - Telemetry enabled by default but respects user preferences - Cleaned up all debug logging for production readiness 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -50,7 +50,7 @@ Commands:
|
||||
disable Disable anonymous telemetry
|
||||
status Show current telemetry status
|
||||
|
||||
Learn more: https://github.com/czlonkowski/n8n-mcp/privacy
|
||||
Learn more: https://github.com/czlonkowski/n8n-mcp/blob/main/PRIVACY.md
|
||||
`);
|
||||
process.exit(args[1] ? 1 : 0);
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
*/
|
||||
|
||||
import { existsSync, readFileSync, writeFileSync, mkdirSync } from 'fs';
|
||||
import { join } from 'path';
|
||||
import { join, resolve, dirname } from 'path';
|
||||
import { homedir } from 'os';
|
||||
import { createHash } from 'crypto';
|
||||
import { hostname, platform, arch } from 'os';
|
||||
@@ -53,15 +53,24 @@ export class TelemetryConfigManager {
|
||||
|
||||
if (!existsSync(this.configPath)) {
|
||||
// First run - create default config
|
||||
const version = this.getPackageVersion();
|
||||
|
||||
// Check if telemetry is disabled via environment variable
|
||||
const envDisabled = this.isDisabledByEnvironment();
|
||||
|
||||
this.config = {
|
||||
enabled: true,
|
||||
enabled: !envDisabled, // Respect env var on first run
|
||||
userId: this.generateUserId(),
|
||||
firstRun: new Date().toISOString(),
|
||||
version: require('../../package.json').version
|
||||
version
|
||||
};
|
||||
|
||||
this.saveConfig();
|
||||
this.showFirstRunNotice();
|
||||
|
||||
// Only show notice if not disabled via environment
|
||||
if (!envDisabled) {
|
||||
this.showFirstRunNotice();
|
||||
}
|
||||
|
||||
return this.config;
|
||||
}
|
||||
@@ -107,12 +116,51 @@ export class TelemetryConfigManager {
|
||||
|
||||
/**
|
||||
* Check if telemetry is enabled
|
||||
* Priority: Environment variable > Config file > Default (true)
|
||||
*/
|
||||
isEnabled(): boolean {
|
||||
// Check environment variables first (for Docker users)
|
||||
if (this.isDisabledByEnvironment()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const config = this.loadConfig();
|
||||
return config.enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if telemetry is disabled via environment variable
|
||||
*/
|
||||
private isDisabledByEnvironment(): boolean {
|
||||
const envVars = [
|
||||
'N8N_MCP_TELEMETRY_DISABLED',
|
||||
'TELEMETRY_DISABLED',
|
||||
'DISABLE_TELEMETRY'
|
||||
];
|
||||
|
||||
for (const varName of envVars) {
|
||||
const value = process.env[varName];
|
||||
if (value !== undefined) {
|
||||
const normalized = value.toLowerCase().trim();
|
||||
|
||||
// Warn about invalid values
|
||||
if (!['true', 'false', '1', '0', ''].includes(normalized)) {
|
||||
console.warn(
|
||||
`⚠️ Invalid telemetry environment variable value: ${varName}="${value}"\n` +
|
||||
` Use "true" to disable or "false" to enable telemetry.`
|
||||
);
|
||||
}
|
||||
|
||||
// Accept common truthy values
|
||||
if (normalized === 'true' || normalized === '1') {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the anonymous user ID
|
||||
*/
|
||||
@@ -155,14 +203,25 @@ export class TelemetryConfigManager {
|
||||
*/
|
||||
getStatus(): string {
|
||||
const config = this.loadConfig();
|
||||
|
||||
// Check if disabled by environment
|
||||
const envDisabled = this.isDisabledByEnvironment();
|
||||
|
||||
let status = config.enabled ? 'ENABLED' : 'DISABLED';
|
||||
if (envDisabled) {
|
||||
status = 'DISABLED (via environment variable)';
|
||||
}
|
||||
|
||||
return `
|
||||
Telemetry Status: ${config.enabled ? 'ENABLED' : 'DISABLED'}
|
||||
Telemetry Status: ${status}
|
||||
Anonymous ID: ${config.userId}
|
||||
First Run: ${config.firstRun || 'Unknown'}
|
||||
Config Path: ${this.configPath}
|
||||
|
||||
To opt-out: npx n8n-mcp telemetry disable
|
||||
To opt-in: npx n8n-mcp telemetry enable
|
||||
|
||||
For Docker: Set N8N_MCP_TELEMETRY_DISABLED=true
|
||||
`;
|
||||
}
|
||||
|
||||
@@ -199,9 +258,44 @@ To opt-in: npx n8n-mcp telemetry enable
|
||||
║ npx n8n-mcp telemetry disable ║
|
||||
║ ║
|
||||
║ Learn more: ║
|
||||
║ https://github.com/czlonkowski/n8n-mcp/privacy ║
|
||||
║ https://github.com/czlonkowski/n8n-mcp/blob/main/PRIVACY.md ║
|
||||
║ ║
|
||||
╚════════════════════════════════════════════════════════════╝
|
||||
`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get package version safely
|
||||
*/
|
||||
private getPackageVersion(): string {
|
||||
try {
|
||||
// Try multiple approaches to find package.json
|
||||
const possiblePaths = [
|
||||
resolve(__dirname, '..', '..', 'package.json'),
|
||||
resolve(process.cwd(), 'package.json'),
|
||||
resolve(__dirname, '..', '..', '..', 'package.json')
|
||||
];
|
||||
|
||||
for (const packagePath of possiblePaths) {
|
||||
if (existsSync(packagePath)) {
|
||||
const packageJson = JSON.parse(readFileSync(packagePath, 'utf-8'));
|
||||
if (packageJson.version) {
|
||||
return packageJson.version;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: try require (works in some environments)
|
||||
try {
|
||||
const packageJson = require('../../package.json');
|
||||
return packageJson.version || 'unknown';
|
||||
} catch {
|
||||
// Ignore require error
|
||||
}
|
||||
|
||||
return 'unknown';
|
||||
} catch (error) {
|
||||
return 'unknown';
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,8 @@ import { createClient, SupabaseClient } from '@supabase/supabase-js';
|
||||
import { TelemetryConfigManager } from './config-manager';
|
||||
import { WorkflowSanitizer } from './workflow-sanitizer';
|
||||
import { logger } from '../utils/logger';
|
||||
import { resolve } from 'path';
|
||||
import { existsSync, readFileSync } from 'fs';
|
||||
|
||||
interface TelemetryEvent {
|
||||
user_id: string;
|
||||
@@ -27,6 +29,28 @@ interface WorkflowTelemetry {
|
||||
created_at?: string;
|
||||
}
|
||||
|
||||
// Configuration constants
|
||||
const TELEMETRY_CONFIG = {
|
||||
BATCH_FLUSH_INTERVAL: 5000, // 5 seconds - reduced for multi-process
|
||||
EVENT_QUEUE_THRESHOLD: 1, // Immediate flush for multi-process compatibility
|
||||
WORKFLOW_QUEUE_THRESHOLD: 1, // Immediate flush for multi-process compatibility
|
||||
MAX_RETRIES: 3,
|
||||
RETRY_DELAY: 1000, // 1 second
|
||||
OPERATION_TIMEOUT: 5000, // 5 seconds
|
||||
} as const;
|
||||
|
||||
// Hardcoded telemetry backend configuration
|
||||
// IMPORTANT: This is intentionally hardcoded for zero-configuration telemetry
|
||||
// The anon key is PUBLIC and SAFE to expose because:
|
||||
// 1. It only allows INSERT operations (write-only)
|
||||
// 2. Row Level Security (RLS) policies prevent reading/updating/deleting data
|
||||
// 3. This is standard practice for anonymous telemetry collection
|
||||
// 4. No sensitive user data is ever sent
|
||||
const TELEMETRY_BACKEND = {
|
||||
URL: 'https://ydyufsohxdfpopqbubwk.supabase.co',
|
||||
ANON_KEY: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InlkeXVmc29oeGRmcG9wcWJ1YndrIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NTg3OTYyMDAsImV4cCI6MjA3NDM3MjIwMH0.xESphg6h5ozaDsm4Vla3QnDJGc6Nc_cpfoqTHRynkCk'
|
||||
} as const;
|
||||
|
||||
export class TelemetryManager {
|
||||
private static instance: TelemetryManager;
|
||||
private supabase: SupabaseClient | null = null;
|
||||
@@ -35,6 +59,7 @@ export class TelemetryManager {
|
||||
private workflowQueue: WorkflowTelemetry[] = [];
|
||||
private flushTimer?: NodeJS.Timeout;
|
||||
private isInitialized: boolean = false;
|
||||
private isFlushingWorkflows: boolean = false;
|
||||
|
||||
private constructor() {
|
||||
this.configManager = TelemetryConfigManager.getInstance();
|
||||
@@ -57,13 +82,10 @@ export class TelemetryManager {
|
||||
return;
|
||||
}
|
||||
|
||||
const supabaseUrl = process.env.SUPABASE_URL;
|
||||
const supabaseAnonKey = process.env.SUPABASE_ANON_KEY;
|
||||
|
||||
if (!supabaseUrl || !supabaseAnonKey) {
|
||||
logger.debug('Telemetry not configured: missing SUPABASE_URL or SUPABASE_ANON_KEY');
|
||||
return;
|
||||
}
|
||||
// Use hardcoded credentials for zero-configuration telemetry
|
||||
// Environment variables can override for development/testing
|
||||
const supabaseUrl = process.env.SUPABASE_URL || TELEMETRY_BACKEND.URL;
|
||||
const supabaseAnonKey = process.env.SUPABASE_ANON_KEY || TELEMETRY_BACKEND.ANON_KEY;
|
||||
|
||||
try {
|
||||
this.supabase = createClient(supabaseUrl, supabaseAnonKey, {
|
||||
@@ -116,9 +138,9 @@ export class TelemetryManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* Track workflow creation
|
||||
* Track workflow creation (fire-and-forget)
|
||||
*/
|
||||
async trackWorkflowCreation(workflow: any, validationPassed: boolean): Promise<void> {
|
||||
trackWorkflowCreation(workflow: any, validationPassed: boolean): void {
|
||||
if (!this.isEnabled()) return;
|
||||
|
||||
// Only store workflows that pass validation
|
||||
@@ -129,41 +151,67 @@ export class TelemetryManager {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const sanitized = WorkflowSanitizer.sanitizeWorkflow(workflow);
|
||||
// Process asynchronously without blocking
|
||||
setImmediate(async () => {
|
||||
try {
|
||||
const sanitized = WorkflowSanitizer.sanitizeWorkflow(workflow);
|
||||
|
||||
const telemetryData: WorkflowTelemetry = {
|
||||
user_id: this.configManager.getUserId(),
|
||||
workflow_hash: sanitized.workflowHash,
|
||||
node_count: sanitized.nodeCount,
|
||||
node_types: sanitized.nodeTypes,
|
||||
has_trigger: sanitized.hasTrigger,
|
||||
has_webhook: sanitized.hasWebhook,
|
||||
complexity: sanitized.complexity,
|
||||
sanitized_workflow: {
|
||||
nodes: sanitized.nodes,
|
||||
connections: sanitized.connections,
|
||||
},
|
||||
};
|
||||
const telemetryData: WorkflowTelemetry = {
|
||||
user_id: this.configManager.getUserId(),
|
||||
workflow_hash: sanitized.workflowHash,
|
||||
node_count: sanitized.nodeCount,
|
||||
node_types: sanitized.nodeTypes,
|
||||
has_trigger: sanitized.hasTrigger,
|
||||
has_webhook: sanitized.hasWebhook,
|
||||
complexity: sanitized.complexity,
|
||||
sanitized_workflow: {
|
||||
nodes: sanitized.nodes,
|
||||
connections: sanitized.connections,
|
||||
},
|
||||
};
|
||||
|
||||
this.workflowQueue.push(telemetryData);
|
||||
// Add to queue synchronously to avoid race conditions
|
||||
const queueLength = this.addToWorkflowQueue(telemetryData);
|
||||
|
||||
// Also track as event
|
||||
this.trackEvent('workflow_created', {
|
||||
nodeCount: sanitized.nodeCount,
|
||||
nodeTypes: sanitized.nodeTypes.length,
|
||||
complexity: sanitized.complexity,
|
||||
hasTrigger: sanitized.hasTrigger,
|
||||
hasWebhook: sanitized.hasWebhook,
|
||||
});
|
||||
// Also track as event
|
||||
this.trackEvent('workflow_created', {
|
||||
nodeCount: sanitized.nodeCount,
|
||||
nodeTypes: sanitized.nodeTypes.length,
|
||||
complexity: sanitized.complexity,
|
||||
hasTrigger: sanitized.hasTrigger,
|
||||
hasWebhook: sanitized.hasWebhook,
|
||||
});
|
||||
|
||||
// Flush if queue is getting large
|
||||
if (this.workflowQueue.length >= 5) {
|
||||
await this.flush();
|
||||
// Flush if queue reached threshold
|
||||
if (queueLength >= TELEMETRY_CONFIG.WORKFLOW_QUEUE_THRESHOLD) {
|
||||
await this.flush();
|
||||
}
|
||||
} catch (error) {
|
||||
logger.debug('Failed to track workflow creation:', error);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.debug('Failed to track workflow creation:', error);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread-safe method to add workflow to queue
|
||||
* Returns the new queue length after adding
|
||||
*/
|
||||
private addToWorkflowQueue(telemetryData: WorkflowTelemetry): number {
|
||||
// Don't add to queue if we're currently flushing workflows
|
||||
// This prevents race conditions where items are added during flush
|
||||
if (this.isFlushingWorkflows) {
|
||||
// Queue the flush for later to ensure we don't lose data
|
||||
setImmediate(() => {
|
||||
this.workflowQueue.push(telemetryData);
|
||||
if (this.workflowQueue.length >= TELEMETRY_CONFIG.WORKFLOW_QUEUE_THRESHOLD) {
|
||||
this.flush();
|
||||
}
|
||||
});
|
||||
return 0; // Don't trigger immediate flush
|
||||
}
|
||||
|
||||
this.workflowQueue.push(telemetryData);
|
||||
return this.workflowQueue.length;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -194,7 +242,7 @@ export class TelemetryManager {
|
||||
this.eventQueue.push(event);
|
||||
|
||||
// Flush if queue is getting large
|
||||
if (this.eventQueue.length >= 20) {
|
||||
if (this.eventQueue.length >= TELEMETRY_CONFIG.EVENT_QUEUE_THRESHOLD) {
|
||||
this.flush();
|
||||
}
|
||||
}
|
||||
@@ -206,13 +254,83 @@ export class TelemetryManager {
|
||||
if (!this.isEnabled()) return;
|
||||
|
||||
this.trackEvent('session_start', {
|
||||
version: require('../../package.json').version,
|
||||
version: this.getPackageVersion(),
|
||||
platform: process.platform,
|
||||
arch: process.arch,
|
||||
nodeVersion: process.version,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get package version safely
|
||||
*/
|
||||
private getPackageVersion(): string {
|
||||
try {
|
||||
// Try multiple approaches to find package.json
|
||||
const possiblePaths = [
|
||||
resolve(__dirname, '..', '..', 'package.json'),
|
||||
resolve(process.cwd(), 'package.json'),
|
||||
resolve(__dirname, '..', '..', '..', 'package.json')
|
||||
];
|
||||
|
||||
for (const packagePath of possiblePaths) {
|
||||
if (existsSync(packagePath)) {
|
||||
const packageJson = JSON.parse(readFileSync(packagePath, 'utf-8'));
|
||||
if (packageJson.version) {
|
||||
return packageJson.version;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: try require (works in some environments)
|
||||
try {
|
||||
const packageJson = require('../../package.json');
|
||||
return packageJson.version || 'unknown';
|
||||
} catch {
|
||||
// Ignore require error
|
||||
}
|
||||
|
||||
return 'unknown';
|
||||
} catch (error) {
|
||||
logger.debug('Failed to get package version:', error);
|
||||
return 'unknown';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute Supabase operation with retry and timeout
|
||||
*/
|
||||
private async executeWithRetry<T>(
|
||||
operation: () => Promise<T>,
|
||||
operationName: string
|
||||
): Promise<T | null> {
|
||||
let lastError: Error | null = null;
|
||||
|
||||
for (let attempt = 1; attempt <= TELEMETRY_CONFIG.MAX_RETRIES; attempt++) {
|
||||
try {
|
||||
// Create a timeout promise
|
||||
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||
setTimeout(() => reject(new Error('Operation timed out')), TELEMETRY_CONFIG.OPERATION_TIMEOUT);
|
||||
});
|
||||
|
||||
// Race between operation and timeout
|
||||
const result = await Promise.race([operation(), timeoutPromise]) as T;
|
||||
return result;
|
||||
} catch (error) {
|
||||
lastError = error as Error;
|
||||
logger.debug(`${operationName} attempt ${attempt} failed:`, error);
|
||||
|
||||
if (attempt < TELEMETRY_CONFIG.MAX_RETRIES) {
|
||||
// Wait before retrying
|
||||
await new Promise(resolve => setTimeout(resolve, TELEMETRY_CONFIG.RETRY_DELAY * attempt));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug(`${operationName} failed after ${TELEMETRY_CONFIG.MAX_RETRIES} attempts:`, lastError);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush queued events to Supabase
|
||||
*/
|
||||
@@ -224,42 +342,66 @@ export class TelemetryManager {
|
||||
const events = [...this.eventQueue];
|
||||
this.eventQueue = [];
|
||||
|
||||
try {
|
||||
const { error } = await this.supabase
|
||||
await this.executeWithRetry(async () => {
|
||||
const { error } = await this.supabase!
|
||||
.from('telemetry_events')
|
||||
.insert(events); // No .select() - we don't need the response
|
||||
|
||||
if (error) {
|
||||
logger.debug('Failed to flush telemetry events:', error.message);
|
||||
} else {
|
||||
logger.debug(`Flushed ${events.length} telemetry events`);
|
||||
throw error;
|
||||
}
|
||||
} catch (error) {
|
||||
logger.debug('Error flushing telemetry events:', error);
|
||||
}
|
||||
|
||||
logger.debug(`Flushed ${events.length} telemetry events`);
|
||||
return true;
|
||||
}, 'Flush telemetry events');
|
||||
}
|
||||
|
||||
// Flush workflows
|
||||
if (this.workflowQueue.length > 0) {
|
||||
const workflows = [...this.workflowQueue];
|
||||
this.workflowQueue = [];
|
||||
this.isFlushingWorkflows = true;
|
||||
|
||||
try {
|
||||
// Use upsert to avoid duplicates based on workflow_hash
|
||||
const { error } = await this.supabase
|
||||
.from('telemetry_workflows')
|
||||
.upsert(workflows, {
|
||||
onConflict: 'workflow_hash',
|
||||
ignoreDuplicates: true,
|
||||
}); // No .select() - we don't need the response
|
||||
const workflows = [...this.workflowQueue];
|
||||
this.workflowQueue = [];
|
||||
|
||||
if (error) {
|
||||
logger.debug('Failed to flush telemetry workflows:', error.message);
|
||||
} else {
|
||||
logger.debug(`Flushed ${workflows.length} telemetry workflows`);
|
||||
const result = await this.executeWithRetry(async () => {
|
||||
// Deduplicate workflows by hash before inserting
|
||||
const uniqueWorkflows = workflows.reduce((acc, workflow) => {
|
||||
if (!acc.some(w => w.workflow_hash === workflow.workflow_hash)) {
|
||||
acc.push(workflow);
|
||||
}
|
||||
return acc;
|
||||
}, [] as WorkflowTelemetry[]);
|
||||
|
||||
logger.debug(`Deduplicating workflows: ${workflows.length} -> ${uniqueWorkflows.length} unique`);
|
||||
|
||||
// Use insert (same as events) - duplicates are handled by deduplication above
|
||||
const { error } = await this.supabase!
|
||||
.from('telemetry_workflows')
|
||||
.insert(uniqueWorkflows); // No .select() - we don't need the response
|
||||
|
||||
if (error) {
|
||||
logger.debug('Detailed workflow flush error:', {
|
||||
error: error,
|
||||
workflowCount: workflows.length,
|
||||
firstWorkflow: workflows[0] ? {
|
||||
user_id: workflows[0].user_id,
|
||||
workflow_hash: workflows[0].workflow_hash,
|
||||
node_count: workflows[0].node_count
|
||||
} : null
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
|
||||
logger.debug(`Flushed ${uniqueWorkflows.length} unique telemetry workflows (${workflows.length} total processed)`);
|
||||
return true;
|
||||
}, 'Flush telemetry workflows');
|
||||
|
||||
if (!result) {
|
||||
logger.debug('Failed to flush workflows after retries');
|
||||
}
|
||||
} catch (error) {
|
||||
logger.debug('Error flushing telemetry workflows:', error);
|
||||
} finally {
|
||||
this.isFlushingWorkflows = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -268,10 +410,10 @@ export class TelemetryManager {
|
||||
* Start batch processor for periodic flushing
|
||||
*/
|
||||
private startBatchProcessor(): void {
|
||||
// Flush every 30 seconds
|
||||
// Flush periodically
|
||||
this.flushTimer = setInterval(() => {
|
||||
this.flush();
|
||||
}, 30000);
|
||||
}, TELEMETRY_CONFIG.BATCH_FLUSH_INTERVAL);
|
||||
|
||||
// Prevent timer from keeping process alive
|
||||
this.flushTimer.unref();
|
||||
@@ -387,5 +529,12 @@ export class TelemetryManager {
|
||||
}
|
||||
}
|
||||
|
||||
// Create a global singleton to ensure only one instance across all imports
|
||||
const globalAny = global as any;
|
||||
|
||||
if (!globalAny.__telemetryManager) {
|
||||
globalAny.__telemetryManager = TelemetryManager.getInstance();
|
||||
}
|
||||
|
||||
// Export singleton instance
|
||||
export const telemetry = TelemetryManager.getInstance();
|
||||
export const telemetry = globalAny.__telemetryManager as TelemetryManager;
|
||||
Reference in New Issue
Block a user