fix: implement Single-Session architecture to resolve MCP stream errors

- Add ConsoleManager to prevent console output interference with StreamableHTTPServerTransport
- Implement SingleSessionHTTPServer with persistent session reuse
- Create N8NMCPEngine for clean service integration
- Add automatic session expiry after 30 minutes of inactivity
- Update logger to be HTTP-aware during active requests
- Maintain backward compatibility with existing deployments

This fixes the "stream is not readable" error by implementing the Hybrid
Single-Session architecture as documented in MCP_ERROR_FIX_PLAN.md

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
czlonkowski
2025-06-14 15:02:49 +02:00
parent 88dd66bb7a
commit 2cb264fd56
13 changed files with 1894 additions and 51 deletions

View File

@@ -0,0 +1,361 @@
#!/usr/bin/env node
/**
* Single-Session HTTP server for n8n-MCP
* Implements Hybrid Single-Session Architecture for protocol compliance
* while maintaining simplicity for single-player use case
*/
import express from 'express';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { N8NDocumentationMCPServer } from './mcp/server-update';
import { ConsoleManager } from './utils/console-manager';
import { logger } from './utils/logger';
import dotenv from 'dotenv';
dotenv.config();
interface Session {
server: N8NDocumentationMCPServer;
transport: StreamableHTTPServerTransport;
lastAccess: Date;
sessionId: string;
}
export class SingleSessionHTTPServer {
private session: Session | null = null;
private consoleManager = new ConsoleManager();
private expressServer: any;
private sessionTimeout = 30 * 60 * 1000; // 30 minutes
constructor() {
// Validate environment on construction
this.validateEnvironment();
}
/**
* Validate required environment variables
*/
private validateEnvironment(): void {
const required = ['AUTH_TOKEN'];
const missing = required.filter(key => !process.env[key]);
if (missing.length > 0) {
const message = `Missing required environment variables: ${missing.join(', ')}`;
logger.error(message);
throw new Error(message);
}
if (process.env.AUTH_TOKEN && process.env.AUTH_TOKEN.length < 32) {
logger.warn('AUTH_TOKEN should be at least 32 characters for security');
}
}
/**
* Handle incoming MCP request
*/
async handleRequest(req: express.Request, res: express.Response): Promise<void> {
const startTime = Date.now();
// Wrap all operations to prevent console interference
return this.consoleManager.wrapOperation(async () => {
try {
// Ensure we have a valid session
if (!this.session || this.isExpired()) {
await this.resetSession();
}
// Update last access time
this.session!.lastAccess = new Date();
// Handle request with existing transport
await this.session!.transport.handleRequest(req, res);
// Log request duration
const duration = Date.now() - startTime;
logger.info('MCP request completed', {
duration,
method: req.body?.method,
sessionId: this.session!.sessionId
});
} catch (error) {
logger.error('MCP request error:', error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
data: process.env.NODE_ENV === 'development'
? (error as Error).message
: undefined
},
id: null
});
}
}
});
}
/**
* Reset the session - clean up old and create new
*/
private async resetSession(): Promise<void> {
// Clean up old session if exists
if (this.session) {
try {
logger.info('Closing previous session', { sessionId: this.session.sessionId });
await this.session.transport.close();
// Note: Don't close the server as it handles its own lifecycle
} catch (error) {
logger.warn('Error closing previous session:', error);
}
}
// Create new session
const server = new N8NDocumentationMCPServer();
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => 'single-session', // Always same ID for single-session
});
await server.connect(transport);
this.session = {
server,
transport,
lastAccess: new Date(),
sessionId: 'single-session'
};
logger.info('Created new single session', { sessionId: this.session.sessionId });
}
/**
* Check if current session is expired
*/
private isExpired(): boolean {
if (!this.session) return true;
return Date.now() - this.session.lastAccess.getTime() > this.sessionTimeout;
}
/**
* Start the HTTP server
*/
async start(): Promise<void> {
const app = express();
// Parse JSON with strict limits
app.use(express.json({
limit: '1mb',
strict: true
}));
// Security headers
app.use((req, res, next) => {
res.setHeader('X-Content-Type-Options', 'nosniff');
res.setHeader('X-Frame-Options', 'DENY');
res.setHeader('X-XSS-Protection', '1; mode=block');
res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains');
next();
});
// CORS configuration
app.use((req, res, next) => {
const allowedOrigin = process.env.CORS_ORIGIN || '*';
res.setHeader('Access-Control-Allow-Origin', allowedOrigin);
res.setHeader('Access-Control-Allow-Methods', 'POST, GET, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, Accept');
res.setHeader('Access-Control-Max-Age', '86400');
if (req.method === 'OPTIONS') {
res.sendStatus(204);
return;
}
next();
});
// Request logging middleware
app.use((req, res, next) => {
logger.info(`${req.method} ${req.path}`, {
ip: req.ip,
userAgent: req.get('user-agent'),
contentLength: req.get('content-length')
});
next();
});
// Health check endpoint
app.get('/health', (req, res) => {
res.json({
status: 'ok',
mode: 'single-session',
version: '2.3.1',
uptime: Math.floor(process.uptime()),
sessionActive: !!this.session,
sessionAge: this.session
? Math.floor((Date.now() - this.session.lastAccess.getTime()) / 1000)
: null,
memory: {
used: Math.round(process.memoryUsage().heapUsed / 1024 / 1024),
total: Math.round(process.memoryUsage().heapTotal / 1024 / 1024),
unit: 'MB'
},
timestamp: new Date().toISOString()
});
});
// Main MCP endpoint with authentication
app.post('/mcp', async (req: express.Request, res: express.Response): Promise<void> => {
// Simple auth check
const authHeader = req.headers.authorization;
const token = authHeader?.startsWith('Bearer ')
? authHeader.slice(7)
: authHeader;
if (token !== process.env.AUTH_TOKEN) {
logger.warn('Authentication failed', {
ip: req.ip,
userAgent: req.get('user-agent')
});
res.status(401).json({
jsonrpc: '2.0',
error: {
code: -32001,
message: 'Unauthorized'
},
id: null
});
return;
}
// Handle request with single session
await this.handleRequest(req, res);
});
// 404 handler
app.use((req, res) => {
res.status(404).json({
error: 'Not found',
message: `Cannot ${req.method} ${req.path}`
});
});
// Error handler
app.use((err: any, req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.error('Express error handler:', err);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
data: process.env.NODE_ENV === 'development' ? err.message : undefined
},
id: null
});
}
});
const port = parseInt(process.env.PORT || '3000');
const host = process.env.HOST || '0.0.0.0';
this.expressServer = app.listen(port, host, () => {
logger.info(`n8n MCP Single-Session HTTP Server started`, { port, host });
console.log(`n8n MCP Single-Session HTTP Server running on ${host}:${port}`);
console.log(`Health check: http://localhost:${port}/health`);
console.log(`MCP endpoint: http://localhost:${port}/mcp`);
console.log('\nPress Ctrl+C to stop the server');
});
// Handle server errors
this.expressServer.on('error', (error: any) => {
if (error.code === 'EADDRINUSE') {
logger.error(`Port ${port} is already in use`);
console.error(`ERROR: Port ${port} is already in use`);
process.exit(1);
} else {
logger.error('Server error:', error);
console.error('Server error:', error);
process.exit(1);
}
});
}
/**
* Graceful shutdown
*/
async shutdown(): Promise<void> {
logger.info('Shutting down Single-Session HTTP server...');
// Clean up session
if (this.session) {
try {
await this.session.transport.close();
logger.info('Session closed');
} catch (error) {
logger.warn('Error closing session:', error);
}
this.session = null;
}
// Close Express server
if (this.expressServer) {
await new Promise<void>((resolve) => {
this.expressServer.close(() => {
logger.info('HTTP server closed');
resolve();
});
});
}
}
/**
* Get current session info (for testing/debugging)
*/
getSessionInfo(): { active: boolean; sessionId?: string; age?: number } {
if (!this.session) {
return { active: false };
}
return {
active: true,
sessionId: this.session.sessionId,
age: Date.now() - this.session.lastAccess.getTime()
};
}
}
// Start if called directly
if (require.main === module) {
const server = new SingleSessionHTTPServer();
// Graceful shutdown handlers
const shutdown = async () => {
await server.shutdown();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
// Handle uncaught errors
process.on('uncaughtException', (error) => {
logger.error('Uncaught exception:', error);
console.error('Uncaught exception:', error);
shutdown();
});
process.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled rejection:', reason);
console.error('Unhandled rejection at:', promise, 'reason:', reason);
shutdown();
});
// Start server
server.start().catch(error => {
logger.error('Failed to start Single-Session HTTP server:', error);
console.error('Failed to start Single-Session HTTP server:', error);
process.exit(1);
});
}

View File

@@ -4,48 +4,15 @@
* Licensed under the Sustainable Use License v1.0
*/
import dotenv from 'dotenv';
import { N8NMCPServer } from './mcp/server';
import { MCPServerConfig, N8NConfig } from './types';
import { logger } from './utils/logger';
// Engine exports for service integration
export { N8NMCPEngine, EngineHealth, EngineOptions } from './mcp-engine';
export { SingleSessionHTTPServer } from './http-server-single-session';
export { ConsoleManager } from './utils/console-manager';
export { N8NDocumentationMCPServer } from './mcp/server-update';
// Load environment variables
dotenv.config();
// Default export for convenience
import N8NMCPEngine from './mcp-engine';
export default N8NMCPEngine;
async function main() {
const config: MCPServerConfig = {
port: parseInt(process.env.MCP_SERVER_PORT || '3000', 10),
host: process.env.MCP_SERVER_HOST || 'localhost',
authToken: process.env.MCP_AUTH_TOKEN,
};
const n8nConfig: N8NConfig = {
apiUrl: process.env.N8N_API_URL || 'http://localhost:5678',
apiKey: process.env.N8N_API_KEY || '',
};
const server = new N8NMCPServer(config, n8nConfig);
try {
await server.start();
} catch (error) {
logger.error('Failed to start MCP server:', error);
process.exit(1);
}
}
// Handle graceful shutdown
process.on('SIGINT', () => {
logger.info('Received SIGINT, shutting down MCP server...');
process.exit(0);
});
process.on('SIGTERM', () => {
logger.info('Received SIGTERM, shutting down MCP server...');
process.exit(0);
});
main().catch((error) => {
logger.error('Unhandled error:', error);
process.exit(1);
});
// Legacy CLI functionality - moved to ./mcp/index.ts
// This file now serves as the main entry point for library usage

170
src/mcp-engine.ts Normal file
View File

@@ -0,0 +1,170 @@
/**
* N8N MCP Engine - Clean interface for service integration
*
* This class provides a simple API for integrating the n8n-MCP server
* into larger services. The wrapping service handles authentication,
* multi-tenancy, rate limiting, etc.
*/
import { Request, Response } from 'express';
import { SingleSessionHTTPServer } from './http-server-single-session';
import { logger } from './utils/logger';
export interface EngineHealth {
status: 'healthy' | 'unhealthy';
uptime: number;
sessionActive: boolean;
memoryUsage: {
used: number;
total: number;
unit: string;
};
version: string;
}
export interface EngineOptions {
sessionTimeout?: number;
logLevel?: string;
}
export class N8NMCPEngine {
private server: SingleSessionHTTPServer;
private startTime: Date;
constructor(options: EngineOptions = {}) {
this.server = new SingleSessionHTTPServer();
this.startTime = new Date();
if (options.logLevel) {
process.env.LOG_LEVEL = options.logLevel;
}
}
/**
* Process a single MCP request
* The wrapping service handles authentication, multi-tenancy, etc.
*
* @example
* // In your service
* const engine = new N8NMCPEngine();
*
* app.post('/api/users/:userId/mcp', authenticate, async (req, res) => {
* // Your service handles auth, rate limiting, user context
* await engine.processRequest(req, res);
* });
*/
async processRequest(req: Request, res: Response): Promise<void> {
try {
await this.server.handleRequest(req, res);
} catch (error) {
logger.error('Engine processRequest error:', error);
throw error;
}
}
/**
* Health check for service monitoring
*
* @example
* app.get('/health', async (req, res) => {
* const health = await engine.healthCheck();
* res.status(health.status === 'healthy' ? 200 : 503).json(health);
* });
*/
async healthCheck(): Promise<EngineHealth> {
try {
const sessionInfo = this.server.getSessionInfo();
const memoryUsage = process.memoryUsage();
return {
status: 'healthy',
uptime: Math.floor((Date.now() - this.startTime.getTime()) / 1000),
sessionActive: sessionInfo.active,
memoryUsage: {
used: Math.round(memoryUsage.heapUsed / 1024 / 1024),
total: Math.round(memoryUsage.heapTotal / 1024 / 1024),
unit: 'MB'
},
version: '2.3.1'
};
} catch (error) {
logger.error('Health check failed:', error);
return {
status: 'unhealthy',
uptime: 0,
sessionActive: false,
memoryUsage: { used: 0, total: 0, unit: 'MB' },
version: '2.3.1'
};
}
}
/**
* Get current session information
* Useful for monitoring and debugging
*/
getSessionInfo(): { active: boolean; sessionId?: string; age?: number } {
return this.server.getSessionInfo();
}
/**
* Graceful shutdown for service lifecycle
*
* @example
* process.on('SIGTERM', async () => {
* await engine.shutdown();
* process.exit(0);
* });
*/
async shutdown(): Promise<void> {
logger.info('Shutting down N8N MCP Engine...');
await this.server.shutdown();
}
/**
* Start the engine (if using standalone mode)
* For embedded use, this is not necessary
*/
async start(): Promise<void> {
await this.server.start();
}
}
/**
* Example usage in a multi-tenant service:
*
* ```typescript
* import { N8NMCPEngine } from 'n8n-mcp/engine';
* import express from 'express';
*
* const app = express();
* const engine = new N8NMCPEngine();
*
* // Middleware for authentication
* const authenticate = (req, res, next) => {
* // Your auth logic
* req.userId = 'user123';
* next();
* };
*
* // MCP endpoint with multi-tenant support
* app.post('/api/mcp/:userId', authenticate, async (req, res) => {
* // Log usage for billing
* await logUsage(req.userId, 'mcp-request');
*
* // Rate limiting
* if (await isRateLimited(req.userId)) {
* return res.status(429).json({ error: 'Rate limited' });
* }
*
* // Process request
* await engine.processRequest(req, res);
* });
*
* // Health endpoint
* app.get('/health', async (req, res) => {
* const health = await engine.healthCheck();
* res.json(health);
* });
* ```
*/
export default N8NMCPEngine;

View File

@@ -25,9 +25,20 @@ async function main() {
console.error('Node version:', process.version);
if (mode === 'http') {
// HTTP mode - for remote deployment
const { startHTTPServer } = await import('../http-server');
await startHTTPServer();
// HTTP mode - for remote deployment with single-session architecture
const { SingleSessionHTTPServer } = await import('../http-server-single-session');
const server = new SingleSessionHTTPServer();
// Graceful shutdown handlers
const shutdown = async () => {
await server.shutdown();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
await server.start();
} else {
// Stdio mode - for local Claude Desktop
const server = new N8NDocumentationMCPServer();

View File

@@ -0,0 +1,232 @@
import { SingleSessionHTTPServer } from '../http-server-single-session';
import express from 'express';
import { ConsoleManager } from '../utils/console-manager';
// Mock express Request and Response
const createMockRequest = (body: any = {}): express.Request => {
return {
body,
headers: {
authorization: `Bearer ${process.env.AUTH_TOKEN || 'test-token'}`
},
method: 'POST',
path: '/mcp',
ip: '127.0.0.1',
get: (header: string) => {
if (header === 'user-agent') return 'test-agent';
if (header === 'content-length') return '100';
return null;
}
} as any;
};
const createMockResponse = (): express.Response => {
const res: any = {
statusCode: 200,
headers: {},
body: null,
headersSent: false,
status: function(code: number) {
this.statusCode = code;
return this;
},
json: function(data: any) {
this.body = data;
this.headersSent = true;
return this;
},
setHeader: function(name: string, value: string) {
this.headers[name] = value;
return this;
},
on: function(event: string, callback: Function) {
// Simple event emitter mock
return this;
}
};
return res;
};
describe('SingleSessionHTTPServer', () => {
let server: SingleSessionHTTPServer;
beforeAll(() => {
process.env.AUTH_TOKEN = 'test-token';
process.env.MCP_MODE = 'http';
});
beforeEach(() => {
server = new SingleSessionHTTPServer();
});
afterEach(async () => {
await server.shutdown();
});
describe('Console Management', () => {
it('should silence console during request handling', async () => {
const consoleManager = new ConsoleManager();
const originalLog = console.log;
// Create spy functions
const logSpy = jest.fn();
console.log = logSpy;
// Test console is silenced during operation
await consoleManager.wrapOperation(() => {
console.log('This should not appear');
expect(logSpy).not.toHaveBeenCalled();
});
// Test console is restored after operation
console.log('This should appear');
expect(logSpy).toHaveBeenCalledWith('This should appear');
// Restore original
console.log = originalLog;
});
it('should handle errors and still restore console', async () => {
const consoleManager = new ConsoleManager();
const originalError = console.error;
try {
await consoleManager.wrapOperation(() => {
throw new Error('Test error');
});
} catch (error) {
// Expected error
}
// Verify console was restored
expect(console.error).toBe(originalError);
});
});
describe('Session Management', () => {
it('should create a single session on first request', async () => {
const req = createMockRequest({ method: 'tools/list' });
const res = createMockResponse();
const sessionInfoBefore = server.getSessionInfo();
expect(sessionInfoBefore.active).toBe(false);
await server.handleRequest(req, res);
const sessionInfoAfter = server.getSessionInfo();
expect(sessionInfoAfter.active).toBe(true);
expect(sessionInfoAfter.sessionId).toBe('single-session');
});
it('should reuse the same session for multiple requests', async () => {
const req1 = createMockRequest({ method: 'tools/list' });
const res1 = createMockResponse();
const req2 = createMockRequest({ method: 'get_node_info' });
const res2 = createMockResponse();
// First request creates session
await server.handleRequest(req1, res1);
const session1 = server.getSessionInfo();
// Second request reuses session
await server.handleRequest(req2, res2);
const session2 = server.getSessionInfo();
expect(session1.sessionId).toBe(session2.sessionId);
expect(session2.sessionId).toBe('single-session');
});
it('should handle authentication correctly', async () => {
const reqNoAuth = createMockRequest({ method: 'tools/list' });
delete reqNoAuth.headers.authorization;
const resNoAuth = createMockResponse();
await server.handleRequest(reqNoAuth, resNoAuth);
expect(resNoAuth.statusCode).toBe(401);
expect(resNoAuth.body).toEqual({
jsonrpc: '2.0',
error: {
code: -32001,
message: 'Unauthorized'
},
id: null
});
});
it('should handle invalid auth token', async () => {
const reqBadAuth = createMockRequest({ method: 'tools/list' });
reqBadAuth.headers.authorization = 'Bearer wrong-token';
const resBadAuth = createMockResponse();
await server.handleRequest(reqBadAuth, resBadAuth);
expect(resBadAuth.statusCode).toBe(401);
});
});
describe('Session Expiry', () => {
it('should detect expired sessions', () => {
// This would require mocking timers or exposing internal state
// For now, we'll test the concept
const sessionInfo = server.getSessionInfo();
expect(sessionInfo.active).toBe(false);
});
});
describe('Error Handling', () => {
it('should handle server errors gracefully', async () => {
const req = createMockRequest({ invalid: 'data' });
const res = createMockResponse();
// This might not cause an error with the current implementation
// but demonstrates error handling structure
await server.handleRequest(req, res);
// Should not throw, should return error response
if (res.statusCode === 500) {
expect(res.body).toHaveProperty('error');
expect(res.body.error).toHaveProperty('code', -32603);
}
});
});
});
describe('ConsoleManager', () => {
it('should only silence in HTTP mode', () => {
const originalMode = process.env.MCP_MODE;
process.env.MCP_MODE = 'stdio';
const consoleManager = new ConsoleManager();
const originalLog = console.log;
consoleManager.silence();
expect(console.log).toBe(originalLog); // Should not change
process.env.MCP_MODE = originalMode;
});
it('should track silenced state', () => {
process.env.MCP_MODE = 'http';
const consoleManager = new ConsoleManager();
expect(consoleManager.isActive).toBe(false);
consoleManager.silence();
expect(consoleManager.isActive).toBe(true);
consoleManager.restore();
expect(consoleManager.isActive).toBe(false);
});
it('should handle nested calls correctly', () => {
process.env.MCP_MODE = 'http';
const consoleManager = new ConsoleManager();
const originalLog = console.log;
consoleManager.silence();
consoleManager.silence(); // Second call should be no-op
expect(consoleManager.isActive).toBe(true);
consoleManager.restore();
expect(console.log).toBe(originalLog);
});
});

View File

@@ -0,0 +1,83 @@
/**
* Console Manager for MCP HTTP Server
*
* Prevents console output from interfering with StreamableHTTPServerTransport
* by silencing console methods during MCP request handling.
*/
export class ConsoleManager {
private originalConsole = {
log: console.log,
error: console.error,
warn: console.warn,
info: console.info,
debug: console.debug,
trace: console.trace
};
private isSilenced = false;
/**
* Silence all console output
*/
public silence(): void {
if (this.isSilenced || process.env.MCP_MODE !== 'http') {
return;
}
this.isSilenced = true;
process.env.MCP_REQUEST_ACTIVE = 'true';
console.log = () => {};
console.error = () => {};
console.warn = () => {};
console.info = () => {};
console.debug = () => {};
console.trace = () => {};
}
/**
* Restore original console methods
*/
public restore(): void {
if (!this.isSilenced) {
return;
}
this.isSilenced = false;
process.env.MCP_REQUEST_ACTIVE = 'false';
console.log = this.originalConsole.log;
console.error = this.originalConsole.error;
console.warn = this.originalConsole.warn;
console.info = this.originalConsole.info;
console.debug = this.originalConsole.debug;
console.trace = this.originalConsole.trace;
}
/**
* Wrap an operation with console silencing
* Automatically restores console on completion or error
*/
public async wrapOperation<T>(operation: () => T | Promise<T>): Promise<T> {
this.silence();
try {
const result = operation();
if (result instanceof Promise) {
return await result.finally(() => this.restore());
}
this.restore();
return result;
} catch (error) {
this.restore();
throw error;
}
}
/**
* Check if console is currently silenced
*/
public get isActive(): boolean {
return this.isSilenced;
}
}
// Export singleton instance for easy use
export const consoleManager = new ConsoleManager();

View File

@@ -14,6 +14,8 @@ export interface LoggerConfig {
export class Logger {
private config: LoggerConfig;
private static instance: Logger;
private useFileLogging = false;
private fileStream: any = null;
constructor(config?: Partial<LoggerConfig>) {
this.config = {
@@ -52,6 +54,13 @@ export class Logger {
if (level <= this.config.level) {
const formattedMessage = this.formatMessage(levelName, message);
// In HTTP mode during request handling, suppress console output
// The ConsoleManager will handle this, but we add a safety check
if (process.env.MCP_MODE === 'http' && process.env.MCP_REQUEST_ACTIVE === 'true') {
// Silently drop the log during active MCP requests
return;
}
switch (level) {
case LogLevel.ERROR:
console.error(formattedMessage, ...args);