- Add ConsoleManager to prevent console output interference with StreamableHTTPServerTransport - Implement SingleSessionHTTPServer with persistent session reuse - Create N8NMCPEngine for clean service integration - Add automatic session expiry after 30 minutes of inactivity - Update logger to be HTTP-aware during active requests - Maintain backward compatibility with existing deployments This fixes the "stream is not readable" error by implementing the Hybrid Single-Session architecture as documented in MCP_ERROR_FIX_PLAN.md 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
456 lines
13 KiB
Markdown
456 lines
13 KiB
Markdown
# MCP "Stream is not readable" Error Fix Implementation Plan
|
|
|
|
## Executive Summary
|
|
|
|
This document outlines a comprehensive plan to fix the "InternalServerError: stream is not readable" error in the n8n-MCP HTTP server implementation. The error stems from multiple architectural and implementation issues that need systematic resolution.
|
|
|
|
**Chosen Solution**: After thorough analysis, we will implement a **Hybrid Single-Session Architecture** that provides protocol compliance while optimizing for the single-player use case. This approach balances simplicity with correctness, making it ideal for use as an engine in larger services.
|
|
|
|
## Problem Analysis
|
|
|
|
### Root Causes
|
|
|
|
1. **Stream Contamination**
|
|
- Console output during server initialization interferes with StreamableHTTPServerTransport
|
|
- The transport expects clean stdin/stdout/stderr streams
|
|
- Any console.log/error before or during request handling corrupts the stream
|
|
|
|
2. **Architectural Mismatch**
|
|
- Current implementation: Stateless (new server instance per request)
|
|
- StreamableHTTPServerTransport design: Stateful (expects session persistence)
|
|
- Passing `sessionIdGenerator: undefined` doesn't make it truly stateless
|
|
|
|
3. **Protocol Implementation Gap**
|
|
- Missing proper SSE (Server-Sent Events) support
|
|
- Not handling the dual-mode nature of Streamable HTTP (JSON-RPC + SSE)
|
|
- Accept header validation but no actual SSE implementation
|
|
|
|
4. **Version Inconsistency**
|
|
- Multiple MCP SDK versions in dependency tree (1.12.1, 1.11.0)
|
|
- Potential API incompatibilities between versions
|
|
|
|
## Implementation Strategy
|
|
|
|
### Phase 1: Dependency Consolidation (Priority: Critical)
|
|
|
|
#### 1.1 Update MCP SDK
|
|
```json
|
|
{
|
|
"dependencies": {
|
|
"@modelcontextprotocol/sdk": "^1.12.1"
|
|
},
|
|
"overrides": {
|
|
"@modelcontextprotocol/sdk": "^1.12.1"
|
|
}
|
|
}
|
|
```
|
|
|
|
#### 1.2 Remove Conflicting Dependencies
|
|
- Audit n8n packages that bundle older MCP versions
|
|
- Consider isolating MCP server from n8n dependencies
|
|
|
|
### Phase 2: Console Output Isolation (Priority: Critical)
|
|
|
|
#### 2.1 Create Environment-Aware Logging
|
|
```typescript
|
|
// src/utils/console-manager.ts
|
|
export class ConsoleManager {
|
|
private originalConsole = {
|
|
log: console.log,
|
|
error: console.error,
|
|
warn: console.warn
|
|
};
|
|
|
|
public silence() {
|
|
if (process.env.MCP_MODE === 'http') {
|
|
console.log = () => {};
|
|
console.error = () => {};
|
|
console.warn = () => {};
|
|
}
|
|
}
|
|
|
|
public restore() {
|
|
console.log = this.originalConsole.log;
|
|
console.error = this.originalConsole.error;
|
|
console.warn = this.originalConsole.warn;
|
|
}
|
|
}
|
|
```
|
|
|
|
#### 2.2 Refactor All Console Usage
|
|
- Replace console.* with logger.* throughout codebase
|
|
- Add initialization flag to prevent startup logs in HTTP mode
|
|
- Ensure no third-party libraries write to console
|
|
|
|
### Phase 3: Transport Architecture - Hybrid Single-Session (Priority: High)
|
|
|
|
#### 3.1 Chosen Architecture: Single-Session Implementation
|
|
Based on architectural analysis, we will implement a hybrid single-session approach that:
|
|
- Maintains protocol compliance with StreamableHTTPServerTransport
|
|
- Optimizes for single-player use case (one user at a time)
|
|
- Simplifies implementation while fixing the core issues
|
|
- Provides clean interface for future service integration
|
|
|
|
```typescript
|
|
// src/http-server-single-session.ts
|
|
export class SingleSessionHTTPServer {
|
|
private session: {
|
|
server: N8NDocumentationMCPServer;
|
|
transport: StreamableHTTPServerTransport;
|
|
lastAccess: Date;
|
|
} | null = null;
|
|
|
|
private consoleManager = new ConsoleManager();
|
|
|
|
async handleRequest(req: Request, res: Response): Promise<void> {
|
|
// Wrap all operations to prevent console interference
|
|
return this.consoleManager.wrapOperation(async () => {
|
|
// Ensure we have a valid session
|
|
if (!this.session || this.isExpired()) {
|
|
await this.resetSession();
|
|
}
|
|
|
|
// Update last access time
|
|
this.session.lastAccess = new Date();
|
|
|
|
// Handle request with existing transport
|
|
await this.session.transport.handleRequest(req, res);
|
|
});
|
|
}
|
|
|
|
private async resetSession(): Promise<void> {
|
|
// Clean up old session if exists
|
|
if (this.session) {
|
|
try {
|
|
await this.session.transport.close();
|
|
await this.session.server.close();
|
|
} catch (error) {
|
|
logger.warn('Error closing previous session:', error);
|
|
}
|
|
}
|
|
|
|
// Create new session
|
|
const server = new N8NDocumentationMCPServer();
|
|
const transport = new StreamableHTTPServerTransport({
|
|
sessionIdGenerator: () => 'single-session', // Always same ID
|
|
});
|
|
|
|
await server.connect(transport);
|
|
|
|
this.session = {
|
|
server,
|
|
transport,
|
|
lastAccess: new Date()
|
|
};
|
|
|
|
logger.info('Created new single session');
|
|
}
|
|
|
|
private isExpired(): boolean {
|
|
const thirtyMinutes = 30 * 60 * 1000;
|
|
return Date.now() - this.session.lastAccess.getTime() > thirtyMinutes;
|
|
}
|
|
|
|
async shutdown(): Promise<void> {
|
|
if (this.session) {
|
|
await this.session.transport.close();
|
|
await this.session.server.close();
|
|
this.session = null;
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
#### 3.2 Console Wrapper Implementation
|
|
```typescript
|
|
// src/utils/console-manager.ts
|
|
export class ConsoleManager {
|
|
private originalConsole = {
|
|
log: console.log,
|
|
error: console.error,
|
|
warn: console.warn
|
|
};
|
|
|
|
public wrapOperation<T>(operation: () => T | Promise<T>): T | Promise<T> {
|
|
this.silence();
|
|
try {
|
|
const result = operation();
|
|
if (result instanceof Promise) {
|
|
return result.finally(() => this.restore());
|
|
}
|
|
this.restore();
|
|
return result;
|
|
} catch (error) {
|
|
this.restore();
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
private silence() {
|
|
if (process.env.MCP_MODE === 'http') {
|
|
console.log = () => {};
|
|
console.error = () => {};
|
|
console.warn = () => {};
|
|
}
|
|
}
|
|
|
|
private restore() {
|
|
console.log = this.originalConsole.log;
|
|
console.error = this.originalConsole.error;
|
|
console.warn = this.originalConsole.warn;
|
|
}
|
|
}
|
|
```
|
|
|
|
### Phase 4: Engine Integration Interface (Priority: Medium)
|
|
|
|
#### 4.1 Clean API for Service Integration
|
|
```typescript
|
|
// src/mcp-engine.ts
|
|
export class N8NMCPEngine {
|
|
private server: SingleSessionHTTPServer;
|
|
|
|
constructor() {
|
|
this.server = new SingleSessionHTTPServer();
|
|
}
|
|
|
|
/**
|
|
* Process a single MCP request
|
|
* The wrapping service handles authentication, multi-tenancy, etc.
|
|
*/
|
|
async processRequest(req: Request, res: Response): Promise<void> {
|
|
return this.server.handleRequest(req, res);
|
|
}
|
|
|
|
/**
|
|
* Health check for service monitoring
|
|
*/
|
|
async healthCheck(): Promise<{ status: string; uptime: number }> {
|
|
return {
|
|
status: 'healthy',
|
|
uptime: process.uptime()
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Graceful shutdown for service lifecycle
|
|
*/
|
|
async shutdown(): Promise<void> {
|
|
return this.server.shutdown();
|
|
}
|
|
}
|
|
|
|
// Usage in multi-tenant service:
|
|
// const engine = new N8NMCPEngine();
|
|
// app.post('/api/users/:userId/mcp', authenticate, (req, res) => {
|
|
// engine.processRequest(req, res);
|
|
// });
|
|
```
|
|
|
|
### Phase 5: SSE Support Implementation (Priority: Low)
|
|
|
|
Note: Basic SSE support may be added later if needed, but the single-session architecture handles most use cases through standard request-response.
|
|
|
|
#### 4.1 Dual-Mode Response Handler
|
|
```typescript
|
|
class DualModeHandler {
|
|
async handleRequest(req: Request, res: Response) {
|
|
const acceptsSSE = req.headers.accept?.includes('text/event-stream');
|
|
|
|
if (acceptsSSE && this.isStreamableMethod(req.body.method)) {
|
|
// Handle as SSE stream
|
|
res.writeHead(200, {
|
|
'Content-Type': 'text/event-stream',
|
|
'Cache-Control': 'no-cache',
|
|
'Connection': 'keep-alive'
|
|
});
|
|
|
|
await this.handleSSEStream(req, res);
|
|
} else {
|
|
// Handle as single JSON-RPC response
|
|
await this.handleJSONRPC(req, res);
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
### Phase 5: Testing Strategy (Priority: High)
|
|
|
|
#### 5.1 Unit Tests
|
|
- Test console output isolation
|
|
- Test session management
|
|
- Test SSE vs JSON-RPC response handling
|
|
|
|
#### 5.2 Integration Tests
|
|
```typescript
|
|
describe('Single Session MCP Server', () => {
|
|
it('should handle JSON-RPC requests without console interference', async () => {
|
|
const server = new SingleSessionHTTPServer();
|
|
const mockReq = createMockRequest({ method: 'tools/list' });
|
|
const mockRes = createMockResponse();
|
|
|
|
await server.handleRequest(mockReq, mockRes);
|
|
|
|
expect(mockRes.statusCode).toBe(200);
|
|
expect(console.log).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it('should reuse single session for multiple requests', async () => {
|
|
const server = new SingleSessionHTTPServer();
|
|
|
|
// First request creates session
|
|
await server.handleRequest(req1, res1);
|
|
const firstSessionId = server.getSessionId();
|
|
|
|
// Second request reuses session
|
|
await server.handleRequest(req2, res2);
|
|
const secondSessionId = server.getSessionId();
|
|
|
|
expect(firstSessionId).toBe(secondSessionId);
|
|
expect(firstSessionId).toBe('single-session');
|
|
});
|
|
|
|
it('should reset expired sessions', async () => {
|
|
const server = new SingleSessionHTTPServer();
|
|
|
|
// First request
|
|
await server.handleRequest(req1, res1);
|
|
|
|
// Simulate 31 minutes passing
|
|
jest.advanceTimersByTime(31 * 60 * 1000);
|
|
|
|
// Second request should trigger reset
|
|
const resetSpy = jest.spyOn(server, 'resetSession');
|
|
await server.handleRequest(req2, res2);
|
|
|
|
expect(resetSpy).toHaveBeenCalled();
|
|
});
|
|
|
|
it('should handle errors gracefully', async () => {
|
|
const server = new SingleSessionHTTPServer();
|
|
const badReq = createMockRequest({ invalid: 'data' });
|
|
|
|
await expect(server.handleRequest(badReq, mockRes))
|
|
.resolves.not.toThrow();
|
|
});
|
|
});
|
|
```
|
|
|
|
#### 5.3 Docker Testing
|
|
- Test in isolated Docker environment
|
|
- Verify no stream corruption
|
|
- Test with actual Claude Desktop client
|
|
|
|
## Implementation Order
|
|
|
|
### Phase 1: Foundation (2 days)
|
|
1. **Day 1**:
|
|
- Update dependencies, consolidate MCP SDK version
|
|
- Create ConsoleManager utility class
|
|
- Replace console.* calls with logger in HTTP paths
|
|
|
|
2. **Day 2**:
|
|
- Implement and test console output isolation
|
|
- Verify no third-party console writes
|
|
|
|
### Phase 2: Core Fix (3 days)
|
|
1. **Day 3-4**:
|
|
- Implement SingleSessionHTTPServer class
|
|
- Integrate console wrapping
|
|
- Handle session lifecycle (create, expire, reset)
|
|
|
|
2. **Day 5**:
|
|
- Update HTTP server to use new architecture
|
|
- Test with actual MCP requests
|
|
- Verify "stream is not readable" error is resolved
|
|
|
|
### Phase 3: Polish & Testing (2 days)
|
|
1. **Day 6**:
|
|
- Comprehensive testing suite
|
|
- Error handling improvements
|
|
- Performance metrics
|
|
|
|
2. **Day 7**:
|
|
- Docker integration testing
|
|
- Documentation updates
|
|
- Release preparation
|
|
|
|
### Total Timeline: 7 days (vs original 15 days)
|
|
|
|
## Risk Mitigation
|
|
|
|
### Backward Compatibility
|
|
- Keep existing stdio mode unchanged
|
|
- Add feature flag for new HTTP implementation
|
|
- Gradual rollout with fallback option
|
|
|
|
### Performance Considerations
|
|
- Single session = minimal memory overhead
|
|
- Automatic expiry after 30 minutes of inactivity
|
|
- No session accumulation or cleanup complexity
|
|
- Connection pooling for database access
|
|
|
|
### Security Implications
|
|
- Session timeout configuration
|
|
- Rate limiting per session
|
|
- Secure session ID generation
|
|
|
|
## Success Metrics
|
|
|
|
1. **Zero "stream is not readable" errors** in production
|
|
2. **Successful Claude Desktop integration** via mcp-remote
|
|
3. **Response time < 100ms** for standard queries
|
|
4. **Memory usage stable** over extended periods
|
|
5. **Clean logs** without stream corruption
|
|
|
|
## Alternative Approaches
|
|
|
|
### Alternative 1: Different Transport
|
|
- Use WebSocket instead of HTTP
|
|
- Implement custom transport that avoids StreamableHTTP issues
|
|
- Direct JSON-RPC without MCP SDK transport layer
|
|
|
|
### Alternative 2: Process Isolation
|
|
- Spawn separate process for each request
|
|
- Complete isolation of streams
|
|
- Higher overhead but guaranteed clean state
|
|
|
|
### Alternative 3: Proxy Layer
|
|
- Add nginx or similar proxy
|
|
- Handle SSE at proxy level
|
|
- Simplify Node.js implementation
|
|
|
|
## Rollback Plan
|
|
|
|
If issues persist after implementation:
|
|
1. Revert to previous version
|
|
2. Disable HTTP mode temporarily
|
|
3. Focus on stdio mode for Claude Desktop
|
|
4. Investigate alternative MCP implementations
|
|
|
|
## Long-term Considerations
|
|
|
|
1. **Monitor MCP SDK Development**
|
|
- StreamableHTTP is evolving
|
|
- May need updates as SDK matures
|
|
|
|
2. **Consider Official Examples**
|
|
- Align with official MCP server implementations
|
|
- Contribute fixes back to SDK if needed
|
|
|
|
3. **Performance Optimization**
|
|
- Cache frequently accessed data
|
|
- Optimize session management
|
|
- Consider clustering for scale
|
|
|
|
## Conclusion
|
|
|
|
The "stream is not readable" error is solvable through systematic addressing of console output and implementing the Hybrid Single-Session architecture. This approach provides:
|
|
|
|
1. **Protocol Compliance**: Works with StreamableHTTPServerTransport's expectations
|
|
2. **Simplicity**: Single session eliminates complex state management
|
|
3. **Performance**: Minimal overhead, automatic cleanup
|
|
4. **Integration Ready**: Clean interface for service wrapper
|
|
5. **Reduced Timeline**: 7 days vs original 15 days
|
|
|
|
The single-session approach is ideal for a single-player repository that will serve as an engine for larger services, maintaining simplicity while ensuring correctness. |