fix: resolve SSE reconnection loop with separate /sse + /messages endpoints (v2.46.1) (#699)

Fix SSE clients entering rapid reconnection loops because POST /mcp
never routed messages to SSEServerTransport.handlePostMessage() (#617).

Root cause: SSE sessions were stored in a separate `this.session` property
invisible to the StreamableHTTP POST handler. The POST handler only
checked `this.transports` (StreamableHTTP map), so SSE messages were
never delivered, causing immediate reconnection and rate limiter exhaustion.

Changes:
- Add GET /sse + POST /messages endpoints following the official MCP SDK
  backward-compatible server pattern (separate endpoints per transport)
- Store SSE transports in the shared this.transports map with instanceof
  guards for type discrimination
- Remove legacy this.session singleton, resetSessionSSE(), and isExpired()
- Extract duplicated auth logic into authenticateRequest() method
- Add Bearer token auth and rate limiting to SSE endpoints
- Add skipSuccessfulRequests to authLimiter to prevent 429 storms
- Mark SSE transport as deprecated (removed in MCP SDK v2.x)

The handleRequest() codepath used by the downstream SaaS backend
(N8NMCPEngine.processRequest()) is unchanged. Session persistence
(exportSessionState/restoreSessionState) is unchanged.

Closes #617

Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Romuald Członkowski
2026-04-03 14:00:53 +02:00
committed by GitHub
parent 2d4115530c
commit 12d7d5bdb6
10 changed files with 493 additions and 437 deletions

View File

@@ -46,15 +46,6 @@ interface MultiTenantHeaders {
const MAX_SESSIONS = Math.max(1, parseInt(process.env.N8N_MCP_MAX_SESSIONS || '100', 10));
const SESSION_CLEANUP_INTERVAL = 5 * 60 * 1000; // 5 minutes
interface Session {
server: N8NDocumentationMCPServer;
transport: StreamableHTTPServerTransport | SSEServerTransport;
lastAccess: Date;
sessionId: string;
initialized: boolean;
isSSE: boolean;
}
interface SessionMetrics {
totalSessions: number;
activeSessions: number;
@@ -104,12 +95,12 @@ export interface SingleSessionHTTPServerOptions {
export class SingleSessionHTTPServer {
// Map to store transports by session ID (following SDK pattern)
private transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};
// Stores both StreamableHTTP and SSE transports; use instanceof to discriminate
private transports: { [sessionId: string]: StreamableHTTPServerTransport | SSEServerTransport } = {};
private servers: { [sessionId: string]: N8NDocumentationMCPServer } = {};
private sessionMetadata: { [sessionId: string]: { lastAccess: Date; createdAt: Date } } = {};
private sessionContexts: { [sessionId: string]: InstanceContext | undefined } = {};
private contextSwitchLocks: Map<string, Promise<void>> = new Map();
private session: Session | null = null; // Keep for SSE compatibility
private consoleManager = new ConsoleManager();
private expressServer: any;
// Session timeout — configurable via SESSION_TIMEOUT_MINUTES environment variable
@@ -319,6 +310,49 @@ export class SingleSessionHTTPServer {
}
}
/**
* Authenticate a request by validating the Bearer token.
* Returns true if authentication succeeds, false if it fails
* (and the response has already been sent with a 401 status).
*/
private authenticateRequest(req: express.Request, res: express.Response): boolean {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
const reason = !authHeader ? 'no_auth_header' : 'invalid_auth_format';
logger.warn('Authentication failed', {
ip: req.ip,
userAgent: req.get('user-agent'),
reason
});
res.status(401).json({
jsonrpc: '2.0',
error: { code: -32001, message: 'Unauthorized' },
id: null
});
return false;
}
const token = authHeader.slice(7).trim();
const isValid = this.authToken && AuthManager.timingSafeCompare(token, this.authToken);
if (!isValid) {
logger.warn('Authentication failed: Invalid token', {
ip: req.ip,
userAgent: req.get('user-agent'),
reason: 'invalid_token'
});
res.status(401).json({
jsonrpc: '2.0',
error: { code: -32001, message: 'Unauthorized' },
id: null
});
return false;
}
return true;
}
/**
* Switch session context with locking to prevent race conditions
*/
@@ -636,7 +670,22 @@ export class SingleSessionHTTPServer {
// For non-initialize requests: reuse existing transport for this session
logger.info('handleRequest: Reusing existing transport for session', { sessionId });
transport = this.transports[sessionId];
// Guard: reject SSE transports on the StreamableHTTP path
if (this.transports[sessionId] instanceof SSEServerTransport) {
logger.warn('handleRequest: SSE session used on StreamableHTTP endpoint', { sessionId });
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Session uses SSE transport. Send messages to POST /messages?sessionId=<id> instead.'
},
id: req.body?.id || null
});
return;
}
transport = this.transports[sessionId] as StreamableHTTPServerTransport;
// TOCTOU guard: session may have been removed between the check above and here
if (!transport) {
@@ -751,73 +800,47 @@ export class SingleSessionHTTPServer {
/**
* Reset the session for SSE - clean up old and create new SSE transport
* Create a new SSE session and store it in the shared transports map.
* Following SDK pattern: SSE uses /messages endpoint, separate from /mcp.
*/
private async resetSessionSSE(res: express.Response): Promise<void> {
// Clean up old session if exists
if (this.session) {
const sessionId = this.session.sessionId;
logger.info('Closing previous session for SSE', { sessionId });
// Close server first to free resources (database, cache timer, etc.)
// This mirrors the cleanup pattern in removeSession() (issue #542)
// Handle server close errors separately so transport close still runs
if (this.session.server && typeof this.session.server.close === 'function') {
try {
await this.session.server.close();
} catch (serverError) {
logger.warn('Error closing server for SSE session', { sessionId, error: serverError });
}
}
// Close transport last - always attempt even if server.close() failed
try {
await this.session.transport.close();
} catch (transportError) {
logger.warn('Error closing transport for SSE session', { sessionId, error: transportError });
}
}
try {
// Create new session
logger.info('Creating new N8NDocumentationMCPServer for SSE...');
const server = new N8NDocumentationMCPServer(undefined, undefined, {
generateWorkflowHandler: this.generateWorkflowHandler,
private async createSSESession(res: express.Response): Promise<void> {
if (!this.canCreateSession()) {
logger.warn('SSE session creation rejected: session limit reached', {
currentSessions: this.getActiveSessionCount(),
maxSessions: MAX_SESSIONS
});
// Generate cryptographically secure session ID
const sessionId = uuidv4();
logger.info('Creating SSEServerTransport...');
const transport = new SSEServerTransport('/mcp', res);
logger.info('Connecting server to SSE transport...');
await server.connect(transport);
// Note: server.connect() automatically calls transport.start(), so we don't need to call it again
this.session = {
server,
transport,
lastAccess: new Date(),
sessionId,
initialized: false,
isSSE: true
};
logger.info('Created new SSE session successfully', { sessionId: this.session.sessionId });
} catch (error) {
logger.error('Failed to create SSE session:', error);
throw error;
throw new Error(`Session limit reached (${MAX_SESSIONS})`);
}
}
/**
* Check if current session is expired
*/
private isExpired(): boolean {
if (!this.session) return true;
return Date.now() - this.session.lastAccess.getTime() > this.sessionTimeout;
// Note: SSE sessions do not support multi-tenant context.
// The SaaS backend uses StreamableHTTP exclusively.
const server = new N8NDocumentationMCPServer(undefined, undefined, {
generateWorkflowHandler: this.generateWorkflowHandler,
});
const transport = new SSEServerTransport('/messages', res);
// Use the SDK-assigned session ID — the client receives this via the SSE
// `endpoint` event and sends it back as ?sessionId on POST /messages.
const sessionId = transport.sessionId;
this.transports[sessionId] = transport;
this.servers[sessionId] = server;
this.sessionMetadata[sessionId] = {
lastAccess: new Date(),
createdAt: new Date()
};
// Clean up on SSE disconnect
res.on('close', () => {
logger.info('SSE connection closed by client', { sessionId });
this.removeSession(sessionId, 'sse_disconnect').catch(err => {
logger.warn('Error cleaning up SSE session on disconnect', { sessionId, error: err });
});
});
await server.connect(transport);
logger.info('SSE session created', { sessionId, transport: 'SSEServerTransport' });
}
/**
@@ -913,7 +936,7 @@ export class SingleSessionHTTPServer {
authentication: {
type: 'Bearer Token',
header: 'Authorization: Bearer <token>',
required_for: ['POST /mcp']
required_for: ['POST /mcp', 'GET /sse', 'POST /messages']
},
documentation: 'https://github.com/czlonkowski/n8n-mcp'
});
@@ -948,7 +971,7 @@ export class SingleSessionHTTPServer {
},
activeTransports: activeTransports.length, // Legacy field
activeServers: activeServers.length, // Legacy field
legacySessionActive: !!this.session, // For SSE compatibility
legacySessionActive: false, // Deprecated: SSE now uses shared transports map
memory: {
used: Math.round(process.memoryUsage().heapUsed / 1024 / 1024),
total: Math.round(process.memoryUsage().heapTotal / 1024 / 1024),
@@ -1005,10 +1028,11 @@ export class SingleSessionHTTPServer {
app.get('/mcp', async (req, res) => {
// Handle StreamableHTTP transport requests with new pattern
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (sessionId && this.transports[sessionId]) {
const existingTransport = sessionId ? this.transports[sessionId] : undefined;
if (existingTransport && existingTransport instanceof StreamableHTTPServerTransport) {
// Let the StreamableHTTPServerTransport handle the GET request
try {
await this.transports[sessionId].handleRequest(req, res, undefined);
await existingTransport.handleRequest(req, res, undefined);
return;
} catch (error) {
logger.error('StreamableHTTP GET request failed:', error);
@@ -1016,26 +1040,15 @@ export class SingleSessionHTTPServer {
}
}
// Check Accept header for text/event-stream (SSE support)
// SSE clients should use GET /sse instead (SDK pattern: separate endpoints)
const accept = req.headers.accept;
if (accept && accept.includes('text/event-stream')) {
logger.info('SSE stream request received - establishing SSE connection');
try {
// Create or reset session for SSE
await this.resetSessionSSE(res);
logger.info('SSE connection established successfully');
} catch (error) {
logger.error('Failed to establish SSE connection:', error);
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Failed to establish SSE connection'
},
id: null
});
}
logger.info('SSE request on /mcp redirected to /sse', { ip: req.ip });
res.status(400).json({
error: 'SSE transport uses /sse endpoint',
message: 'Connect via GET /sse for SSE streaming. POST messages to /messages?sessionId=<id>.',
documentation: 'https://github.com/czlonkowski/n8n-mcp'
});
return;
}
@@ -1072,9 +1085,23 @@ export class SingleSessionHTTPServer {
mcp: {
method: 'POST',
path: '/mcp',
description: 'Main MCP JSON-RPC endpoint',
description: 'Main MCP JSON-RPC endpoint (StreamableHTTP)',
authentication: 'Bearer token required'
},
sse: {
method: 'GET',
path: '/sse',
description: 'DEPRECATED: SSE stream for legacy clients. Migrate to StreamableHTTP (POST /mcp).',
authentication: 'Bearer token required',
deprecated: true
},
messages: {
method: 'POST',
path: '/messages',
description: 'DEPRECATED: Message delivery for SSE sessions. Migrate to StreamableHTTP (POST /mcp).',
authentication: 'Bearer token required',
deprecated: true
},
health: {
method: 'GET',
path: '/health',
@@ -1092,6 +1119,110 @@ export class SingleSessionHTTPServer {
});
});
// SECURITY: Rate limiting for authentication endpoints
// Prevents brute force attacks and DoS
// See: https://github.com/czlonkowski/n8n-mcp/issues/265 (HIGH-02)
const authLimiter = rateLimit({
windowMs: parseInt(process.env.AUTH_RATE_LIMIT_WINDOW || '900000'), // 15 minutes
max: parseInt(process.env.AUTH_RATE_LIMIT_MAX || '20'), // 20 authentication attempts per IP
message: {
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Too many authentication attempts. Please try again later.'
},
id: null
},
standardHeaders: true, // Return rate limit info in `RateLimit-*` headers
legacyHeaders: false, // Disable `X-RateLimit-*` headers
skipSuccessfulRequests: true, // Only count failed auth attempts (#617)
handler: (req, res) => {
logger.warn('Rate limit exceeded', {
ip: req.ip,
userAgent: req.get('user-agent'),
event: 'rate_limit'
});
res.status(429).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Too many authentication attempts'
},
id: null
});
}
});
// Legacy SSE stream endpoint (protocol version 2024-11-05)
// DEPRECATED: SSE transport is deprecated in MCP SDK v1.x and removed in v2.x.
// Clients should migrate to StreamableHTTP (POST /mcp). This endpoint will be
// removed in a future major release.
app.get('/sse', authLimiter, async (req: express.Request, res: express.Response): Promise<void> => {
if (!this.authenticateRequest(req, res)) return;
logger.warn('SSE transport is deprecated and will be removed in a future release. Migrate to StreamableHTTP (POST /mcp).', {
ip: req.ip,
userAgent: req.get('user-agent')
});
try {
await this.createSSESession(res);
} catch (error) {
logger.error('Failed to create SSE session:', error);
if (!res.headersSent) {
res.status(error instanceof Error && error.message.includes('Session limit')
? 429 : 500
).json({
error: error instanceof Error ? error.message : 'Failed to establish SSE connection'
});
}
}
});
// SSE message delivery endpoint (receives JSON-RPC messages from SSE clients)
app.post('/messages', authLimiter, jsonParser, async (req: express.Request, res: express.Response): Promise<void> => {
if (!this.authenticateRequest(req, res)) return;
// SSE uses ?sessionId query param (not mcp-session-id header)
const sessionId = req.query.sessionId as string | undefined;
if (!sessionId) {
res.status(400).json({
jsonrpc: '2.0',
error: { code: -32602, message: 'Missing sessionId query parameter' },
id: req.body?.id || null
});
return;
}
const transport = this.transports[sessionId];
if (!transport || !(transport instanceof SSEServerTransport)) {
res.status(400).json({
jsonrpc: '2.0',
error: { code: -32000, message: 'SSE session not found or expired' },
id: req.body?.id || null
});
return;
}
// Update session access time
this.updateSessionAccess(sessionId);
try {
await transport.handlePostMessage(req, res, req.body);
} catch (error) {
logger.error('SSE message handling error', { sessionId, error });
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: { code: -32603, message: 'Internal error processing SSE message' },
id: req.body?.id || null
});
}
}
});
// Session termination endpoint
app.delete('/mcp', async (req: express.Request, res: express.Response): Promise<void> => {
const mcpSessionId = req.headers['mcp-session-id'] as string;
@@ -1150,40 +1281,6 @@ export class SingleSessionHTTPServer {
}
});
// SECURITY: Rate limiting for authentication endpoint
// Prevents brute force attacks and DoS
// See: https://github.com/czlonkowski/n8n-mcp/issues/265 (HIGH-02)
const authLimiter = rateLimit({
windowMs: parseInt(process.env.AUTH_RATE_LIMIT_WINDOW || '900000'), // 15 minutes
max: parseInt(process.env.AUTH_RATE_LIMIT_MAX || '20'), // 20 authentication attempts per IP
message: {
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Too many authentication attempts. Please try again later.'
},
id: null
},
standardHeaders: true, // Return rate limit info in `RateLimit-*` headers
legacyHeaders: false, // Disable `X-RateLimit-*` headers
handler: (req, res) => {
logger.warn('Rate limit exceeded', {
ip: req.ip,
userAgent: req.get('user-agent'),
event: 'rate_limit'
});
res.status(429).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Too many authentication attempts'
},
id: null
});
}
});
// Main MCP endpoint with authentication and rate limiting
app.post('/mcp', authLimiter, jsonParser, async (req: express.Request, res: express.Response): Promise<void> => {
// Log comprehensive debug info about the request
@@ -1234,76 +1331,10 @@ export class SingleSessionHTTPServer {
});
}
// Enhanced authentication check with specific logging
const authHeader = req.headers.authorization;
// Check if Authorization header is missing
if (!authHeader) {
logger.warn('Authentication failed: Missing Authorization header', {
ip: req.ip,
userAgent: req.get('user-agent'),
reason: 'no_auth_header'
});
res.status(401).json({
jsonrpc: '2.0',
error: {
code: -32001,
message: 'Unauthorized'
},
id: null
});
return;
}
// Check if Authorization header has Bearer prefix
if (!authHeader.startsWith('Bearer ')) {
logger.warn('Authentication failed: Invalid Authorization header format (expected Bearer token)', {
ip: req.ip,
userAgent: req.get('user-agent'),
reason: 'invalid_auth_format',
headerPrefix: authHeader.substring(0, Math.min(authHeader.length, 10)) + '...' // Log first 10 chars for debugging
});
res.status(401).json({
jsonrpc: '2.0',
error: {
code: -32001,
message: 'Unauthorized'
},
id: null
});
return;
}
// Extract token and trim whitespace
const token = authHeader.slice(7).trim();
if (!this.authenticateRequest(req, res)) return;
// SECURITY: Use timing-safe comparison to prevent timing attacks
// See: https://github.com/czlonkowski/n8n-mcp/issues/265 (CRITICAL-02)
const isValidToken = this.authToken &&
AuthManager.timingSafeCompare(token, this.authToken);
if (!isValidToken) {
logger.warn('Authentication failed: Invalid token', {
ip: req.ip,
userAgent: req.get('user-agent'),
reason: 'invalid_token'
});
res.status(401).json({
jsonrpc: '2.0',
error: {
code: -32001,
message: 'Unauthorized'
},
id: null
});
return;
}
// Handle request with single session
logger.info('Authentication successful - proceeding to handleRequest', {
hasSession: !!this.session,
sessionType: this.session?.isSSE ? 'SSE' : 'StreamableHTTP',
sessionInitialized: this.session?.initialized
activeSessions: this.getActiveSessionCount()
});
// Extract instance context from headers if present (for multi-tenant support)
@@ -1417,6 +1448,7 @@ export class SingleSessionHTTPServer {
console.log(`Session Limits: ${MAX_SESSIONS} max sessions, ${this.sessionTimeout / 1000 / 60}min timeout`);
console.log(`Health check: ${endpoints.health}`);
console.log(`MCP endpoint: ${endpoints.mcp}`);
console.log(`SSE endpoint: ${baseUrl}/sse (legacy clients)`);
if (isProduction) {
console.log('🔒 Running in PRODUCTION mode - enhanced security enabled');
@@ -1483,17 +1515,6 @@ export class SingleSessionHTTPServer {
}
}
// Clean up legacy session (for SSE compatibility)
if (this.session) {
try {
await this.session.transport.close();
logger.info('Legacy session closed');
} catch (error) {
logger.warn('Error closing legacy session:', error);
}
this.session = null;
}
// Close Express server
if (this.expressServer) {
await new Promise<void>((resolve) => {
@@ -1532,25 +1553,9 @@ export class SingleSessionHTTPServer {
};
} {
const metrics = this.getSessionMetrics();
// Legacy SSE session info
if (!this.session) {
return {
active: false,
sessions: {
total: metrics.totalSessions,
active: metrics.activeSessions,
expired: metrics.expiredSessions,
max: MAX_SESSIONS,
sessionIds: Object.keys(this.transports)
}
};
}
return {
active: true,
sessionId: this.session.sessionId,
age: Date.now() - this.session.lastAccess.getTime(),
active: metrics.activeSessions > 0,
sessions: {
total: metrics.totalSessions,
active: metrics.activeSessions,