fix: critical memory leak from per-session database connections (#554)

* fix: critical memory leak from per-session database connections (#542)

Each MCP session was creating its own database connection (~900MB),
causing OOM kills every ~20 minutes with 3-4 concurrent sessions.

Changes:
- Add SharedDatabase singleton pattern - all sessions share ONE connection
- Reduce session timeout from 30 min to 5 min (configurable)
- Add eager cleanup for reconnecting instances
- Fix telemetry event listener leak

Memory impact: ~900MB/session → ~68MB shared + ~5MB/session overhead

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Conceived by Romuald Czlonkowski - https://www.aiadvisors.pl/en

* fix: resolve test failures from shared database race conditions

- Fix `shutdown()` to respect shared database pattern (was directly closing)
- Add `await this.initialized` in both `close()` and `shutdown()` to prevent
  race condition where cleanup runs while initialization is in progress
- Add double-shutdown protection with `isShutdown` flag
- Export `SharedDatabaseState` type for proper typing
- Include error details in debug logs
- Add MCP server close to `shutdown()` for consistency with `close()`
- Null out `earlyLogger` in `shutdown()` for consistency

The CI test failure "The database connection is not open" was caused by:
1. `shutdown()` directly calling `this.db.close()` which closed the SHARED
   database connection, breaking subsequent tests
2. Race condition where `shutdown()` ran before initialization completed

Conceived by Romuald Członkowski - www.aiadvisors.pl/en

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test: add unit tests for shared-database module

Add comprehensive unit tests covering:
- getSharedDatabase: initialization, reuse, different path error, concurrent requests
- releaseSharedDatabase: refCount decrement, double-release guard
- closeSharedDatabase: state clearing, error handling, re-initialization
- Helper functions: isSharedDatabaseInitialized, getSharedDatabaseRefCount

21 tests covering the singleton database connection pattern used to prevent
~900MB memory leaks per session.

Conceived by Romuald Członkowski - www.aiadvisors.pl/en

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Romuald Członkowski
2026-01-23 19:51:22 +01:00
committed by GitHub
parent fad3437977
commit c8c76e435d
9 changed files with 761 additions and 45 deletions

View File

@@ -7,6 +7,42 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [2.33.5] - 2026-01-23
### Fixed
- **Critical memory leak: per-session database connections** (Issue #542): Fixed severe memory leak where each MCP session created its own database connection (~900MB per session)
- Root cause: `N8NDocumentationMCPServer` called `createDatabaseAdapter()` for every new session, duplicating the entire 68MB database in memory
- With 3-4 sessions, memory would exceed 4GB causing OOM kills every ~20 minutes
- Fix: Implemented singleton `SharedDatabase` pattern - all sessions now share ONE database connection
- Memory impact: Reduced from ~900MB per session to ~68MB total (shared) + ~5MB per session overhead
- Added `getSharedDatabase()` and `releaseSharedDatabase()` for thread-safe connection management
- Added reference counting to track active sessions using the shared connection
- **Session timeout optimization**: Reduced default session timeout from 30 minutes to 5 minutes
- Faster cleanup of stale sessions reduces memory buildup
- Configurable via `SESSION_TIMEOUT_MINUTES` environment variable
- **Eager instance cleanup**: When a client reconnects, previous sessions for the same instanceId are now immediately cleaned up
- Prevents memory accumulation from reconnecting clients in multi-tenant deployments
- **Telemetry event listener leak**: Fixed event listeners in `TelemetryBatchProcessor` that were never removed
- Added proper cleanup in `stop()` method
- Added guard against multiple `start()` calls
### Added
- **New module: `src/database/shared-database.ts`** - Singleton database manager
- `getSharedDatabase(dbPath)`: Thread-safe initialization with promise lock pattern
- `releaseSharedDatabase(state)`: Reference counting for cleanup
- `closeSharedDatabase()`: Graceful shutdown for process termination
- `isSharedDatabaseInitialized()` and `getSharedDatabaseRefCount()`: Monitoring helpers
### Changed
- **`N8NDocumentationMCPServer.close()`**: Now releases shared database reference instead of closing the connection
- **`SingleSessionHTTPServer.shutdown()`**: Calls `closeSharedDatabase()` during graceful shutdown
## [2.33.4] - 2026-01-21 ## [2.33.4] - 2026-01-21
### Fixed ### Fixed

View File

@@ -1,6 +1,6 @@
{ {
"name": "n8n-mcp", "name": "n8n-mcp",
"version": "2.33.4", "version": "2.33.5",
"description": "Integration between n8n workflow automation and Model Context Protocol (MCP)", "description": "Integration between n8n workflow automation and Model Context Protocol (MCP)",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",

View File

@@ -419,12 +419,36 @@ class BetterSQLiteStatement implements PreparedStatement {
/** /**
* Statement wrapper for sql.js * Statement wrapper for sql.js
*
* IMPORTANT: sql.js requires explicit memory management via Statement.free().
* This wrapper automatically frees statement memory after each operation
* to prevent memory leaks during sustained traffic.
*
* See: https://sql.js.org/documentation/Statement.html
* "After calling db.prepare() you must manually free the assigned memory
* by calling Statement.free()."
*/ */
class SQLJSStatement implements PreparedStatement { class SQLJSStatement implements PreparedStatement {
private boundParams: any = null; private boundParams: any = null;
private freed: boolean = false;
constructor(private stmt: any, private onModify: () => void) {} constructor(private stmt: any, private onModify: () => void) {}
/**
* Free the underlying sql.js statement memory.
* Safe to call multiple times - subsequent calls are no-ops.
*/
private freeStatement(): void {
if (!this.freed && this.stmt) {
try {
this.stmt.free();
this.freed = true;
} catch (e) {
// Statement may already be freed or invalid - ignore
}
}
}
run(...params: any[]): RunResult { run(...params: any[]): RunResult {
try { try {
if (params.length > 0) { if (params.length > 0) {
@@ -445,6 +469,9 @@ class SQLJSStatement implements PreparedStatement {
} catch (error) { } catch (error) {
this.stmt.reset(); this.stmt.reset();
throw error; throw error;
} finally {
// Free statement memory after write operation completes
this.freeStatement();
} }
} }
@@ -468,6 +495,9 @@ class SQLJSStatement implements PreparedStatement {
} catch (error) { } catch (error) {
this.stmt.reset(); this.stmt.reset();
throw error; throw error;
} finally {
// Free statement memory after read operation completes
this.freeStatement();
} }
} }
@@ -490,6 +520,9 @@ class SQLJSStatement implements PreparedStatement {
} catch (error) { } catch (error) {
this.stmt.reset(); this.stmt.reset();
throw error; throw error;
} finally {
// Free statement memory after read operation completes
this.freeStatement();
} }
} }

View File

@@ -0,0 +1,197 @@
/**
* Shared Database Manager - Singleton for cross-session database connection
*
* This module implements a singleton pattern to share a single database connection
* across all MCP server sessions. This prevents memory leaks caused by each session
* creating its own database connection (~900MB per session).
*
* Memory impact: Reduces per-session memory from ~900MB to near-zero by sharing
* a single ~68MB database connection across all sessions.
*
* Issue: https://github.com/czlonkowski/n8n-mcp/issues/XXX
*/
import { DatabaseAdapter, createDatabaseAdapter } from './database-adapter';
import { NodeRepository } from './node-repository';
import { TemplateService } from '../templates/template-service';
import { EnhancedConfigValidator } from '../services/enhanced-config-validator';
import { logger } from '../utils/logger';
/**
* Shared database state - holds the singleton connection and services
*/
export interface SharedDatabaseState {
db: DatabaseAdapter;
repository: NodeRepository;
templateService: TemplateService;
dbPath: string;
refCount: number;
initialized: boolean;
}
// Module-level singleton state
let sharedState: SharedDatabaseState | null = null;
let initializationPromise: Promise<SharedDatabaseState> | null = null;
/**
* Get or create the shared database connection
*
* Thread-safe initialization using a promise lock pattern.
* Multiple concurrent calls will wait for the same initialization.
*
* @param dbPath - Path to the SQLite database file
* @returns Shared database state with connection and services
*/
export async function getSharedDatabase(dbPath: string): Promise<SharedDatabaseState> {
// If already initialized with the same path, increment ref count and return
if (sharedState && sharedState.initialized && sharedState.dbPath === dbPath) {
sharedState.refCount++;
logger.debug('Reusing shared database connection', {
refCount: sharedState.refCount,
dbPath
});
return sharedState;
}
// If already initialized with a DIFFERENT path, this is a configuration error
if (sharedState && sharedState.initialized && sharedState.dbPath !== dbPath) {
logger.error('Attempted to initialize shared database with different path', {
existingPath: sharedState.dbPath,
requestedPath: dbPath
});
throw new Error(`Shared database already initialized with different path: ${sharedState.dbPath}`);
}
// If initialization is in progress, wait for it
if (initializationPromise) {
try {
const state = await initializationPromise;
state.refCount++;
logger.debug('Reusing shared database (waited for init)', {
refCount: state.refCount,
dbPath
});
return state;
} catch (error) {
// Initialization failed while we were waiting, clear promise and rethrow
initializationPromise = null;
throw error;
}
}
// Start new initialization
initializationPromise = initializeSharedDatabase(dbPath);
try {
const state = await initializationPromise;
// Clear the promise on success to allow future re-initialization after close
initializationPromise = null;
return state;
} catch (error) {
// Clear promise on failure to allow retry
initializationPromise = null;
throw error;
}
}
/**
* Initialize the shared database connection and services
*/
async function initializeSharedDatabase(dbPath: string): Promise<SharedDatabaseState> {
logger.info('Initializing shared database connection', { dbPath });
const db = await createDatabaseAdapter(dbPath);
const repository = new NodeRepository(db);
const templateService = new TemplateService(db);
// Initialize similarity services for enhanced validation
EnhancedConfigValidator.initializeSimilarityServices(repository);
sharedState = {
db,
repository,
templateService,
dbPath,
refCount: 1,
initialized: true
};
logger.info('Shared database initialized successfully', {
dbPath,
refCount: sharedState.refCount
});
return sharedState;
}
/**
* Release a reference to the shared database
*
* Decrements the reference count. Does NOT close the database
* as it's shared across all sessions for the lifetime of the process.
*
* @param state - The shared database state to release
*/
export function releaseSharedDatabase(state: SharedDatabaseState): void {
if (!state || !sharedState) {
return;
}
// Guard against double-release (refCount going negative)
if (sharedState.refCount <= 0) {
logger.warn('Attempted to release shared database with refCount already at or below 0', {
refCount: sharedState.refCount
});
return;
}
sharedState.refCount--;
logger.debug('Released shared database reference', {
refCount: sharedState.refCount
});
// Note: We intentionally do NOT close the database even when refCount hits 0
// The database should remain open for the lifetime of the process to handle
// new sessions. Only process shutdown should close it.
}
/**
* Force close the shared database (for graceful shutdown only)
*
* This should only be called during process shutdown, not during normal
* session cleanup. Closing the database would break other active sessions.
*/
export async function closeSharedDatabase(): Promise<void> {
if (!sharedState) {
return;
}
logger.info('Closing shared database connection', {
refCount: sharedState.refCount
});
try {
sharedState.db.close();
} catch (error) {
logger.warn('Error closing shared database', {
error: error instanceof Error ? error.message : String(error)
});
}
sharedState = null;
initializationPromise = null;
}
/**
* Check if shared database is initialized
*/
export function isSharedDatabaseInitialized(): boolean {
return sharedState !== null && sharedState.initialized;
}
/**
* Get current reference count (for debugging/monitoring)
*/
export function getSharedDatabaseRefCount(): number {
return sharedState?.refCount ?? 0;
}

View File

@@ -26,6 +26,7 @@ import {
} from './utils/protocol-version'; } from './utils/protocol-version';
import { InstanceContext, validateInstanceContext } from './types/instance-context'; import { InstanceContext, validateInstanceContext } from './types/instance-context';
import { SessionState } from './types/session-state'; import { SessionState } from './types/session-state';
import { closeSharedDatabase } from './database/shared-database';
dotenv.config(); dotenv.config();
@@ -106,7 +107,12 @@ export class SingleSessionHTTPServer {
private session: Session | null = null; // Keep for SSE compatibility private session: Session | null = null; // Keep for SSE compatibility
private consoleManager = new ConsoleManager(); private consoleManager = new ConsoleManager();
private expressServer: any; private expressServer: any;
private sessionTimeout = 30 * 60 * 1000; // 30 minutes // Session timeout reduced from 30 minutes to 5 minutes for faster cleanup
// Configurable via SESSION_TIMEOUT_MINUTES environment variable
// This prevents memory buildup from stale sessions
private sessionTimeout = parseInt(
process.env.SESSION_TIMEOUT_MINUTES || '5', 10
) * 60 * 1000;
private authToken: string | null = null; private authToken: string | null = null;
private cleanupTimer: NodeJS.Timeout | null = null; private cleanupTimer: NodeJS.Timeout | null = null;
@@ -492,6 +498,29 @@ export class SingleSessionHTTPServer {
// For initialize requests: always create new transport and server // For initialize requests: always create new transport and server
logger.info('handleRequest: Creating new transport for initialize request'); logger.info('handleRequest: Creating new transport for initialize request');
// EAGER CLEANUP: Remove existing sessions for the same instance
// This prevents memory buildup when clients reconnect without proper cleanup
if (instanceContext?.instanceId) {
const sessionsToRemove: string[] = [];
for (const [existingSessionId, context] of Object.entries(this.sessionContexts)) {
if (context?.instanceId === instanceContext.instanceId) {
sessionsToRemove.push(existingSessionId);
}
}
for (const oldSessionId of sessionsToRemove) {
// Double-check session still exists (may have been cleaned by concurrent request)
if (!this.transports[oldSessionId]) {
continue;
}
logger.info('Cleaning up previous session for instance', {
instanceId: instanceContext.instanceId,
oldSession: oldSessionId,
reason: 'instance_reconnect'
});
await this.removeSession(oldSessionId, 'instance_reconnect');
}
}
// Generate session ID based on multi-tenant configuration // Generate session ID based on multi-tenant configuration
let sessionIdToUse: string; let sessionIdToUse: string;
@@ -1423,6 +1452,15 @@ export class SingleSessionHTTPServer {
}); });
} }
// Close the shared database connection (only during process shutdown)
// This must happen after all sessions are closed
try {
await closeSharedDatabase();
logger.info('Shared database closed');
} catch (error) {
logger.warn('Error closing shared database:', error);
}
logger.info('Single-Session HTTP server shutdown completed'); logger.info('Single-Session HTTP server shutdown completed');
} }

View File

@@ -14,6 +14,7 @@ import { getWorkflowExampleString } from './workflow-examples';
import { logger } from '../utils/logger'; import { logger } from '../utils/logger';
import { NodeRepository } from '../database/node-repository'; import { NodeRepository } from '../database/node-repository';
import { DatabaseAdapter, createDatabaseAdapter } from '../database/database-adapter'; import { DatabaseAdapter, createDatabaseAdapter } from '../database/database-adapter';
import { getSharedDatabase, releaseSharedDatabase, SharedDatabaseState } from '../database/shared-database';
import { PropertyFilter } from '../services/property-filter'; import { PropertyFilter } from '../services/property-filter';
import { TaskTemplates } from '../services/task-templates'; import { TaskTemplates } from '../services/task-templates';
import { ConfigValidator } from '../services/config-validator'; import { ConfigValidator } from '../services/config-validator';
@@ -150,6 +151,9 @@ export class N8NDocumentationMCPServer {
private previousToolTimestamp: number = Date.now(); private previousToolTimestamp: number = Date.now();
private earlyLogger: EarlyErrorLogger | null = null; private earlyLogger: EarlyErrorLogger | null = null;
private disabledToolsCache: Set<string> | null = null; private disabledToolsCache: Set<string> | null = null;
private useSharedDatabase: boolean = false; // Track if using shared DB for cleanup
private sharedDbState: SharedDatabaseState | null = null; // Reference to shared DB state for release
private isShutdown: boolean = false; // Prevent double-shutdown
constructor(instanceContext?: InstanceContext, earlyLogger?: EarlyErrorLogger) { constructor(instanceContext?: InstanceContext, earlyLogger?: EarlyErrorLogger) {
this.instanceContext = instanceContext; this.instanceContext = instanceContext;
@@ -245,18 +249,39 @@ export class N8NDocumentationMCPServer {
* Order of cleanup: * Order of cleanup:
* 1. Close MCP server connection * 1. Close MCP server connection
* 2. Destroy cache (clears entries AND stops cleanup timer) * 2. Destroy cache (clears entries AND stops cleanup timer)
* 3. Close database connection * 3. Release shared database OR close dedicated connection
* 4. Null out references to help GC * 4. Null out references to help GC
*
* IMPORTANT: For shared databases, we only release the reference (decrement refCount),
* NOT close the database. The database stays open for other sessions.
* For in-memory databases (tests), we close the dedicated connection.
*/ */
async close(): Promise<void> { async close(): Promise<void> {
// Wait for initialization to complete (or fail) before cleanup
// This prevents race conditions where close runs while init is in progress
try {
await this.initialized;
} catch (error) {
// Initialization failed - that's OK, we still need to clean up
logger.debug('Initialization had failed, proceeding with cleanup', {
error: error instanceof Error ? error.message : String(error)
});
}
try { try {
await this.server.close(); await this.server.close();
// Use destroy() not clear() - also stops the cleanup timer // Use destroy() not clear() - also stops the cleanup timer
this.cache.destroy(); this.cache.destroy();
// Close database connection before nullifying reference // Handle database cleanup based on whether it's shared or dedicated
if (this.db) { if (this.useSharedDatabase && this.sharedDbState) {
// Shared database: release reference, don't close
// The database stays open for other sessions
releaseSharedDatabase(this.sharedDbState);
logger.debug('Released shared database reference');
} else if (this.db) {
// Dedicated database (in-memory for tests): close it
try { try {
this.db.close(); this.db.close();
} catch (dbError) { } catch (dbError) {
@@ -271,6 +296,7 @@ export class N8NDocumentationMCPServer {
this.repository = null; this.repository = null;
this.templateService = null; this.templateService = null;
this.earlyLogger = null; this.earlyLogger = null;
this.sharedDbState = null;
} catch (error) { } catch (error) {
// Log but don't throw - cleanup should be best-effort // Log but don't throw - cleanup should be best-effort
logger.warn('Error closing MCP server', { error: error instanceof Error ? error.message : String(error) }); logger.warn('Error closing MCP server', { error: error instanceof Error ? error.message : String(error) });
@@ -286,23 +312,32 @@ export class N8NDocumentationMCPServer {
logger.debug('Database initialization starting...', { dbPath }); logger.debug('Database initialization starting...', { dbPath });
this.db = await createDatabaseAdapter(dbPath); // For in-memory databases (tests), create a dedicated connection
logger.debug('Database adapter created'); // For regular databases, use the shared connection to prevent memory leaks
// If using in-memory database for tests, initialize schema
if (dbPath === ':memory:') { if (dbPath === ':memory:') {
this.db = await createDatabaseAdapter(dbPath);
logger.debug('Database adapter created (in-memory mode)');
await this.initializeInMemorySchema(); await this.initializeInMemorySchema();
logger.debug('In-memory schema initialized'); logger.debug('In-memory schema initialized');
this.repository = new NodeRepository(this.db);
this.templateService = new TemplateService(this.db);
// Initialize similarity services for enhanced validation
EnhancedConfigValidator.initializeSimilarityServices(this.repository);
this.useSharedDatabase = false;
} else {
// Use shared database connection to prevent ~900MB memory leak per session
// See: Memory leak fix - database was being duplicated per session
const sharedState = await getSharedDatabase(dbPath);
this.db = sharedState.db;
this.repository = sharedState.repository;
this.templateService = sharedState.templateService;
this.sharedDbState = sharedState;
this.useSharedDatabase = true;
logger.debug('Using shared database connection');
} }
this.repository = new NodeRepository(this.db);
logger.debug('Node repository initialized'); logger.debug('Node repository initialized');
this.templateService = new TemplateService(this.db);
logger.debug('Template service initialized'); logger.debug('Template service initialized');
// Initialize similarity services for enhanced validation
EnhancedConfigValidator.initializeSimilarityServices(this.repository);
logger.debug('Similarity services initialized'); logger.debug('Similarity services initialized');
// Checkpoint: Database connected (v2.18.3) // Checkpoint: Database connected (v2.18.3)
@@ -3910,8 +3945,33 @@ Full documentation is being prepared. For now, use get_node_essentials for confi
} }
async shutdown(): Promise<void> { async shutdown(): Promise<void> {
// Prevent double-shutdown
if (this.isShutdown) {
logger.debug('Shutdown already called, skipping');
return;
}
this.isShutdown = true;
logger.info('Shutting down MCP server...'); logger.info('Shutting down MCP server...');
// Wait for initialization to complete (or fail) before cleanup
// This prevents race conditions where shutdown runs while init is in progress
try {
await this.initialized;
} catch (error) {
// Initialization failed - that's OK, we still need to clean up
logger.debug('Initialization had failed, proceeding with cleanup', {
error: error instanceof Error ? error.message : String(error)
});
}
// Close MCP server connection (for consistency with close() method)
try {
await this.server.close();
} catch (error) {
logger.error('Error closing MCP server:', error);
}
// Clean up cache timers to prevent memory leaks // Clean up cache timers to prevent memory leaks
if (this.cache) { if (this.cache) {
try { try {
@@ -3922,14 +3982,30 @@ Full documentation is being prepared. For now, use get_node_essentials for confi
} }
} }
// Close database connection if it exists // Handle database cleanup based on whether it's shared or dedicated
if (this.db) { // For shared databases, we only release the reference (decrement refCount)
// For dedicated databases (in-memory for tests), we close the connection
if (this.useSharedDatabase && this.sharedDbState) {
try { try {
await this.db.close(); releaseSharedDatabase(this.sharedDbState);
logger.info('Released shared database reference');
} catch (error) {
logger.error('Error releasing shared database:', error);
}
} else if (this.db) {
try {
this.db.close();
logger.info('Database connection closed'); logger.info('Database connection closed');
} catch (error) { } catch (error) {
logger.error('Error closing database:', error); logger.error('Error closing database:', error);
} }
} }
// Null out references to help garbage collection
this.db = null;
this.repository = null;
this.templateService = null;
this.earlyLogger = null;
this.sharedDbState = null;
} }
} }

View File

@@ -58,6 +58,13 @@ export class TelemetryBatchProcessor {
private flushTimes: number[] = []; private flushTimes: number[] = [];
private deadLetterQueue: (TelemetryEvent | WorkflowTelemetry | WorkflowMutationRecord)[] = []; private deadLetterQueue: (TelemetryEvent | WorkflowTelemetry | WorkflowMutationRecord)[] = [];
private readonly maxDeadLetterSize = 100; private readonly maxDeadLetterSize = 100;
// Track event listeners for proper cleanup to prevent memory leaks
private eventListeners: {
beforeExit?: () => void;
sigint?: () => void;
sigterm?: () => void;
} = {};
private started: boolean = false;
constructor( constructor(
private supabase: SupabaseClient | null, private supabase: SupabaseClient | null,
@@ -72,6 +79,12 @@ export class TelemetryBatchProcessor {
start(): void { start(): void {
if (!this.isEnabled() || !this.supabase) return; if (!this.isEnabled() || !this.supabase) return;
// Guard against multiple starts (prevents event listener accumulation)
if (this.started) {
logger.debug('Telemetry batch processor already started, skipping');
return;
}
// Set up periodic flushing // Set up periodic flushing
this.flushTimer = setInterval(() => { this.flushTimer = setInterval(() => {
this.flush(); this.flush();
@@ -83,17 +96,22 @@ export class TelemetryBatchProcessor {
this.flushTimer.unref(); this.flushTimer.unref();
} }
// Set up process exit handlers // Set up process exit handlers with stored references for cleanup
process.on('beforeExit', () => this.flush()); this.eventListeners.beforeExit = () => this.flush();
process.on('SIGINT', () => { this.eventListeners.sigint = () => {
this.flush(); this.flush();
process.exit(0); process.exit(0);
}); };
process.on('SIGTERM', () => { this.eventListeners.sigterm = () => {
this.flush(); this.flush();
process.exit(0); process.exit(0);
}); };
process.on('beforeExit', this.eventListeners.beforeExit);
process.on('SIGINT', this.eventListeners.sigint);
process.on('SIGTERM', this.eventListeners.sigterm);
this.started = true;
logger.debug('Telemetry batch processor started'); logger.debug('Telemetry batch processor started');
} }
@@ -105,6 +123,20 @@ export class TelemetryBatchProcessor {
clearInterval(this.flushTimer); clearInterval(this.flushTimer);
this.flushTimer = undefined; this.flushTimer = undefined;
} }
// Remove event listeners to prevent memory leaks
if (this.eventListeners.beforeExit) {
process.removeListener('beforeExit', this.eventListeners.beforeExit);
}
if (this.eventListeners.sigint) {
process.removeListener('SIGINT', this.eventListeners.sigint);
}
if (this.eventListeners.sigterm) {
process.removeListener('SIGTERM', this.eventListeners.sigterm);
}
this.eventListeners = {};
this.started = false;
logger.debug('Telemetry batch processor stopped'); logger.debug('Telemetry batch processor stopped');
} }

View File

@@ -0,0 +1,302 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
// Mock dependencies at module level
const mockDb = {
prepare: vi.fn().mockReturnValue({
get: vi.fn(),
all: vi.fn(),
run: vi.fn()
}),
exec: vi.fn(),
close: vi.fn(),
pragma: vi.fn(),
inTransaction: false,
transaction: vi.fn(),
checkFTS5Support: vi.fn()
};
vi.mock('../../../src/database/database-adapter', () => ({
createDatabaseAdapter: vi.fn().mockResolvedValue(mockDb)
}));
vi.mock('../../../src/database/node-repository', () => ({
NodeRepository: vi.fn().mockImplementation(() => ({
getNodeTypes: vi.fn().mockReturnValue([])
}))
}));
vi.mock('../../../src/templates/template-service', () => ({
TemplateService: vi.fn().mockImplementation(() => ({}))
}));
vi.mock('../../../src/services/enhanced-config-validator', () => ({
EnhancedConfigValidator: {
initializeSimilarityServices: vi.fn()
}
}));
vi.mock('../../../src/utils/logger', () => ({
logger: {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn()
}
}));
describe('Shared Database Module', () => {
let sharedDbModule: typeof import('../../../src/database/shared-database');
let createDatabaseAdapter: ReturnType<typeof vi.fn>;
beforeEach(async () => {
// Reset all mocks
vi.clearAllMocks();
mockDb.close.mockReset();
// Reset modules to get fresh state
vi.resetModules();
// Import fresh module
sharedDbModule = await import('../../../src/database/shared-database');
// Get the mocked function
const adapterModule = await import('../../../src/database/database-adapter');
createDatabaseAdapter = adapterModule.createDatabaseAdapter as ReturnType<typeof vi.fn>;
});
afterEach(async () => {
// Clean up any shared state by closing
try {
await sharedDbModule.closeSharedDatabase();
} catch {
// Ignore errors during cleanup
}
});
describe('getSharedDatabase', () => {
it('should initialize database on first call', async () => {
const state = await sharedDbModule.getSharedDatabase('/path/to/db');
expect(state).toBeDefined();
expect(state.db).toBe(mockDb);
expect(state.dbPath).toBe('/path/to/db');
expect(state.refCount).toBe(1);
expect(state.initialized).toBe(true);
expect(createDatabaseAdapter).toHaveBeenCalledWith('/path/to/db');
});
it('should reuse existing connection and increment refCount', async () => {
// First call initializes
const state1 = await sharedDbModule.getSharedDatabase('/path/to/db');
expect(state1.refCount).toBe(1);
// Second call reuses
const state2 = await sharedDbModule.getSharedDatabase('/path/to/db');
expect(state2.refCount).toBe(2);
// Same object
expect(state1).toBe(state2);
// Only initialized once
expect(createDatabaseAdapter).toHaveBeenCalledTimes(1);
});
it('should throw error when called with different path', async () => {
await sharedDbModule.getSharedDatabase('/path/to/db1');
await expect(sharedDbModule.getSharedDatabase('/path/to/db2'))
.rejects.toThrow('Shared database already initialized with different path');
});
it('should handle concurrent initialization requests', async () => {
// Start two requests concurrently
const [state1, state2] = await Promise.all([
sharedDbModule.getSharedDatabase('/path/to/db'),
sharedDbModule.getSharedDatabase('/path/to/db')
]);
// Both should get the same state
expect(state1).toBe(state2);
// RefCount should be 2 (one for each call)
expect(state1.refCount).toBe(2);
// Only one actual initialization
expect(createDatabaseAdapter).toHaveBeenCalledTimes(1);
});
it('should handle initialization failure', async () => {
createDatabaseAdapter.mockRejectedValueOnce(new Error('DB error'));
await expect(sharedDbModule.getSharedDatabase('/path/to/db'))
.rejects.toThrow('DB error');
// After failure, should not be initialized
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(false);
});
it('should allow retry after initialization failure', async () => {
// First call fails
createDatabaseAdapter.mockRejectedValueOnce(new Error('DB error'));
await expect(sharedDbModule.getSharedDatabase('/path/to/db'))
.rejects.toThrow('DB error');
// Reset mock for successful call
createDatabaseAdapter.mockResolvedValueOnce(mockDb);
// Second call succeeds
const state = await sharedDbModule.getSharedDatabase('/path/to/db');
expect(state).toBeDefined();
expect(state.initialized).toBe(true);
});
});
describe('releaseSharedDatabase', () => {
it('should decrement refCount', async () => {
const state = await sharedDbModule.getSharedDatabase('/path/to/db');
expect(state.refCount).toBe(1);
sharedDbModule.releaseSharedDatabase(state);
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(0);
});
it('should not decrement below 0', async () => {
const state = await sharedDbModule.getSharedDatabase('/path/to/db');
// Release once (refCount: 1 -> 0)
sharedDbModule.releaseSharedDatabase(state);
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(0);
// Release again (should stay at 0, not go negative)
sharedDbModule.releaseSharedDatabase(state);
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(0);
});
it('should handle null state gracefully', () => {
// Should not throw
sharedDbModule.releaseSharedDatabase(null as any);
});
it('should not close database when refCount hits 0', async () => {
const state = await sharedDbModule.getSharedDatabase('/path/to/db');
sharedDbModule.releaseSharedDatabase(state);
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(0);
expect(mockDb.close).not.toHaveBeenCalled();
// Database should still be accessible
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(true);
});
});
describe('closeSharedDatabase', () => {
it('should close database and clear state', async () => {
// Get state
await sharedDbModule.getSharedDatabase('/path/to/db');
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(true);
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(1);
await sharedDbModule.closeSharedDatabase();
// State should be cleared
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(false);
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(0);
});
it('should handle close error gracefully', async () => {
await sharedDbModule.getSharedDatabase('/path/to/db');
mockDb.close.mockImplementationOnce(() => {
throw new Error('Close error');
});
// Should not throw
await sharedDbModule.closeSharedDatabase();
// State should still be cleared
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(false);
});
it('should be idempotent when already closed', async () => {
// Close without ever initializing
await sharedDbModule.closeSharedDatabase();
// Should not throw
await sharedDbModule.closeSharedDatabase();
});
it('should allow re-initialization after close', async () => {
// Initialize
const state1 = await sharedDbModule.getSharedDatabase('/path/to/db');
expect(state1.refCount).toBe(1);
// Close
await sharedDbModule.closeSharedDatabase();
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(false);
// Re-initialize
const state2 = await sharedDbModule.getSharedDatabase('/path/to/db');
expect(state2.refCount).toBe(1);
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(true);
// Should be a new state object
expect(state1).not.toBe(state2);
});
});
describe('isSharedDatabaseInitialized', () => {
it('should return false before initialization', () => {
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(false);
});
it('should return true after initialization', async () => {
await sharedDbModule.getSharedDatabase('/path/to/db');
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(true);
});
it('should return false after close', async () => {
await sharedDbModule.getSharedDatabase('/path/to/db');
await sharedDbModule.closeSharedDatabase();
expect(sharedDbModule.isSharedDatabaseInitialized()).toBe(false);
});
});
describe('getSharedDatabaseRefCount', () => {
it('should return 0 before initialization', () => {
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(0);
});
it('should return correct refCount after multiple operations', async () => {
const state = await sharedDbModule.getSharedDatabase('/path/to/db');
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(1);
await sharedDbModule.getSharedDatabase('/path/to/db');
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(2);
await sharedDbModule.getSharedDatabase('/path/to/db');
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(3);
sharedDbModule.releaseSharedDatabase(state);
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(2);
});
it('should return 0 after close', async () => {
await sharedDbModule.getSharedDatabase('/path/to/db');
await sharedDbModule.closeSharedDatabase();
expect(sharedDbModule.getSharedDatabaseRefCount()).toBe(0);
});
});
describe('SharedDatabaseState interface', () => {
it('should expose correct properties', async () => {
const state = await sharedDbModule.getSharedDatabase('/path/to/db');
expect(state).toHaveProperty('db');
expect(state).toHaveProperty('repository');
expect(state).toHaveProperty('templateService');
expect(state).toHaveProperty('dbPath');
expect(state).toHaveProperty('refCount');
expect(state).toHaveProperty('initialized');
});
});
});

View File

@@ -333,13 +333,14 @@ describe('HTTP Server Session Management', () => {
server = new SingleSessionHTTPServer(); server = new SingleSessionHTTPServer();
// Mock expired sessions // Mock expired sessions
// Note: Default session timeout is 5 minutes (configurable via SESSION_TIMEOUT_MINUTES)
const mockSessionMetadata = { const mockSessionMetadata = {
'session-1': { 'session-1': {
lastAccess: new Date(Date.now() - 40 * 60 * 1000), // 40 minutes ago (expired) lastAccess: new Date(Date.now() - 10 * 60 * 1000), // 10 minutes ago (expired with 5 min timeout)
createdAt: new Date(Date.now() - 60 * 60 * 1000) createdAt: new Date(Date.now() - 60 * 60 * 1000)
}, },
'session-2': { 'session-2': {
lastAccess: new Date(Date.now() - 10 * 60 * 1000), // 10 minutes ago (not expired) lastAccess: new Date(Date.now() - 2 * 60 * 1000), // 2 minutes ago (not expired with 5 min timeout)
createdAt: new Date(Date.now() - 20 * 60 * 1000) createdAt: new Date(Date.now() - 20 * 60 * 1000)
} }
}; };
@@ -515,14 +516,15 @@ describe('HTTP Server Session Management', () => {
it('should get session metrics correctly', async () => { it('should get session metrics correctly', async () => {
server = new SingleSessionHTTPServer(); server = new SingleSessionHTTPServer();
// Note: Default session timeout is 5 minutes (configurable via SESSION_TIMEOUT_MINUTES)
const now = Date.now(); const now = Date.now();
(server as any).sessionMetadata = { (server as any).sessionMetadata = {
'active-session': { 'active-session': {
lastAccess: new Date(now - 10 * 60 * 1000), // 10 minutes ago lastAccess: new Date(now - 2 * 60 * 1000), // 2 minutes ago (not expired with 5 min timeout)
createdAt: new Date(now - 20 * 60 * 1000) createdAt: new Date(now - 20 * 60 * 1000)
}, },
'expired-session': { 'expired-session': {
lastAccess: new Date(now - 40 * 60 * 1000), // 40 minutes ago (expired) lastAccess: new Date(now - 10 * 60 * 1000), // 10 minutes ago (expired with 5 min timeout)
createdAt: new Date(now - 60 * 60 * 1000) createdAt: new Date(now - 60 * 60 * 1000)
} }
}; };