mirror of
https://github.com/czlonkowski/n8n-mcp.git
synced 2026-02-06 05:23:08 +00:00
- Fix fake timer issues in rate-limiter and batch-processor tests - Add proper timer handling for vitest fake timers - Handle timer.unref() compatibility with fake timers - Add test environment detection to skip timeouts in tests This resolves the CI timeout issues where tests would hang indefinitely.
662 lines
22 KiB
TypeScript
662 lines
22 KiB
TypeScript
import { describe, it, expect, beforeEach, vi, afterEach, beforeAll, afterAll } from 'vitest';
|
|
import { TelemetryBatchProcessor } from '../../../src/telemetry/batch-processor';
|
|
import { TelemetryEvent, WorkflowTelemetry, TELEMETRY_CONFIG } from '../../../src/telemetry/telemetry-types';
|
|
import { TelemetryError, TelemetryErrorType } from '../../../src/telemetry/telemetry-error';
|
|
import type { SupabaseClient } from '@supabase/supabase-js';
|
|
|
|
// Mock logger to avoid console output in tests
|
|
vi.mock('../../../src/utils/logger', () => ({
|
|
logger: {
|
|
debug: vi.fn(),
|
|
info: vi.fn(),
|
|
warn: vi.fn(),
|
|
error: vi.fn(),
|
|
}
|
|
}));
|
|
|
|
describe('TelemetryBatchProcessor', () => {
|
|
let batchProcessor: TelemetryBatchProcessor;
|
|
let mockSupabase: SupabaseClient;
|
|
let mockIsEnabled: vi.Mock;
|
|
let mockProcessExit: vi.SpyInstance;
|
|
|
|
const createMockSupabaseResponse = (error: any = null) => ({
|
|
data: null,
|
|
error,
|
|
status: error ? 400 : 200,
|
|
statusText: error ? 'Bad Request' : 'OK'
|
|
});
|
|
|
|
beforeEach(() => {
|
|
vi.useFakeTimers();
|
|
mockIsEnabled = vi.fn().mockReturnValue(true);
|
|
|
|
mockSupabase = {
|
|
from: vi.fn().mockReturnValue({
|
|
insert: vi.fn().mockResolvedValue(createMockSupabaseResponse())
|
|
})
|
|
} as any;
|
|
|
|
// Mock process events to prevent actual exit
|
|
mockProcessExit = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never);
|
|
|
|
vi.clearAllMocks();
|
|
|
|
batchProcessor = new TelemetryBatchProcessor(mockSupabase, mockIsEnabled);
|
|
});
|
|
|
|
afterEach(() => {
|
|
// Stop the batch processor to clear any intervals
|
|
batchProcessor.stop();
|
|
mockProcessExit.mockRestore();
|
|
vi.clearAllTimers();
|
|
vi.useRealTimers();
|
|
});
|
|
|
|
describe('start()', () => {
|
|
it('should start periodic flushing when enabled', () => {
|
|
const setIntervalSpy = vi.spyOn(global, 'setInterval');
|
|
|
|
batchProcessor.start();
|
|
|
|
expect(setIntervalSpy).toHaveBeenCalledWith(
|
|
expect.any(Function),
|
|
TELEMETRY_CONFIG.BATCH_FLUSH_INTERVAL
|
|
);
|
|
});
|
|
|
|
it('should not start when disabled', () => {
|
|
mockIsEnabled.mockReturnValue(false);
|
|
const setIntervalSpy = vi.spyOn(global, 'setInterval');
|
|
|
|
batchProcessor.start();
|
|
|
|
expect(setIntervalSpy).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it('should not start without Supabase client', () => {
|
|
const processor = new TelemetryBatchProcessor(null, mockIsEnabled);
|
|
const setIntervalSpy = vi.spyOn(global, 'setInterval');
|
|
|
|
processor.start();
|
|
|
|
expect(setIntervalSpy).not.toHaveBeenCalled();
|
|
processor.stop();
|
|
});
|
|
|
|
it('should set up process exit handlers', () => {
|
|
const onSpy = vi.spyOn(process, 'on');
|
|
|
|
batchProcessor.start();
|
|
|
|
expect(onSpy).toHaveBeenCalledWith('beforeExit', expect.any(Function));
|
|
expect(onSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function));
|
|
expect(onSpy).toHaveBeenCalledWith('SIGTERM', expect.any(Function));
|
|
});
|
|
});
|
|
|
|
describe('stop()', () => {
|
|
it('should clear flush timer', () => {
|
|
const clearIntervalSpy = vi.spyOn(global, 'clearInterval');
|
|
|
|
batchProcessor.start();
|
|
batchProcessor.stop();
|
|
|
|
expect(clearIntervalSpy).toHaveBeenCalled();
|
|
});
|
|
});
|
|
|
|
describe('flush()', () => {
|
|
const mockEvents: TelemetryEvent[] = [
|
|
{
|
|
user_id: 'user1',
|
|
event: 'tool_used',
|
|
properties: { tool: 'httpRequest', success: true }
|
|
},
|
|
{
|
|
user_id: 'user2',
|
|
event: 'tool_used',
|
|
properties: { tool: 'webhook', success: false }
|
|
}
|
|
];
|
|
|
|
const mockWorkflows: WorkflowTelemetry[] = [
|
|
{
|
|
user_id: 'user1',
|
|
workflow_hash: 'hash1',
|
|
node_count: 3,
|
|
node_types: ['webhook', 'httpRequest', 'set'],
|
|
has_trigger: true,
|
|
has_webhook: true,
|
|
complexity: 'medium',
|
|
sanitized_workflow: { nodes: [], connections: {} }
|
|
}
|
|
];
|
|
|
|
it('should flush events successfully', async () => {
|
|
await batchProcessor.flush(mockEvents);
|
|
|
|
expect(mockSupabase.from).toHaveBeenCalledWith('telemetry_events');
|
|
expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledWith(mockEvents);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsTracked).toBe(2);
|
|
expect(metrics.batchesSent).toBe(1);
|
|
});
|
|
|
|
it('should flush workflows successfully', async () => {
|
|
await batchProcessor.flush(undefined, mockWorkflows);
|
|
|
|
expect(mockSupabase.from).toHaveBeenCalledWith('telemetry_workflows');
|
|
expect(mockSupabase.from('telemetry_workflows').insert).toHaveBeenCalledWith(mockWorkflows);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsTracked).toBe(1);
|
|
expect(metrics.batchesSent).toBe(1);
|
|
});
|
|
|
|
it('should flush both events and workflows', async () => {
|
|
await batchProcessor.flush(mockEvents, mockWorkflows);
|
|
|
|
expect(mockSupabase.from).toHaveBeenCalledWith('telemetry_events');
|
|
expect(mockSupabase.from).toHaveBeenCalledWith('telemetry_workflows');
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsTracked).toBe(3); // 2 events + 1 workflow
|
|
expect(metrics.batchesSent).toBe(2);
|
|
});
|
|
|
|
it('should not flush when disabled', async () => {
|
|
mockIsEnabled.mockReturnValue(false);
|
|
|
|
await batchProcessor.flush(mockEvents, mockWorkflows);
|
|
|
|
expect(mockSupabase.from).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it('should not flush without Supabase client', async () => {
|
|
const processor = new TelemetryBatchProcessor(null, mockIsEnabled);
|
|
|
|
await processor.flush(mockEvents);
|
|
|
|
expect(mockSupabase.from).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it('should skip flush when circuit breaker is open', async () => {
|
|
// Open circuit breaker by failing multiple times
|
|
const errorResponse = createMockSupabaseResponse(new Error('Network error'));
|
|
vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse);
|
|
|
|
// Fail enough times to open circuit breaker (5 by default)
|
|
for (let i = 0; i < 5; i++) {
|
|
await batchProcessor.flush(mockEvents);
|
|
}
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.circuitBreakerState.state).toBe('open');
|
|
|
|
// Next flush should be skipped
|
|
vi.clearAllMocks();
|
|
await batchProcessor.flush(mockEvents);
|
|
|
|
expect(mockSupabase.from).not.toHaveBeenCalled();
|
|
expect(batchProcessor.getMetrics().eventsDropped).toBeGreaterThan(0);
|
|
});
|
|
|
|
it('should record flush time metrics', async () => {
|
|
const startTime = Date.now();
|
|
await batchProcessor.flush(mockEvents);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.averageFlushTime).toBeGreaterThanOrEqual(0);
|
|
expect(metrics.lastFlushTime).toBeGreaterThanOrEqual(0);
|
|
});
|
|
});
|
|
|
|
describe('batch creation', () => {
|
|
it('should create single batch for small datasets', async () => {
|
|
const events: TelemetryEvent[] = Array.from({ length: 10 }, (_, i) => ({
|
|
user_id: `user${i}`,
|
|
event: 'test_event',
|
|
properties: { index: i }
|
|
}));
|
|
|
|
await batchProcessor.flush(events);
|
|
|
|
expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledTimes(1);
|
|
expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledWith(events);
|
|
});
|
|
|
|
it('should create multiple batches for large datasets', async () => {
|
|
const events: TelemetryEvent[] = Array.from({ length: 75 }, (_, i) => ({
|
|
user_id: `user${i}`,
|
|
event: 'test_event',
|
|
properties: { index: i }
|
|
}));
|
|
|
|
await batchProcessor.flush(events);
|
|
|
|
// Should create 2 batches (50 + 25) based on TELEMETRY_CONFIG.MAX_BATCH_SIZE
|
|
expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledTimes(2);
|
|
|
|
const firstCall = vi.mocked(mockSupabase.from('telemetry_events').insert).mock.calls[0][0];
|
|
const secondCall = vi.mocked(mockSupabase.from('telemetry_events').insert).mock.calls[1][0];
|
|
|
|
expect(firstCall).toHaveLength(TELEMETRY_CONFIG.MAX_BATCH_SIZE);
|
|
expect(secondCall).toHaveLength(25);
|
|
});
|
|
});
|
|
|
|
describe('workflow deduplication', () => {
|
|
it('should deduplicate workflows by hash', async () => {
|
|
const workflows: WorkflowTelemetry[] = [
|
|
{
|
|
user_id: 'user1',
|
|
workflow_hash: 'hash1',
|
|
node_count: 2,
|
|
node_types: ['webhook', 'set'],
|
|
has_trigger: true,
|
|
has_webhook: true,
|
|
complexity: 'simple',
|
|
sanitized_workflow: { nodes: [], connections: {} }
|
|
},
|
|
{
|
|
user_id: 'user2',
|
|
workflow_hash: 'hash1', // Same hash - should be deduplicated
|
|
node_count: 2,
|
|
node_types: ['webhook', 'set'],
|
|
has_trigger: true,
|
|
has_webhook: true,
|
|
complexity: 'simple',
|
|
sanitized_workflow: { nodes: [], connections: {} }
|
|
},
|
|
{
|
|
user_id: 'user1',
|
|
workflow_hash: 'hash2', // Different hash - should be kept
|
|
node_count: 3,
|
|
node_types: ['webhook', 'httpRequest', 'set'],
|
|
has_trigger: true,
|
|
has_webhook: true,
|
|
complexity: 'medium',
|
|
sanitized_workflow: { nodes: [], connections: {} }
|
|
}
|
|
];
|
|
|
|
await batchProcessor.flush(undefined, workflows);
|
|
|
|
const insertCall = vi.mocked(mockSupabase.from('telemetry_workflows').insert).mock.calls[0][0];
|
|
expect(insertCall).toHaveLength(2); // Should deduplicate to 2 workflows
|
|
|
|
const hashes = insertCall.map((w: WorkflowTelemetry) => w.workflow_hash);
|
|
expect(hashes).toEqual(['hash1', 'hash2']);
|
|
});
|
|
});
|
|
|
|
describe('error handling and retries', () => {
|
|
it('should retry on failure with exponential backoff', async () => {
|
|
const error = new Error('Network timeout');
|
|
const errorResponse = createMockSupabaseResponse(error);
|
|
|
|
// Mock to fail first 2 times, then succeed
|
|
vi.mocked(mockSupabase.from('telemetry_events').insert)
|
|
.mockResolvedValueOnce(errorResponse)
|
|
.mockResolvedValueOnce(errorResponse)
|
|
.mockResolvedValueOnce(createMockSupabaseResponse());
|
|
|
|
const events: TelemetryEvent[] = [{
|
|
user_id: 'user1',
|
|
event: 'test_event',
|
|
properties: {}
|
|
}];
|
|
|
|
await batchProcessor.flush(events);
|
|
|
|
// Should have been called 3 times (2 failures + 1 success)
|
|
expect(mockSupabase.from('telemetry_events').insert).toHaveBeenCalledTimes(3);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsTracked).toBe(1); // Should succeed on third try
|
|
});
|
|
|
|
it('should fail after max retries', async () => {
|
|
const error = new Error('Persistent network error');
|
|
const errorResponse = createMockSupabaseResponse(error);
|
|
|
|
vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse);
|
|
|
|
const events: TelemetryEvent[] = [{
|
|
user_id: 'user1',
|
|
event: 'test_event',
|
|
properties: {}
|
|
}];
|
|
|
|
await batchProcessor.flush(events);
|
|
|
|
// Should have been called MAX_RETRIES times
|
|
expect(mockSupabase.from('telemetry_events').insert)
|
|
.toHaveBeenCalledTimes(TELEMETRY_CONFIG.MAX_RETRIES);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsFailed).toBe(1);
|
|
expect(metrics.batchesFailed).toBe(1);
|
|
expect(metrics.deadLetterQueueSize).toBe(1);
|
|
});
|
|
|
|
it('should handle operation timeout', async () => {
|
|
// Mock the operation to always fail with timeout error
|
|
vi.mocked(mockSupabase.from('telemetry_events').insert).mockRejectedValue(
|
|
new Error('Operation timed out')
|
|
);
|
|
|
|
const events: TelemetryEvent[] = [{
|
|
user_id: 'user1',
|
|
event: 'test_event',
|
|
properties: {}
|
|
}];
|
|
|
|
// The flush should fail after retries
|
|
await batchProcessor.flush(events);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsFailed).toBe(1);
|
|
});
|
|
});
|
|
|
|
describe('dead letter queue', () => {
|
|
it('should add failed events to dead letter queue', async () => {
|
|
const error = new Error('Persistent error');
|
|
const errorResponse = createMockSupabaseResponse(error);
|
|
vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse);
|
|
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'event1', properties: {} },
|
|
{ user_id: 'user2', event: 'event2', properties: {} }
|
|
];
|
|
|
|
await batchProcessor.flush(events);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.deadLetterQueueSize).toBe(2);
|
|
});
|
|
|
|
it('should process dead letter queue when circuit is healthy', async () => {
|
|
const error = new Error('Temporary error');
|
|
const errorResponse = createMockSupabaseResponse(error);
|
|
|
|
// First call fails, second succeeds
|
|
vi.mocked(mockSupabase.from('telemetry_events').insert)
|
|
.mockResolvedValueOnce(errorResponse)
|
|
.mockResolvedValueOnce(createMockSupabaseResponse());
|
|
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'event1', properties: {} }
|
|
];
|
|
|
|
// First flush - should fail and add to dead letter queue
|
|
await batchProcessor.flush(events);
|
|
expect(batchProcessor.getMetrics().deadLetterQueueSize).toBe(1);
|
|
|
|
// Second flush - should process dead letter queue
|
|
await batchProcessor.flush([]);
|
|
expect(batchProcessor.getMetrics().deadLetterQueueSize).toBe(0);
|
|
});
|
|
|
|
it('should maintain dead letter queue size limit', async () => {
|
|
const error = new Error('Persistent error');
|
|
const errorResponse = createMockSupabaseResponse(error);
|
|
vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse);
|
|
|
|
// Add more items than the limit (100)
|
|
for (let i = 0; i < 25; i++) {
|
|
const events: TelemetryEvent[] = Array.from({ length: 5 }, (_, j) => ({
|
|
user_id: `user${i}_${j}`,
|
|
event: 'test_event',
|
|
properties: { batch: i, index: j }
|
|
}));
|
|
|
|
await batchProcessor.flush(events);
|
|
}
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.deadLetterQueueSize).toBe(100); // Should be capped at 100
|
|
expect(metrics.eventsDropped).toBeGreaterThan(0); // Some events should be dropped
|
|
});
|
|
|
|
it('should handle mixed events and workflows in dead letter queue', async () => {
|
|
const error = new Error('Mixed error');
|
|
const errorResponse = createMockSupabaseResponse(error);
|
|
vi.mocked(mockSupabase.from).mockImplementation((table) => ({
|
|
insert: vi.fn().mockResolvedValue(errorResponse)
|
|
}));
|
|
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'event1', properties: {} }
|
|
];
|
|
|
|
const workflows: WorkflowTelemetry[] = [
|
|
{
|
|
user_id: 'user1',
|
|
workflow_hash: 'hash1',
|
|
node_count: 1,
|
|
node_types: ['webhook'],
|
|
has_trigger: true,
|
|
has_webhook: true,
|
|
complexity: 'simple',
|
|
sanitized_workflow: { nodes: [], connections: {} }
|
|
}
|
|
];
|
|
|
|
await batchProcessor.flush(events, workflows);
|
|
|
|
expect(batchProcessor.getMetrics().deadLetterQueueSize).toBe(2);
|
|
|
|
// Mock successful operations for dead letter queue processing
|
|
vi.mocked(mockSupabase.from).mockImplementation((table) => ({
|
|
insert: vi.fn().mockResolvedValue(createMockSupabaseResponse())
|
|
}));
|
|
|
|
await batchProcessor.flush([]);
|
|
expect(batchProcessor.getMetrics().deadLetterQueueSize).toBe(0);
|
|
});
|
|
});
|
|
|
|
describe('circuit breaker integration', () => {
|
|
it('should update circuit breaker on success', async () => {
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'test_event', properties: {} }
|
|
];
|
|
|
|
await batchProcessor.flush(events);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.circuitBreakerState.state).toBe('closed');
|
|
expect(metrics.circuitBreakerState.failureCount).toBe(0);
|
|
});
|
|
|
|
it('should update circuit breaker on failure', async () => {
|
|
const error = new Error('Network error');
|
|
const errorResponse = createMockSupabaseResponse(error);
|
|
vi.mocked(mockSupabase.from('telemetry_events').insert).mockResolvedValue(errorResponse);
|
|
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'test_event', properties: {} }
|
|
];
|
|
|
|
await batchProcessor.flush(events);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.circuitBreakerState.failureCount).toBeGreaterThan(0);
|
|
});
|
|
});
|
|
|
|
describe('metrics collection', () => {
|
|
it('should collect comprehensive metrics', async () => {
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'event1', properties: {} },
|
|
{ user_id: 'user2', event: 'event2', properties: {} }
|
|
];
|
|
|
|
await batchProcessor.flush(events);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
|
|
expect(metrics).toHaveProperty('eventsTracked');
|
|
expect(metrics).toHaveProperty('eventsDropped');
|
|
expect(metrics).toHaveProperty('eventsFailed');
|
|
expect(metrics).toHaveProperty('batchesSent');
|
|
expect(metrics).toHaveProperty('batchesFailed');
|
|
expect(metrics).toHaveProperty('averageFlushTime');
|
|
expect(metrics).toHaveProperty('lastFlushTime');
|
|
expect(metrics).toHaveProperty('rateLimitHits');
|
|
expect(metrics).toHaveProperty('circuitBreakerState');
|
|
expect(metrics).toHaveProperty('deadLetterQueueSize');
|
|
|
|
expect(metrics.eventsTracked).toBe(2);
|
|
expect(metrics.batchesSent).toBe(1);
|
|
});
|
|
|
|
it('should track flush time statistics', async () => {
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'test_event', properties: {} }
|
|
];
|
|
|
|
// Perform multiple flushes to test average calculation
|
|
await batchProcessor.flush(events);
|
|
await batchProcessor.flush(events);
|
|
await batchProcessor.flush(events);
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.averageFlushTime).toBeGreaterThanOrEqual(0);
|
|
expect(metrics.lastFlushTime).toBeGreaterThanOrEqual(0);
|
|
});
|
|
|
|
it('should maintain limited flush time history', async () => {
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'test_event', properties: {} }
|
|
];
|
|
|
|
// Perform more than 100 flushes to test history limit
|
|
for (let i = 0; i < 105; i++) {
|
|
await batchProcessor.flush(events);
|
|
}
|
|
|
|
// Should still calculate average correctly (history is limited internally)
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.averageFlushTime).toBeGreaterThanOrEqual(0);
|
|
});
|
|
});
|
|
|
|
describe('resetMetrics()', () => {
|
|
it('should reset all metrics to initial state', async () => {
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'test_event', properties: {} }
|
|
];
|
|
|
|
// Generate some metrics
|
|
await batchProcessor.flush(events);
|
|
|
|
// Verify metrics exist
|
|
let metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsTracked).toBeGreaterThan(0);
|
|
expect(metrics.batchesSent).toBeGreaterThan(0);
|
|
|
|
// Reset metrics
|
|
batchProcessor.resetMetrics();
|
|
|
|
// Verify reset
|
|
metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsTracked).toBe(0);
|
|
expect(metrics.eventsDropped).toBe(0);
|
|
expect(metrics.eventsFailed).toBe(0);
|
|
expect(metrics.batchesSent).toBe(0);
|
|
expect(metrics.batchesFailed).toBe(0);
|
|
expect(metrics.averageFlushTime).toBe(0);
|
|
expect(metrics.rateLimitHits).toBe(0);
|
|
expect(metrics.circuitBreakerState.state).toBe('closed');
|
|
expect(metrics.circuitBreakerState.failureCount).toBe(0);
|
|
});
|
|
});
|
|
|
|
describe('edge cases', () => {
|
|
it('should handle empty arrays gracefully', async () => {
|
|
await batchProcessor.flush([], []);
|
|
|
|
expect(mockSupabase.from).not.toHaveBeenCalled();
|
|
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsTracked).toBe(0);
|
|
expect(metrics.batchesSent).toBe(0);
|
|
});
|
|
|
|
it('should handle undefined inputs gracefully', async () => {
|
|
await batchProcessor.flush();
|
|
|
|
expect(mockSupabase.from).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it('should handle null Supabase client gracefully', async () => {
|
|
const processor = new TelemetryBatchProcessor(null, mockIsEnabled);
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'test_event', properties: {} }
|
|
];
|
|
|
|
await expect(processor.flush(events)).resolves.not.toThrow();
|
|
});
|
|
|
|
it('should handle concurrent flush operations', async () => {
|
|
const events: TelemetryEvent[] = [
|
|
{ user_id: 'user1', event: 'test_event', properties: {} }
|
|
];
|
|
|
|
// Start multiple flush operations concurrently
|
|
const flushPromises = [
|
|
batchProcessor.flush(events),
|
|
batchProcessor.flush(events),
|
|
batchProcessor.flush(events)
|
|
];
|
|
|
|
await Promise.all(flushPromises);
|
|
|
|
// Should handle concurrent operations gracefully
|
|
const metrics = batchProcessor.getMetrics();
|
|
expect(metrics.eventsTracked).toBeGreaterThan(0);
|
|
});
|
|
});
|
|
|
|
describe('process lifecycle integration', () => {
|
|
it('should flush on process beforeExit', async () => {
|
|
const flushSpy = vi.spyOn(batchProcessor, 'flush');
|
|
|
|
batchProcessor.start();
|
|
|
|
// Trigger beforeExit event
|
|
process.emit('beforeExit', 0);
|
|
|
|
expect(flushSpy).toHaveBeenCalled();
|
|
});
|
|
|
|
it('should flush and exit on SIGINT', async () => {
|
|
const flushSpy = vi.spyOn(batchProcessor, 'flush');
|
|
|
|
batchProcessor.start();
|
|
|
|
// Trigger SIGINT event
|
|
process.emit('SIGINT', 'SIGINT');
|
|
|
|
expect(flushSpy).toHaveBeenCalled();
|
|
expect(mockProcessExit).toHaveBeenCalledWith(0);
|
|
});
|
|
|
|
it('should flush and exit on SIGTERM', async () => {
|
|
const flushSpy = vi.spyOn(batchProcessor, 'flush');
|
|
|
|
batchProcessor.start();
|
|
|
|
// Trigger SIGTERM event
|
|
process.emit('SIGTERM', 'SIGTERM');
|
|
|
|
expect(flushSpy).toHaveBeenCalled();
|
|
expect(mockProcessExit).toHaveBeenCalledWith(0);
|
|
});
|
|
});
|
|
}); |