Implement n8n-MCP integration

This commit adds a complete integration between n8n workflow automation and the Model Context Protocol (MCP):

Features:
- MCP server that exposes n8n workflows as tools, resources, and prompts
- Custom n8n node for connecting to MCP servers from workflows
- Bidirectional bridge for data format conversion
- Token-based authentication and credential management
- Comprehensive error handling and logging
- Full test coverage for core components

Infrastructure:
- TypeScript/Node.js project setup with proper build configuration
- Docker support with multi-stage builds
- Development and production docker-compose configurations
- Installation script for n8n custom node deployment

Documentation:
- Detailed README with usage examples and API reference
- Environment configuration templates
- Troubleshooting guide

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
czlonkowski
2025-06-07 15:43:02 +00:00
parent b51591a87d
commit 1f8140c45c
28 changed files with 17543 additions and 0 deletions

100
src/utils/auth.ts Normal file
View File

@@ -0,0 +1,100 @@
import crypto from 'crypto';
export class AuthManager {
private validTokens: Set<string>;
private tokenExpiry: Map<string, number>;
constructor() {
this.validTokens = new Set();
this.tokenExpiry = new Map();
}
/**
* Validate an authentication token
*/
validateToken(token: string | undefined, expectedToken?: string): boolean {
if (!expectedToken) {
// No authentication required
return true;
}
if (!token) {
return false;
}
// Check static token
if (token === expectedToken) {
return true;
}
// Check dynamic tokens
if (this.validTokens.has(token)) {
const expiry = this.tokenExpiry.get(token);
if (expiry && expiry > Date.now()) {
return true;
} else {
// Token expired
this.validTokens.delete(token);
this.tokenExpiry.delete(token);
return false;
}
}
return false;
}
/**
* Generate a new authentication token
*/
generateToken(expiryHours: number = 24): string {
const token = crypto.randomBytes(32).toString('hex');
const expiryTime = Date.now() + (expiryHours * 60 * 60 * 1000);
this.validTokens.add(token);
this.tokenExpiry.set(token, expiryTime);
// Clean up expired tokens
this.cleanupExpiredTokens();
return token;
}
/**
* Revoke a token
*/
revokeToken(token: string): void {
this.validTokens.delete(token);
this.tokenExpiry.delete(token);
}
/**
* Clean up expired tokens
*/
private cleanupExpiredTokens(): void {
const now = Date.now();
for (const [token, expiry] of this.tokenExpiry.entries()) {
if (expiry <= now) {
this.validTokens.delete(token);
this.tokenExpiry.delete(token);
}
}
}
/**
* Hash a password or token for secure storage
*/
static hashToken(token: string): string {
return crypto.createHash('sha256').update(token).digest('hex');
}
/**
* Compare a plain token with a hashed token
*/
static compareTokens(plainToken: string, hashedToken: string): boolean {
const hashedPlainToken = AuthManager.hashToken(plainToken);
return crypto.timingSafeEqual(
Buffer.from(hashedPlainToken),
Buffer.from(hashedToken)
);
}
}

166
src/utils/bridge.ts Normal file
View File

@@ -0,0 +1,166 @@
import { INodeExecutionData, IDataObject } from 'n8n-workflow';
export class N8NMCPBridge {
/**
* Convert n8n workflow data to MCP tool arguments
*/
static n8nToMCPToolArgs(data: IDataObject): any {
// Handle different data formats from n8n
if (data.json) {
return data.json;
}
// Remove n8n-specific metadata
const { pairedItem, ...cleanData } = data;
return cleanData;
}
/**
* Convert MCP tool response to n8n execution data
*/
static mcpToN8NExecutionData(mcpResponse: any, itemIndex: number = 0): INodeExecutionData {
// Handle MCP content array format
if (mcpResponse.content && Array.isArray(mcpResponse.content)) {
const textContent = mcpResponse.content
.filter((c: any) => c.type === 'text')
.map((c: any) => c.text)
.join('\n');
try {
// Try to parse as JSON if possible
const parsed = JSON.parse(textContent);
return {
json: parsed,
pairedItem: itemIndex,
};
} catch {
// Return as text if not JSON
return {
json: { result: textContent },
pairedItem: itemIndex,
};
}
}
// Handle direct object response
return {
json: mcpResponse,
pairedItem: itemIndex,
};
}
/**
* Convert n8n workflow definition to MCP-compatible format
*/
static n8nWorkflowToMCP(workflow: any): any {
return {
id: workflow.id,
name: workflow.name,
description: workflow.description || '',
nodes: workflow.nodes?.map((node: any) => ({
id: node.id,
type: node.type,
name: node.name,
parameters: node.parameters,
position: node.position,
})),
connections: workflow.connections,
settings: workflow.settings,
metadata: {
createdAt: workflow.createdAt,
updatedAt: workflow.updatedAt,
active: workflow.active,
},
};
}
/**
* Convert MCP workflow format to n8n-compatible format
*/
static mcpToN8NWorkflow(mcpWorkflow: any): any {
return {
name: mcpWorkflow.name,
nodes: mcpWorkflow.nodes || [],
connections: mcpWorkflow.connections || {},
settings: mcpWorkflow.settings || {
executionOrder: 'v1',
},
staticData: null,
pinData: {},
};
}
/**
* Convert n8n execution data to MCP resource format
*/
static n8nExecutionToMCPResource(execution: any): any {
return {
uri: `execution://${execution.id}`,
name: `Execution ${execution.id}`,
description: `Workflow: ${execution.workflowData?.name || 'Unknown'}`,
mimeType: 'application/json',
data: {
id: execution.id,
workflowId: execution.workflowId,
status: execution.finished ? 'completed' : execution.stoppedAt ? 'stopped' : 'running',
mode: execution.mode,
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
error: execution.data?.resultData?.error,
executionData: execution.data,
},
};
}
/**
* Convert MCP prompt arguments to n8n-compatible format
*/
static mcpPromptArgsToN8N(promptArgs: any): IDataObject {
return {
prompt: promptArgs.name || '',
arguments: promptArgs.arguments || {},
messages: promptArgs.messages || [],
};
}
/**
* Validate and sanitize data before conversion
*/
static sanitizeData(data: any): any {
if (data === null || data === undefined) {
return {};
}
if (typeof data !== 'object') {
return { value: data };
}
// Remove circular references
const seen = new WeakSet();
return JSON.parse(JSON.stringify(data, (_key, value) => {
if (typeof value === 'object' && value !== null) {
if (seen.has(value)) {
return '[Circular]';
}
seen.add(value);
}
return value;
}));
}
/**
* Extract error information for both n8n and MCP formats
*/
static formatError(error: any): any {
return {
message: error.message || 'Unknown error',
type: error.name || 'Error',
stack: error.stack,
details: {
code: error.code,
statusCode: error.statusCode,
data: error.data,
},
};
}
}

View File

@@ -0,0 +1,95 @@
import { logger } from './logger';
export class MCPError extends Error {
public code: string;
public statusCode?: number;
public data?: any;
constructor(message: string, code: string, statusCode?: number, data?: any) {
super(message);
this.name = 'MCPError';
this.code = code;
this.statusCode = statusCode;
this.data = data;
}
}
export class N8NConnectionError extends MCPError {
constructor(message: string, data?: any) {
super(message, 'N8N_CONNECTION_ERROR', 503, data);
this.name = 'N8NConnectionError';
}
}
export class AuthenticationError extends MCPError {
constructor(message: string = 'Authentication failed') {
super(message, 'AUTH_ERROR', 401);
this.name = 'AuthenticationError';
}
}
export class ValidationError extends MCPError {
constructor(message: string, data?: any) {
super(message, 'VALIDATION_ERROR', 400, data);
this.name = 'ValidationError';
}
}
export class ToolNotFoundError extends MCPError {
constructor(toolName: string) {
super(`Tool '${toolName}' not found`, 'TOOL_NOT_FOUND', 404);
this.name = 'ToolNotFoundError';
}
}
export class ResourceNotFoundError extends MCPError {
constructor(resourceUri: string) {
super(`Resource '${resourceUri}' not found`, 'RESOURCE_NOT_FOUND', 404);
this.name = 'ResourceNotFoundError';
}
}
export function handleError(error: any): MCPError {
if (error instanceof MCPError) {
return error;
}
if (error.response) {
// HTTP error from n8n API
const status = error.response.status;
const message = error.response.data?.message || error.message;
if (status === 401) {
return new AuthenticationError(message);
} else if (status === 404) {
return new MCPError(message, 'NOT_FOUND', 404);
} else if (status >= 500) {
return new N8NConnectionError(message);
}
return new MCPError(message, 'API_ERROR', status);
}
if (error.code === 'ECONNREFUSED') {
return new N8NConnectionError('Cannot connect to n8n API');
}
// Generic error
return new MCPError(
error.message || 'An unexpected error occurred',
'UNKNOWN_ERROR',
500
);
}
export async function withErrorHandling<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
try {
return await operation();
} catch (error) {
logger.error(`Error in ${context}:`, error);
throw handleError(error);
}
}

106
src/utils/logger.ts Normal file
View File

@@ -0,0 +1,106 @@
export enum LogLevel {
ERROR = 0,
WARN = 1,
INFO = 2,
DEBUG = 3,
}
export interface LoggerConfig {
level: LogLevel;
prefix?: string;
timestamp?: boolean;
}
export class Logger {
private config: LoggerConfig;
private static instance: Logger;
constructor(config?: Partial<LoggerConfig>) {
this.config = {
level: LogLevel.INFO,
prefix: 'n8n-mcp',
timestamp: true,
...config,
};
}
static getInstance(config?: Partial<LoggerConfig>): Logger {
if (!Logger.instance) {
Logger.instance = new Logger(config);
}
return Logger.instance;
}
private formatMessage(level: string, message: string): string {
const parts: string[] = [];
if (this.config.timestamp) {
parts.push(`[${new Date().toISOString()}]`);
}
if (this.config.prefix) {
parts.push(`[${this.config.prefix}]`);
}
parts.push(`[${level}]`);
parts.push(message);
return parts.join(' ');
}
private log(level: LogLevel, levelName: string, message: string, ...args: any[]): void {
if (level <= this.config.level) {
const formattedMessage = this.formatMessage(levelName, message);
switch (level) {
case LogLevel.ERROR:
console.error(formattedMessage, ...args);
break;
case LogLevel.WARN:
console.warn(formattedMessage, ...args);
break;
default:
console.log(formattedMessage, ...args);
}
}
}
error(message: string, ...args: any[]): void {
this.log(LogLevel.ERROR, 'ERROR', message, ...args);
}
warn(message: string, ...args: any[]): void {
this.log(LogLevel.WARN, 'WARN', message, ...args);
}
info(message: string, ...args: any[]): void {
this.log(LogLevel.INFO, 'INFO', message, ...args);
}
debug(message: string, ...args: any[]): void {
this.log(LogLevel.DEBUG, 'DEBUG', message, ...args);
}
setLevel(level: LogLevel): void {
this.config.level = level;
}
static parseLogLevel(level: string): LogLevel {
switch (level.toLowerCase()) {
case 'error':
return LogLevel.ERROR;
case 'warn':
return LogLevel.WARN;
case 'debug':
return LogLevel.DEBUG;
case 'info':
default:
return LogLevel.INFO;
}
}
}
// Create a default logger instance
export const logger = Logger.getInstance({
level: Logger.parseLogLevel(process.env.LOG_LEVEL || 'info'),
});

150
src/utils/mcp-client.ts Normal file
View File

@@ -0,0 +1,150 @@
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { WebSocketClientTransport } from '@modelcontextprotocol/sdk/client/websocket.js';
import {
CallToolRequest,
ListToolsRequest,
ListResourcesRequest,
ReadResourceRequest,
ListPromptsRequest,
GetPromptRequest,
CallToolResultSchema,
ListToolsResultSchema,
ListResourcesResultSchema,
ReadResourceResultSchema,
ListPromptsResultSchema,
GetPromptResultSchema,
} from '@modelcontextprotocol/sdk/types.js';
export interface MCPClientConfig {
serverUrl: string;
authToken?: string;
connectionType: 'http' | 'websocket' | 'stdio';
}
export class MCPClient {
private client: Client;
private config: MCPClientConfig;
private connected: boolean = false;
constructor(config: MCPClientConfig) {
this.config = config;
this.client = new Client(
{
name: 'n8n-mcp-client',
version: '1.0.0',
},
{
capabilities: {},
}
);
}
async connect(): Promise<void> {
if (this.connected) {
return;
}
let transport;
switch (this.config.connectionType) {
case 'websocket':
const wsUrl = this.config.serverUrl.replace(/^http/, 'ws');
transport = new WebSocketClientTransport(new URL(wsUrl));
break;
case 'stdio':
// For stdio, the serverUrl should be the command to execute
const [command, ...args] = this.config.serverUrl.split(' ');
transport = new StdioClientTransport({
command,
args,
});
break;
default:
throw new Error(`HTTP transport is not yet supported for MCP clients`);
}
await this.client.connect(transport);
this.connected = true;
}
async disconnect(): Promise<void> {
if (this.connected) {
await this.client.close();
this.connected = false;
}
}
async listTools(): Promise<any> {
await this.ensureConnected();
return await this.client.request(
{ method: 'tools/list' } as ListToolsRequest,
ListToolsResultSchema
);
}
async callTool(name: string, args: any): Promise<any> {
await this.ensureConnected();
return await this.client.request(
{
method: 'tools/call',
params: {
name,
arguments: args,
},
} as CallToolRequest,
CallToolResultSchema
);
}
async listResources(): Promise<any> {
await this.ensureConnected();
return await this.client.request(
{ method: 'resources/list' } as ListResourcesRequest,
ListResourcesResultSchema
);
}
async readResource(uri: string): Promise<any> {
await this.ensureConnected();
return await this.client.request(
{
method: 'resources/read',
params: {
uri,
},
} as ReadResourceRequest,
ReadResourceResultSchema
);
}
async listPrompts(): Promise<any> {
await this.ensureConnected();
return await this.client.request(
{ method: 'prompts/list' } as ListPromptsRequest,
ListPromptsResultSchema
);
}
async getPrompt(name: string, args?: any): Promise<any> {
await this.ensureConnected();
return await this.client.request(
{
method: 'prompts/get',
params: {
name,
arguments: args,
},
} as GetPromptRequest,
GetPromptResultSchema
);
}
private async ensureConnected(): Promise<void> {
if (!this.connected) {
await this.connect();
}
}
}

141
src/utils/n8n-client.ts Normal file
View File

@@ -0,0 +1,141 @@
import { N8NConfig } from '../types';
export class N8NApiClient {
private config: N8NConfig;
private headers: Record<string, string>;
constructor(config: N8NConfig) {
this.config = config;
this.headers = {
'Content-Type': 'application/json',
'X-N8N-API-KEY': config.apiKey,
};
}
private async request(endpoint: string, options: RequestInit = {}): Promise<any> {
const url = `${this.config.apiUrl}/api/v1${endpoint}`;
try {
const response = await fetch(url, {
...options,
headers: {
...this.headers,
...options.headers,
},
});
if (!response.ok) {
const error = await response.text();
throw new Error(`n8n API error: ${response.status} - ${error}`);
}
return await response.json();
} catch (error) {
throw new Error(`Failed to connect to n8n: ${error instanceof Error ? error.message : 'Unknown error'}`);
}
}
// Workflow operations
async getWorkflows(filters?: { active?: boolean; tags?: string[] }): Promise<any> {
const query = new URLSearchParams();
if (filters?.active !== undefined) {
query.append('active', filters.active.toString());
}
if (filters?.tags?.length) {
query.append('tags', filters.tags.join(','));
}
return this.request(`/workflows${query.toString() ? `?${query}` : ''}`);
}
async getWorkflow(id: string): Promise<any> {
return this.request(`/workflows/${id}`);
}
async createWorkflow(workflowData: any): Promise<any> {
return this.request('/workflows', {
method: 'POST',
body: JSON.stringify(workflowData),
});
}
async updateWorkflow(id: string, updates: any): Promise<any> {
return this.request(`/workflows/${id}`, {
method: 'PATCH',
body: JSON.stringify(updates),
});
}
async deleteWorkflow(id: string): Promise<any> {
return this.request(`/workflows/${id}`, {
method: 'DELETE',
});
}
async activateWorkflow(id: string): Promise<any> {
return this.request(`/workflows/${id}/activate`, {
method: 'POST',
});
}
async deactivateWorkflow(id: string): Promise<any> {
return this.request(`/workflows/${id}/deactivate`, {
method: 'POST',
});
}
// Execution operations
async executeWorkflow(id: string, data?: any): Promise<any> {
return this.request(`/workflows/${id}/execute`, {
method: 'POST',
body: JSON.stringify({ data }),
});
}
async getExecutions(filters?: {
workflowId?: string;
status?: string;
limit?: number
}): Promise<any> {
const query = new URLSearchParams();
if (filters?.workflowId) {
query.append('workflowId', filters.workflowId);
}
if (filters?.status) {
query.append('status', filters.status);
}
if (filters?.limit) {
query.append('limit', filters.limit.toString());
}
return this.request(`/executions${query.toString() ? `?${query}` : ''}`);
}
async getExecution(id: string): Promise<any> {
return this.request(`/executions/${id}`);
}
async deleteExecution(id: string): Promise<any> {
return this.request(`/executions/${id}`, {
method: 'DELETE',
});
}
// Credential operations
async getCredentialTypes(): Promise<any> {
return this.request('/credential-types');
}
async getCredentials(): Promise<any> {
return this.request('/credentials');
}
// Node operations
async getNodeTypes(): Promise<any> {
return this.request('/node-types');
}
async getNodeType(nodeType: string): Promise<any> {
return this.request(`/node-types/${nodeType}`);
}
}