feat: SSE (Server-Sent Events) support for n8n integration

- Added SSE server implementation for real-time event streaming
- Created n8n compatibility mode with strict schema validation
- Implemented session management for concurrent connections
- Added comprehensive SSE documentation and examples
- Enhanced MCP tools with async execution support
- Added Docker Compose configuration for SSE deployment
- Included test scripts and integration tests

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
czlonkowski
2025-07-09 08:24:44 +02:00
parent 87f0cfc4dc
commit 54e09c9673
23 changed files with 3981 additions and 20 deletions

104
scripts/test-sse.sh Executable file
View File

@@ -0,0 +1,104 @@
#!/bin/bash
# Test script for SSE server
# Usage: ./scripts/test-sse.sh
SERVER_URL="${SERVER_URL:-http://localhost:3000}"
AUTH_TOKEN="${AUTH_TOKEN:-test-secure-token-123456789}"
echo "🧪 Testing SSE Server Implementation"
echo "Server URL: $SERVER_URL"
echo "Auth Token: ${AUTH_TOKEN:0:8}..."
echo ""
# Function to test endpoint
test_endpoint() {
local method=$1
local endpoint=$2
local data=$3
local headers=$4
echo -n "Testing $method $endpoint... "
if [ "$method" = "GET" ]; then
response=$(curl -s -w "\n%{http_code}" -X GET "$SERVER_URL$endpoint" $headers)
else
response=$(curl -s -w "\n%{http_code}" -X POST "$SERVER_URL$endpoint" \
-H "Content-Type: application/json" \
$headers \
-d "$data")
fi
http_code=$(echo "$response" | tail -n1)
body=$(echo "$response" | head -n-1)
if [ "$http_code" = "200" ]; then
echo "✅ OK ($http_code)"
if [ -n "$body" ]; then
echo " Response: $(echo "$body" | jq -c . 2>/dev/null || echo "$body")"
fi
else
echo "❌ FAILED ($http_code)"
if [ -n "$body" ]; then
echo " Error: $body"
fi
fi
echo ""
}
# Test health check
test_endpoint "GET" "/health" "" ""
# Test SSE connection (limited test with curl)
echo -n "Testing SSE connection... "
timeout 2 curl -s -N -H "Authorization: Bearer $AUTH_TOKEN" "$SERVER_URL/sse" > /tmp/sse-test.log 2>&1 &
SSE_PID=$!
sleep 1
if kill -0 $SSE_PID 2>/dev/null; then
echo "✅ Connection established"
kill $SSE_PID 2>/dev/null
if [ -s /tmp/sse-test.log ]; then
echo " Initial events:"
cat /tmp/sse-test.log | head -5
fi
else
echo "❌ Connection failed"
fi
echo ""
# Test legacy MCP endpoint
test_endpoint "POST" "/mcp" '{"jsonrpc":"2.0","method":"tools/list","id":1}' "-H 'Authorization: Bearer $AUTH_TOKEN'"
# Test invalid auth
echo -n "Testing authentication rejection... "
response=$(curl -s -w "\n%{http_code}" -X GET "$SERVER_URL/sse" -H "Authorization: Bearer invalid-token")
http_code=$(echo "$response" | tail -n1)
if [ "$http_code" = "401" ]; then
echo "✅ Correctly rejected ($http_code)"
else
echo "❌ Expected 401, got $http_code"
fi
# Summary
echo ""
echo "📊 Test Summary:"
echo "SSE server endpoint: $SERVER_URL/sse"
echo "Message endpoint: $SERVER_URL/mcp/message"
echo "Legacy endpoint: $SERVER_URL/mcp"
# Instructions for manual testing
echo ""
echo "📝 Manual Testing Instructions:"
echo ""
echo "1. Connect to SSE stream:"
echo " curl -N -H \"Authorization: Bearer $AUTH_TOKEN\" $SERVER_URL/sse"
echo ""
echo "2. In another terminal, get the client ID from the connected event and send a message:"
echo " curl -X POST $SERVER_URL/mcp/message \\"
echo " -H \"Authorization: Bearer $AUTH_TOKEN\" \\"
echo " -H \"X-Client-ID: <client-id-from-sse>\" \\"
echo " -H \"Content-Type: application/json\" \\"
echo " -d '{\"jsonrpc\":\"2.0\",\"method\":\"tools/list\",\"id\":1}'"
echo ""
echo "3. You should see the response in the SSE stream"

242
scripts/test-sse.ts Executable file
View File

@@ -0,0 +1,242 @@
#!/usr/bin/env ts-node
/**
* Test script for SSE server implementation
* Tests the SSE connection and MCP protocol communication
*/
import axios from 'axios';
const SERVER_URL = process.env.SERVER_URL || 'http://localhost:3000';
const AUTH_TOKEN = process.env.AUTH_TOKEN || 'test-token';
interface TestResult {
test: string;
status: 'passed' | 'failed';
message?: string;
duration?: number;
}
const results: TestResult[] = [];
function logTest(test: string, status: 'passed' | 'failed', message?: string, duration?: number) {
results.push({ test, status, message, duration });
console.log(`${status === 'passed' ? '✅' : '❌'} ${test}${message ? `: ${message}` : ''}${duration ? ` (${duration}ms)` : ''}`);
}
async function testHealthCheck() {
const start = Date.now();
try {
const response = await axios.get(`${SERVER_URL}/health`);
const duration = Date.now() - start;
if (response.data.status === 'ok' && response.data.mode === 'sse') {
logTest('Health check', 'passed', `Server is running in SSE mode`, duration);
} else {
logTest('Health check', 'failed', `Unexpected response: ${JSON.stringify(response.data)}`, duration);
}
} catch (error) {
logTest('Health check', 'failed', error instanceof Error ? error.message : 'Unknown error');
}
}
async function testSSEConnection(): Promise<string | null> {
return new Promise((resolve) => {
const start = Date.now();
let clientId: string | null = null;
let eventSource: EventSource | null = null;
try {
// Note: eventsource package doesn't support headers in constructor
// We'll need to use a different approach or library for authenticated SSE
const EventSourcePolyfill = require('eventsource');
eventSource = new EventSourcePolyfill(`${SERVER_URL}/sse`, {
headers: {
'Authorization': `Bearer ${AUTH_TOKEN}`
}
}) as EventSource;
eventSource.addEventListener('connected', (event: any) => {
const duration = Date.now() - start;
const data = JSON.parse(event.data);
clientId = data.clientId;
logTest('SSE connection', 'passed', `Connected with client ID: ${clientId}`, duration);
});
eventSource.addEventListener('mcp-response', (event: any) => {
const data = JSON.parse(event.data);
console.log(' Received MCP response:', data);
});
eventSource.addEventListener('ping', (event: any) => {
console.log(' Received ping:', event.data);
});
eventSource.onerror = (error) => {
logTest('SSE connection', 'failed', `Connection error: ${error}`);
eventSource?.close();
resolve(null);
};
// Wait for connection and initial message
setTimeout(() => {
if (eventSource) {
eventSource.close();
}
resolve(clientId);
}, 2000);
} catch (error) {
logTest('SSE connection', 'failed', error instanceof Error ? error.message : 'Unknown error');
resolve(null);
}
});
}
async function testMCPMessage(clientId: string) {
const start = Date.now();
try {
// Test initialize
const initResponse = await axios.post(
`${SERVER_URL}/mcp/message`,
{
jsonrpc: '2.0',
method: 'initialize',
id: 'test-init-1',
params: {}
},
{
headers: {
'Authorization': `Bearer ${AUTH_TOKEN}`,
'X-Client-ID': clientId,
'Content-Type': 'application/json'
}
}
);
const duration = Date.now() - start;
if (initResponse.data.status === 'ok') {
logTest('MCP initialize message', 'passed', `Message acknowledged`, duration);
} else {
logTest('MCP initialize message', 'failed', `Unexpected response: ${JSON.stringify(initResponse.data)}`, duration);
}
} catch (error) {
logTest('MCP initialize message', 'failed', error instanceof Error ? error.message : 'Unknown error');
}
}
async function testToolsList(clientId: string) {
const start = Date.now();
try {
const response = await axios.post(
`${SERVER_URL}/mcp/message`,
{
jsonrpc: '2.0',
method: 'tools/list',
id: 'test-tools-1',
params: {}
},
{
headers: {
'Authorization': `Bearer ${AUTH_TOKEN}`,
'X-Client-ID': clientId,
'Content-Type': 'application/json'
}
}
);
const duration = Date.now() - start;
if (response.data.status === 'ok') {
logTest('MCP tools/list message', 'passed', `Message acknowledged`, duration);
} else {
logTest('MCP tools/list message', 'failed', `Unexpected response: ${JSON.stringify(response.data)}`, duration);
}
} catch (error) {
logTest('MCP tools/list message', 'failed', error instanceof Error ? error.message : 'Unknown error');
}
}
async function testLegacyEndpoint() {
const start = Date.now();
try {
const response = await axios.post(
`${SERVER_URL}/mcp`,
{
jsonrpc: '2.0',
method: 'tools/list',
id: 'test-legacy-1',
params: {}
},
{
headers: {
'Authorization': `Bearer ${AUTH_TOKEN}`,
'Content-Type': 'application/json'
}
}
);
const duration = Date.now() - start;
if (response.data.result && response.data.result.tools) {
logTest('Legacy /mcp endpoint', 'passed', `Found ${response.data.result.tools.length} tools`, duration);
} else {
logTest('Legacy /mcp endpoint', 'failed', `Unexpected response: ${JSON.stringify(response.data)}`, duration);
}
} catch (error) {
logTest('Legacy /mcp endpoint', 'failed', error instanceof Error ? error.message : 'Unknown error');
}
}
async function runTests() {
console.log('🧪 Testing SSE Server Implementation');
console.log(`Server URL: ${SERVER_URL}`);
console.log(`Auth Token: ${AUTH_TOKEN.substring(0, 8)}...`);
console.log('');
// Health check
await testHealthCheck();
// SSE connection
const clientId = await testSSEConnection();
if (clientId) {
// Test MCP messages
await testMCPMessage(clientId);
await testToolsList(clientId);
}
// Test legacy endpoint
await testLegacyEndpoint();
// Summary
console.log('\n📊 Test Summary:');
const passed = results.filter(r => r.status === 'passed').length;
const failed = results.filter(r => r.status === 'failed').length;
console.log(`Total: ${results.length}, Passed: ${passed}, Failed: ${failed}`);
if (failed > 0) {
console.log('\n❌ Failed tests:');
results.filter(r => r.status === 'failed').forEach(r => {
console.log(` - ${r.test}: ${r.message}`);
});
process.exit(1);
} else {
console.log('\n✅ All tests passed!');
process.exit(0);
}
}
// Install eventsource if not available
try {
require('eventsource');
} catch {
console.log('Installing eventsource package...');
require('child_process').execSync('npm install --no-save eventsource', { stdio: 'inherit' });
}
// Run tests
runTests().catch(error => {
console.error('Test runner error:', error);
process.exit(1);
});