diff --git a/CLAUDE.md b/CLAUDE.md index 205cab7..5a0eb7d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -245,6 +245,8 @@ ui (standalone frontend application) - cli/server/shared: esbuild - ui: Vite + TypeScript 5. **@musistudio/llms**: This is an external dependency package providing the core server framework and transformer functionality, type definitions in `packages/server/src/types.d.ts` +6. **Code comments**: All comments in code MUST be written in English +7. **Documentation**: When implementing new features, add documentation to the docs project instead of creating standalone md files ## Configuration Example Locations diff --git a/packages/core/package.json b/packages/core/package.json index d24db76..47d31f2 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -4,9 +4,11 @@ "description": "A universal LLM API transformation server", "main": "dist/cjs/server.cjs", "module": "dist/esm/server.mjs", + "types": "dist/plugins.d.ts", "type": "module", "exports": { ".": { + "types": "./dist/plugins.d.ts", "import": "./dist/esm/server.mjs", "require": "./dist/cjs/server.cjs" } @@ -36,6 +38,7 @@ "@google/genai": "^1.7.0", "dotenv": "^16.5.0", "fastify": "^5.4.0", + "fastify-plugin": "^5.1.0", "google-auth-library": "^10.1.0", "json5": "^2.2.3", "jsonrepair": "^3.13.0", diff --git a/packages/core/src/plugins/index.ts b/packages/core/src/plugins/index.ts new file mode 100644 index 0000000..0385647 --- /dev/null +++ b/packages/core/src/plugins/index.ts @@ -0,0 +1,4 @@ +export * from './types'; +export { pluginManager } from './plugin-manager'; +export { tokenSpeedPlugin } from './token-speed'; +export * from './output'; diff --git a/packages/core/src/plugins/output/console-handler.ts b/packages/core/src/plugins/output/console-handler.ts new file mode 100644 index 0000000..32c66e8 --- /dev/null +++ b/packages/core/src/plugins/output/console-handler.ts @@ -0,0 +1,150 @@ +import { OutputHandler, OutputOptions, ConsoleOutputConfig } from './types'; + +/** + * Console output handler + * Supports colored output and multiple log levels + */ +export class ConsoleOutputHandler implements OutputHandler { + type = 'console' as const; + private config: ConsoleOutputConfig; + + // ANSI color codes + private colors = { + reset: '\x1b[0m', + bright: '\x1b[1m', + dim: '\x1b[2m', + red: '\x1b[31m', + green: '\x1b[32m', + yellow: '\x1b[33m', + blue: '\x1b[34m', + magenta: '\x1b[35m', + cyan: '\x1b[36m', + white: '\x1b[37m' + }; + + constructor(config: ConsoleOutputConfig = {}) { + this.config = { + colors: true, + level: 'log', + ...config + }; + } + + /** + * Format output data + */ + private formatData(data: any, options: OutputOptions): string { + const { format = 'text', timestamp = true, prefix, metadata } = options || {}; + + // Build prefix + let output = ''; + + if (timestamp) { + const time = new Date().toISOString(); + output += this.config.colors + ? `${this.colors.cyan}[${time}]${this.colors.reset} ` + : `[${time}] `; + } + + if (prefix) { + output += this.config.colors + ? `${this.colors.bright}${prefix}${this.colors.reset} ` + : `${prefix} `; + } + + // Format data + switch (format) { + case 'json': + output += JSON.stringify(data, null, 2); + break; + + case 'markdown': + if (typeof data === 'object') { + output += this.toMarkdown(data); + } else { + output += String(data); + } + break; + + case 'text': + default: + if (typeof data === 'object') { + output += JSON.stringify(data, null, 2); + } else { + output += String(data); + } + break; + } + + // Add metadata + if (metadata && Object.keys(metadata).length > 0) { + output += '\n' + (this.config.colors ? `${this.colors.dim}` : ''); + output += 'Metadata: ' + JSON.stringify(metadata, null, 2); + if (this.config.colors) output += this.colors.reset; + } + + return output; + } + + /** + * Convert object to Markdown format + */ + private toMarkdown(data: any, indent = 0): string { + const padding = ' '.repeat(indent); + + if (Array.isArray(data)) { + return data.map(item => { + if (typeof item === 'object') { + return `${padding}-\n${this.toMarkdown(item, indent + 1)}`; + } + return `${padding}- ${item}`; + }).join('\n'); + } + + if (typeof data === 'object' && data !== null) { + return Object.entries(data).map(([key, value]) => { + if (typeof value === 'object' && value !== null) { + return `${padding}${key}:\n${this.toMarkdown(value, indent + 1)}`; + } + return `${padding}${key}: ${value}`; + }).join('\n'); + } + + return `${padding}${data}`; + } + + /** + * Output data + */ + async output(data: any, options: OutputOptions = {}): Promise { + try { + const formatted = this.formatData(data, options); + const logMethod = this.config.level || 'log'; + + // Output based on configured log level + switch (logMethod) { + case 'info': + console.info(formatted); + break; + case 'warn': + console.warn(formatted); + break; + case 'error': + console.error(formatted); + break; + case 'debug': + console.debug(formatted); + break; + case 'log': + default: + console.log(formatted); + break; + } + + return true; + } catch (error) { + console.error('[ConsoleOutputHandler] Output failed:', error); + return false; + } + } +} diff --git a/packages/core/src/plugins/output/index.ts b/packages/core/src/plugins/output/index.ts new file mode 100644 index 0000000..aece3f6 --- /dev/null +++ b/packages/core/src/plugins/output/index.ts @@ -0,0 +1,48 @@ +// Type definitions +export * from './types'; + +// Output handler implementations +export { ConsoleOutputHandler } from './console-handler'; +export { WebhookOutputHandler } from './webhook-handler'; + +// Output manager +export { outputManager, output, outputTo } from './output-manager'; + +/** + * Convenience function: Create and register a Console output handler + * @param config Console output handler configuration + * @returns Output manager instance + */ +export function registerConsoleOutput(config?: import('./types').ConsoleOutputConfig) { + const { ConsoleOutputHandler } = require('./console-handler'); + const handler = new ConsoleOutputHandler(config); + const { outputManager } = require('./output-manager'); + const name = 'console_' + Date.now(); + outputManager.registerHandler(name, handler); + return outputManager; +} + +/** + * Convenience function: Create and register a Webhook output handler + * @param config Webhook output handler configuration + * @returns Output manager instance + */ +export function registerWebhookOutput(config: import('./types').WebhookOutputConfig) { + const { WebhookOutputHandler } = require('./webhook-handler'); + const handler = new WebhookOutputHandler(config); + const { outputManager } = require('./output-manager'); + const name = 'webhook_' + Date.now(); + outputManager.registerHandler(name, handler); + return outputManager; +} + +/** + * Convenience function: Register output handlers in batch + * @param configs Output handler configuration array + * @returns Output manager instance + */ +export function registerOutputHandlers(configs: import('./types').OutputHandlerConfig[]) { + const { outputManager } = require('./output-manager'); + outputManager.registerHandlers(configs); + return outputManager; +} diff --git a/packages/core/src/plugins/output/output-manager.ts b/packages/core/src/plugins/output/output-manager.ts new file mode 100644 index 0000000..e4c97a8 --- /dev/null +++ b/packages/core/src/plugins/output/output-manager.ts @@ -0,0 +1,224 @@ +import { OutputHandler, OutputOptions, OutputHandlerConfig } from './types'; +import { ConsoleOutputHandler } from './console-handler'; +import { WebhookOutputHandler } from './webhook-handler'; + +/** + * Output manager + * Manages multiple output handlers and provides unified output interface + */ +class OutputManager { + private handlers: Map = new Map(); + private defaultOptions: OutputOptions = {}; + + /** + * Register output handler + * @param name Output handler name + * @param handler Output handler instance + */ + registerHandler(name: string, handler: OutputHandler): void { + this.handlers.set(name, handler); + } + + /** + * Register output handlers in batch + * @param configs Output handler configuration array + */ + registerHandlers(configs: OutputHandlerConfig[]): void { + for (const config of configs) { + if (config.enabled === false) { + continue; + } + + try { + const handler = this.createHandler(config); + const name = config.type + '_' + Date.now(); + this.registerHandler(name, handler); + } catch (error) { + console.error(`[OutputManager] Failed to register ${config.type} handler:`, error); + } + } + } + + /** + * Create output handler instance + * @param config Output handler configuration + */ + private createHandler(config: OutputHandlerConfig): OutputHandler { + switch (config.type) { + case 'console': + return new ConsoleOutputHandler(config.config as any); + + case 'webhook': + return new WebhookOutputHandler(config.config as any); + + // Reserved for other output handler types + // case 'websocket': + // return new WebSocketOutputHandler(config.config as any); + + default: + throw new Error(`Unknown output handler type: ${config.type}`); + } + } + + /** + * Remove output handler + * @param name Output handler name + */ + unregisterHandler(name: string): boolean { + return this.handlers.delete(name); + } + + /** + * Get output handler + * @param name Output handler name + */ + getHandler(name: string): OutputHandler | undefined { + return this.handlers.get(name); + } + + /** + * Get all output handlers + */ + getAllHandlers(): Map { + return new Map(this.handlers); + } + + /** + * Clear all output handlers + */ + clearHandlers(): void { + this.handlers.clear(); + } + + /** + * Set default output options + * @param options Output options + */ + setDefaultOptions(options: OutputOptions): void { + this.defaultOptions = { ...this.defaultOptions, ...options }; + } + + /** + * Get default output options + */ + getDefaultOptions(): OutputOptions { + return { ...this.defaultOptions }; + } + + /** + * Output data to all registered output handlers + * @param data Data to output + * @param options Output options + * @returns Promise<{success: string[], failed: string[]}> Names of successful and failed handlers + */ + async output( + data: any, + options?: OutputOptions + ): Promise<{ success: string[]; failed: string[] }> { + const mergedOptions = { ...this.defaultOptions, ...options }; + const results = { success: [] as string[], failed: [] as string[] }; + + // Send data to all handlers in parallel + const promises = Array.from(this.handlers.entries()).map( + async ([name, handler]) => { + try { + const success = await handler.output(data, mergedOptions); + if (success) { + results.success.push(name); + } else { + results.failed.push(name); + } + } catch (error) { + console.error(`[OutputManager] Handler ${name} failed:`, error); + results.failed.push(name); + } + } + ); + + await Promise.all(promises); + return results; + } + + /** + * Output data to specified output handlers + * @param handlerNames Array of output handler names + * @param data Data to output + * @param options Output options + * @returns Promise<{success: string[], failed: string[]}> Names of successful and failed handlers + */ + async outputTo( + handlerNames: string[], + data: any, + options?: OutputOptions + ): Promise<{ success: string[]; failed: string[] }> { + const mergedOptions = { ...this.defaultOptions, ...options }; + const results = { success: [] as string[], failed: [] as string[] }; + + const promises = handlerNames.map(async name => { + const handler = this.handlers.get(name); + if (!handler) { + console.warn(`[OutputManager] Handler ${name} not found`); + results.failed.push(name); + return; + } + + try { + const success = await handler.output(data, mergedOptions); + if (success) { + results.success.push(name); + } else { + results.failed.push(name); + } + } catch (error) { + console.error(`[OutputManager] Handler ${name} failed:`, error); + results.failed.push(name); + } + }); + + await Promise.all(promises); + return results; + } + + /** + * Output data to specified type of output handlers + * @param type Output handler type + * @param data Data to output + * @param options Output options + * @returns Promise<{success: string[], failed: string[]}> Names of successful and failed handlers + */ + async outputToType( + type: string, + data: any, + options?: OutputOptions + ): Promise<{ success: string[]; failed: string[] }> { + const targetHandlers = Array.from(this.handlers.entries()) + .filter(([_, handler]) => handler.type === type) + .map(([name]) => name); + + return this.outputTo(targetHandlers, data, options); + } +} + +/** + * Global output manager instance + */ +export const outputManager = new OutputManager(); + +/** + * Convenience method: Quickly output data to all registered handlers + * @param data Data to output + * @param options Output options + */ +export async function output(data: any, options?: OutputOptions) { + return outputManager.output(data, options); +} + +/** + * Convenience method: Quickly output data to specified type of handlers + * @param type Output handler type ('console' | 'webhook' | 'websocket') + * @param data Data to output + * @param options Output options + */ +export async function outputTo(type: string, data: any, options?: OutputOptions) { + return outputManager.outputToType(type, data, options); +} diff --git a/packages/core/src/plugins/output/types.ts b/packages/core/src/plugins/output/types.ts new file mode 100644 index 0000000..c8d9a6f --- /dev/null +++ b/packages/core/src/plugins/output/types.ts @@ -0,0 +1,156 @@ +/** + * Output handler interface + * All output handlers must implement this interface + */ +export interface OutputHandler { + /** + * Output handler type name + */ + type: string; + + /** + * Output data + * @param data Data to output + * @param options Output options + * @returns Promise Whether output was successful + */ + output(data: any, options?: OutputOptions): Promise; +} + +/** + * Output options + */ +export interface OutputOptions { + /** + * Output format + */ + format?: 'json' | 'text' | 'markdown'; + + /** + * Whether to include timestamp + */ + timestamp?: boolean; + + /** + * Custom prefix + */ + prefix?: string; + + /** + * Additional metadata + */ + metadata?: Record; + + /** + * Timeout (milliseconds) + */ + timeout?: number; +} + +/** + * Console output handler configuration + */ +export interface ConsoleOutputConfig { + /** + * Whether to use colored output + */ + colors?: boolean; + + /** + * Log level + */ + level?: 'log' | 'info' | 'warn' | 'error' | 'debug'; +} + +/** + * Webhook output handler configuration + */ +export interface WebhookOutputConfig { + /** + * Webhook URL + */ + url: string; + + /** + * HTTP request method + */ + method?: 'POST' | 'PUT' | 'PATCH'; + + /** + * Request headers + */ + headers?: Record; + + /** + * Authentication information + */ + auth?: { + type: 'bearer' | 'basic' | 'custom'; + token?: string; + username?: string; + password?: string; + custom?: { + header: string; + value: string; + }; + }; + + /** + * Retry configuration + */ + retry?: { + maxAttempts: number; + backoffMs: number; + }; + + /** + * Whether to handle failures silently (only log, don't throw) + */ + silent?: boolean; +} + +/** + * WebSocket output handler configuration (reserved for future use) + */ +export interface WebSocketOutputConfig { + /** + * WebSocket URL + */ + url: string; + + /** + * Reconnection configuration + */ + reconnect?: { + maxAttempts: number; + intervalMs: number; + }; + + /** + * Heartbeat configuration + */ + heartbeat?: { + intervalMs: number; + message?: string; + }; +} + +/** + * Output handler registration configuration + */ +export interface OutputHandlerConfig { + /** + * Output handler type + */ + type: 'console' | 'webhook' | 'websocket'; + + /** + * Whether enabled + */ + enabled?: boolean; + + /** + * Configuration options + */ + config?: ConsoleOutputConfig | WebhookOutputConfig | WebSocketOutputConfig; +} diff --git a/packages/core/src/plugins/output/webhook-handler.ts b/packages/core/src/plugins/output/webhook-handler.ts new file mode 100644 index 0000000..8be3e9f --- /dev/null +++ b/packages/core/src/plugins/output/webhook-handler.ts @@ -0,0 +1,202 @@ +import { OutputHandler, OutputOptions, WebhookOutputConfig } from './types'; + +/** + * Webhook output handler + * Supports sending data to HTTP endpoints with retry and authentication + */ +export class WebhookOutputHandler implements OutputHandler { + type = 'webhook' as const; + private config: WebhookOutputConfig; + private defaultTimeout = 30000; // 30 second default timeout + + constructor(config: WebhookOutputConfig) { + if (!config.url) { + throw new Error('Webhook URL is required'); + } + this.config = { + method: 'POST', + retry: { + maxAttempts: 3, + backoffMs: 1000 + }, + silent: false, + ...config + }; + } + + /** + * Build request headers + */ + private buildHeaders(): Record { + const headers: Record = { + 'Content-Type': 'application/json', + ...(this.config.headers || {}) + }; + + // Add authentication headers + if (this.config.auth) { + switch (this.config.auth.type) { + case 'bearer': + if (this.config.auth.token) { + headers['Authorization'] = `Bearer ${this.config.auth.token}`; + } + break; + + case 'basic': + if (this.config.auth.username && this.config.auth.password) { + const credentials = Buffer.from( + `${this.config.auth.username}:${this.config.auth.password}` + ).toString('base64'); + headers['Authorization'] = `Basic ${credentials}`; + } + break; + + case 'custom': + if (this.config.auth.custom) { + headers[this.config.auth.custom.header] = this.config.auth.custom.value; + } + break; + } + } + + return headers; + } + + /** + * Build request body + */ + private buildBody(data: any, options: OutputOptions): any { + const { format = 'json', timestamp = true, prefix, metadata } = options || {}; + + const body: any = { + data + }; + + if (timestamp) { + body.timestamp = new Date().toISOString(); + } + + if (prefix) { + body.prefix = prefix; + } + + if (metadata && Object.keys(metadata).length > 0) { + body.metadata = metadata; + } + + return body; + } + + /** + * Send HTTP request + */ + private async sendRequest( + url: string, + method: string, + headers: Record, + body: any, + timeout: number + ): Promise { + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), timeout); + + try { + const response = await fetch(url, { + method, + headers, + body: JSON.stringify(body), + signal: controller.signal + }); + + clearTimeout(timeoutId); + + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + return response; + } catch (error) { + clearTimeout(timeoutId); + throw error; + } + } + + /** + * Delay function (for retry backoff) + */ + private delay(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + /** + * Send request with retry + */ + private async sendWithRetry( + url: string, + method: string, + headers: Record, + body: any, + timeout: number, + retry: { maxAttempts: number; backoffMs: number } + ): Promise { + let lastError: Error | null = null; + + for (let attempt = 1; attempt <= retry.maxAttempts; attempt++) { + try { + return await this.sendRequest(url, method, headers, body, timeout); + } catch (error) { + lastError = error as Error; + + // If this is the last attempt, throw error directly + if (attempt === retry.maxAttempts) { + break; + } + + // Calculate backoff time (exponential backoff) + const backoffTime = retry.backoffMs * Math.pow(2, attempt - 1); + + console.warn( + `[WebhookOutputHandler] Request failed (attempt ${attempt}/${retry.maxAttempts}), ` + + `retrying in ${backoffTime}ms...`, + (error as Error).message + ); + + await this.delay(backoffTime); + } + } + + throw lastError; + } + + /** + * Output data to Webhook + */ + async output(data: any, options: OutputOptions = {}): Promise { + const timeout = options.timeout || this.defaultTimeout; + + try { + const headers = this.buildHeaders(); + const body = this.buildBody(data, options); + + const response = await this.sendWithRetry( + this.config.url, + this.config.method!, + headers, + body, + timeout, + this.config.retry! + ); + + return true; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + + if (this.config.silent) { + console.error(`[WebhookOutputHandler] Failed to send data: ${errorMessage}`); + return false; + } + + throw new Error(`Webhook output failed: ${errorMessage}`); + } + } +} diff --git a/packages/core/src/plugins/plugin-manager.ts b/packages/core/src/plugins/plugin-manager.ts new file mode 100644 index 0000000..bbfe546 --- /dev/null +++ b/packages/core/src/plugins/plugin-manager.ts @@ -0,0 +1,121 @@ +import { FastifyInstance } from 'fastify'; +import { CCRPlugin, PluginMetadata } from './types'; + +/** + * Plugin manager + */ +class PluginManager { + private plugins: Map = new Map(); + private pluginInstances: Map = new Map(); + + /** + * Register a plugin + * @param plugin Plugin instance + * @param options Plugin configuration options + */ + registerPlugin(plugin: CCRPlugin, options: any = {}): void { + this.pluginInstances.set(plugin.name, plugin); + this.plugins.set(plugin.name, { + name: plugin.name, + enabled: options.enabled !== false, + options + }); + } + + /** + * Enable a single plugin + * @param name Plugin name + * @param fastify Fastify instance + */ + async enablePlugin(name: string, fastify: FastifyInstance): Promise { + const metadata = this.plugins.get(name); + const plugin = this.pluginInstances.get(name); + if (!metadata || !plugin) { + throw new Error(`Plugin ${name} not found`); + } + + if (metadata.enabled) { + await fastify.register(plugin.register, metadata.options); + } + } + + /** + * Enable all registered plugins in batch + * @param fastify Fastify instance + */ + async enablePlugins(fastify: FastifyInstance): Promise { + for (const [name, metadata] of this.plugins) { + if (metadata.enabled) { + try { + await this.enablePlugin(name, fastify); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + fastify.log?.error(`Failed to enable plugin ${name}: ${errorMessage}`); + } + } + } + } + + /** + * Get list of registered plugins + */ + getPlugins(): PluginMetadata[] { + return Array.from(this.plugins.values()); + } + + /** + * Get plugin instance + * @param name Plugin name + */ + getPlugin(name: string): CCRPlugin | undefined { + return this.pluginInstances.get(name); + } + + /** + * Check if plugin is registered + * @param name Plugin name + */ + hasPlugin(name: string): boolean { + return this.pluginInstances.has(name); + } + + /** + * Check if plugin is enabled + * @param name Plugin name + */ + isPluginEnabled(name: string): boolean { + const metadata = this.plugins.get(name); + return metadata?.enabled || false; + } + + /** + * Dynamically enable/disable plugin + * @param name Plugin name + * @param enabled Whether to enable + */ + setPluginEnabled(name: string, enabled: boolean): void { + const metadata = this.plugins.get(name); + if (metadata) { + metadata.enabled = enabled; + } + } + + /** + * Remove plugin + * @param name Plugin name + */ + removePlugin(name: string): void { + this.plugins.delete(name); + this.pluginInstances.delete(name); + } + + /** + * Clear all plugins + */ + clear(): void { + this.plugins.clear(); + this.pluginInstances.clear(); + } +} + +export const pluginManager = new PluginManager(); diff --git a/packages/core/src/plugins/token-speed.ts b/packages/core/src/plugins/token-speed.ts new file mode 100644 index 0000000..14d4e99 --- /dev/null +++ b/packages/core/src/plugins/token-speed.ts @@ -0,0 +1,350 @@ +import fp from 'fastify-plugin'; +import { CCRPlugin, CCRPluginOptions } from './types'; +import { SSEParserTransform } from '../utils/sse'; +import { Tiktoken } from 'tiktoken'; +import { OutputHandlerConfig, OutputOptions, outputManager } from './output'; + +/** + * Token statistics interface + */ +interface TokenStats { + requestId: string; + startTime: number; + firstTokenTime?: number; + lastTokenTime: number; + tokenCount: number; + tokensPerSecond: number; + timeToFirstToken?: number; + contentBlocks: { + index: number; + tokenCount: number; + speed: number; + }[]; +} + +/** + * Plugin options + */ +interface TokenSpeedOptions extends CCRPluginOptions { + logInterval?: number; // Log every N tokens + enableCrossRequestStats?: boolean; // Enable cross-request statistics + statsWindow?: number; // Statistics window size (last N requests) + + /** + * Output handler configurations + * Supports console, webhook, and other output handlers + */ + outputHandlers?: OutputHandlerConfig[]; + + /** + * Default output options (format, prefix, etc.) + */ + outputOptions?: OutputOptions; +} + +// Store request-level statistics +const requestStats = new Map(); + +// Cross-request statistics +const globalStats = { + totalRequests: 0, + totalTokens: 0, + totalTime: 0, + avgTokensPerSecond: 0, + minTokensPerSecond: Infinity, + maxTokensPerSecond: 0, + avgTimeToFirstToken: 0, + allSpeeds: [] as number[] // Used for calculating percentiles +}; + +/** + * Token speed measurement plugin + */ +export const tokenSpeedPlugin: CCRPlugin = { + name: 'token-speed', + version: '1.0.0', + description: 'Statistics for streaming response token generation speed', + + // Use fp() to break encapsulation and apply hooks globally + register: fp(async (fastify, options: TokenSpeedOptions) => { + const opts = { + logInterval: 10, + enableCrossRequestStats: true, + statsWindow: 100, + ...options + }; + + // Initialize output handlers + if (opts.outputHandlers && opts.outputHandlers.length > 0) { + outputManager.registerHandlers(opts.outputHandlers); + } else { + // Default to console output if no handlers configured + outputManager.registerHandlers([{ + type: 'console', + enabled: true, + config: { + colors: true, + level: 'log' + } + }]); + } + + // Set default output options + if (opts.outputOptions) { + outputManager.setDefaultOptions(opts.outputOptions); + } + + // Initialize tiktoken encoder + let encoding: Tiktoken | null = null; + try { + const { get_encoding } = await import('tiktoken'); + encoding = get_encoding('cl100k_base'); + } catch (error) { + fastify.log?.warn('Failed to load tiktoken, falling back to estimation'); + } + + // Add onSend hook to intercept streaming responses + fastify.addHook('onSend', async (request, reply, payload) => { + // Only handle streaming responses + if (!(payload instanceof ReadableStream)) { + return payload; + } + + const requestId = (request as any).id || Date.now().toString(); + const startTime = Date.now(); + + // Initialize statistics + requestStats.set(requestId, { + requestId, + startTime, + lastTokenTime: startTime, + tokenCount: 0, + tokensPerSecond: 0, + contentBlocks: [] + }); + // Tee the stream: one for stats, one for the client + const [originalStream, statsStream] = payload.tee(); + + // Process stats in background + const processStats = async () => { + let currentBlockIndex = -1; + let blockStartTime = 0; + let blockTokenCount = 0; + + try { + // Decode byte stream to text, then parse SSE events + const eventStream = statsStream + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new SSEParserTransform()); + const reader = eventStream.getReader(); + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const data = value; + const stats = requestStats.get(requestId); + if (!stats) continue; + + // Detect content_block_start event + if (data.event === 'content_block_start' && data.data?.content_block?.type === 'text') { + currentBlockIndex = data.data.index; + blockStartTime = Date.now(); + blockTokenCount = 0; + } + + // Detect content_block_delta event (incremental tokens) + if (data.event === 'content_block_delta' && data.data?.delta?.type === 'text_delta') { + const text = data.data.delta.text; + const tokenCount = encoding + ? encoding.encode(text).length + : estimateTokens(text); + + stats.tokenCount += tokenCount; + stats.lastTokenTime = Date.now(); + + // Record first token time + if (!stats.firstTokenTime) { + stats.firstTokenTime = stats.lastTokenTime; + stats.timeToFirstToken = stats.firstTokenTime - stats.startTime; + } + + // Calculate current block token count + if (currentBlockIndex >= 0) { + blockTokenCount += tokenCount; + } + + // Calculate speed + const elapsed = (stats.lastTokenTime - stats.startTime) / 1000; + stats.tokensPerSecond = stats.tokenCount / elapsed; + + // Log periodically + if (stats.tokenCount % opts.logInterval === 0) { + await outputStats(stats, opts.outputOptions); + } + } + + // Detect content_block_stop event + if (data.event === 'content_block_stop' && currentBlockIndex >= 0) { + const blockElapsed = (Date.now() - blockStartTime) / 1000; + const blockSpeed = blockElapsed > 0 ? blockTokenCount / blockElapsed : 0; + + stats.contentBlocks.push({ + index: currentBlockIndex, + tokenCount: blockTokenCount, + speed: blockSpeed + }); + + currentBlockIndex = -1; + } + + // Output final statistics when message ends + if (data.event === 'message_stop') { + // Update global statistics + if (opts.enableCrossRequestStats) { + updateGlobalStats(stats, opts.statsWindow); + } + + await outputStats(stats, opts.outputOptions, true); + + if (opts.enableCrossRequestStats) { + await outputGlobalStats(opts.outputOptions); + } + + requestStats.delete(requestId); + } + } + } catch (error: any) { + console.error(error); + if (error.name !== 'AbortError' && error.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + fastify.log?.warn(`Error processing token stats: ${error.message}`); + } + } + }; + + // Start background processing without blocking + processStats().catch((error) => { + console.log(error); + fastify.log?.warn(`Background stats processing failed: ${error.message}`); + }); + + // Return original stream to client + return originalStream; + }); + }), +}; + +/** + * Update global statistics + */ +function updateGlobalStats(stats: TokenStats, windowSize: number) { + globalStats.totalRequests++; + globalStats.totalTokens += stats.tokenCount; + globalStats.totalTime += (stats.lastTokenTime - stats.startTime) / 1000; + + if (stats.tokensPerSecond < globalStats.minTokensPerSecond) { + globalStats.minTokensPerSecond = stats.tokensPerSecond; + } + if (stats.tokensPerSecond > globalStats.maxTokensPerSecond) { + globalStats.maxTokensPerSecond = stats.tokensPerSecond; + } + + if (stats.timeToFirstToken) { + globalStats.avgTimeToFirstToken = + (globalStats.avgTimeToFirstToken * (globalStats.totalRequests - 1) + stats.timeToFirstToken) / + globalStats.totalRequests; + } + + globalStats.allSpeeds.push(stats.tokensPerSecond); + + // Maintain window size + if (globalStats.allSpeeds.length > windowSize) { + globalStats.allSpeeds.shift(); + } + + globalStats.avgTokensPerSecond = globalStats.totalTokens / globalStats.totalTime; +} + +/** + * Calculate percentile + */ +function calculatePercentile(data: number[], percentile: number): number { + if (data.length === 0) return 0; + const sorted = [...data].sort((a, b) => a - b); + const index = Math.ceil((percentile / 100) * sorted.length) - 1; + return sorted[index]; +} + +/** + * Estimate token count (fallback method) + */ +function estimateTokens(text: string): number { + // Rough estimation: English ~4 chars/token, Chinese ~1.5 chars/token + const chineseChars = (text.match(/[\u4e00-\u9fa5]/g) || []).length; + const otherChars = text.length - chineseChars; + return Math.ceil(chineseChars / 1.5 + otherChars / 4); +} + +/** + * Output single request statistics + */ +async function outputStats(stats: TokenStats, options?: OutputOptions, isFinal = false) { + const prefix = isFinal ? '[Token Speed Final]' : '[Token Speed]'; + + // Calculate average speed of each block + const avgBlockSpeed = stats.contentBlocks.length > 0 + ? stats.contentBlocks.reduce((sum, b) => sum + b.speed, 0) / stats.contentBlocks.length + : 0; + + const logData = { + requestId: stats.requestId.substring(0, 8), + tokenCount: stats.tokenCount, + tokensPerSecond: stats.tokensPerSecond.toFixed(2), + timeToFirstToken: stats.timeToFirstToken ? `${stats.timeToFirstToken}ms` : 'N/A', + duration: `${((stats.lastTokenTime - stats.startTime) / 1000).toFixed(2)}s`, + contentBlocks: stats.contentBlocks.length, + avgBlockSpeed: avgBlockSpeed.toFixed(2), + ...(isFinal && stats.contentBlocks.length > 1 ? { + blocks: stats.contentBlocks.map(b => ({ + index: b.index, + tokenCount: b.tokenCount, + speed: b.speed.toFixed(2) + })) + } : {}) + }; + + // Output through output manager + await outputManager.output(logData, { + prefix, + ...options + }); +} + +/** + * Output global statistics + */ +async function outputGlobalStats(options?: OutputOptions) { + const p50 = calculatePercentile(globalStats.allSpeeds, 50); + const p95 = calculatePercentile(globalStats.allSpeeds, 95); + const p99 = calculatePercentile(globalStats.allSpeeds, 99); + + const logData = { + totalRequests: globalStats.totalRequests, + totalTokens: globalStats.totalTokens, + avgTokensPerSecond: globalStats.avgTokensPerSecond.toFixed(2), + minSpeed: globalStats.minTokensPerSecond === Infinity ? 0 : globalStats.minTokensPerSecond.toFixed(2), + maxSpeed: globalStats.maxTokensPerSecond.toFixed(2), + avgTimeToFirstToken: `${globalStats.avgTimeToFirstToken.toFixed(0)}ms`, + percentiles: { + p50: p50.toFixed(2), + p95: p95.toFixed(2), + p99: p99.toFixed(2) + } + }; + + // Output through output manager + await outputManager.output(logData, { + prefix: '[Token Speed Global Stats]', + ...options + }); +} diff --git a/packages/core/src/plugins/types.ts b/packages/core/src/plugins/types.ts new file mode 100644 index 0000000..226bd40 --- /dev/null +++ b/packages/core/src/plugins/types.ts @@ -0,0 +1,28 @@ +import { FastifyPluginAsync } from 'fastify'; + +/** + * Plugin configuration interface + */ +export interface CCRPluginOptions { + enabled?: boolean; + [key: string]: any; +} + +/** + * Plugin interface + */ +export interface CCRPlugin { + name: string; + version?: string; + description?: string; + register: FastifyPluginAsync; +} + +/** + * Plugin metadata + */ +export interface PluginMetadata { + name: string; + enabled: boolean; + options?: any; +} diff --git a/packages/core/src/server.ts b/packages/core/src/server.ts index bbbc7e3..7823bbf 100644 --- a/packages/core/src/server.ts +++ b/packages/core/src/server.ts @@ -248,3 +248,5 @@ export { searchProjectBySession }; export { ConfigService } from "./services/config"; export { ProviderService } from "./services/provider"; export { TransformerService } from "./services/transformer"; +export { pluginManager, tokenSpeedPlugin, CCRPlugin, CCRPluginOptions, PluginMetadata } from "./plugins"; +export { SSEParserTransform, SSESerializerTransform, rewriteStream } from "./utils/sse"; diff --git a/packages/core/src/utils/sse/SSEParser.transform.ts b/packages/core/src/utils/sse/SSEParser.transform.ts new file mode 100644 index 0000000..9a22a3a --- /dev/null +++ b/packages/core/src/utils/sse/SSEParser.transform.ts @@ -0,0 +1,71 @@ +export class SSEParserTransform extends TransformStream { + private buffer = ''; + private currentEvent: Record = {}; + + constructor() { + super({ + transform: (chunk: string, controller) => { + this.buffer += chunk; + const lines = this.buffer.split('\n'); + + // Keep last line (may be incomplete) + this.buffer = lines.pop() || ''; + + for (const line of lines) { + const event = this.processLine(line); + if (event) { + controller.enqueue(event); + } + } + }, + flush: (controller) => { + // Process remaining content in buffer + if (this.buffer.trim()) { + const events: any[] = []; + this.processLine(this.buffer.trim(), events); + events.forEach(event => controller.enqueue(event)); + } + + // Push last event (if any) + if (Object.keys(this.currentEvent).length > 0) { + controller.enqueue(this.currentEvent); + } + } + }); + } + + private processLine(line: string, events?: any[]): any | null { + if (!line.trim()) { + if (Object.keys(this.currentEvent).length > 0) { + const event = { ...this.currentEvent }; + this.currentEvent = {}; + if (events) { + events.push(event); + return null; + } + return event; + } + return null; + } + + if (line.startsWith('event:')) { + this.currentEvent.event = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + const data = line.slice(5).trim(); + if (data === '[DONE]') { + this.currentEvent.data = { type: 'done' }; + } else { + try { + this.currentEvent.data = JSON.parse(data); + } catch (e) { + this.currentEvent.data = { raw: data, error: 'JSON parse failed' }; + } + } + } else if (line.startsWith('id:')) { + this.currentEvent.id = line.slice(3).trim(); + } else if (line.startsWith('retry:')) { + this.currentEvent.retry = parseInt(line.slice(6).trim()); + } + return null; + } +} diff --git a/packages/core/src/utils/sse/SSESerializer.transform.ts b/packages/core/src/utils/sse/SSESerializer.transform.ts new file mode 100644 index 0000000..8e2c79c --- /dev/null +++ b/packages/core/src/utils/sse/SSESerializer.transform.ts @@ -0,0 +1,29 @@ +export class SSESerializerTransform extends TransformStream { + constructor() { + super({ + transform: (event, controller) => { + let output = ''; + + if (event.event) { + output += `event: ${event.event}\n`; + } + if (event.id) { + output += `id: ${event.id}\n`; + } + if (event.retry) { + output += `retry: ${event.retry}\n`; + } + if (event.data) { + if (event.data.type === 'done') { + output += 'data: [DONE]\n'; + } else { + output += `data: ${JSON.stringify(event.data)}\n`; + } + } + + output += '\n'; + controller.enqueue(output); + } + }); + } +} diff --git a/packages/core/src/utils/sse/index.ts b/packages/core/src/utils/sse/index.ts new file mode 100644 index 0000000..11af518 --- /dev/null +++ b/packages/core/src/utils/sse/index.ts @@ -0,0 +1,3 @@ +export { SSEParserTransform } from './SSEParser.transform'; +export { SSESerializerTransform } from './SSESerializer.transform'; +export { rewriteStream } from './rewriteStream'; diff --git a/packages/core/src/utils/sse/rewriteStream.ts b/packages/core/src/utils/sse/rewriteStream.ts new file mode 100644 index 0000000..0ce456e --- /dev/null +++ b/packages/core/src/utils/sse/rewriteStream.ts @@ -0,0 +1,34 @@ +/** + * Rewrite stream utility + * Reads source readablestream and returns a new readablestream, + * processor processes source data and pushes returned new value to new stream, + * no push if no return value + * @param stream + * @param processor + */ +export const rewriteStream = (stream: ReadableStream, processor: (data: any, controller: ReadableStreamController) => Promise): ReadableStream => { + const reader = stream.getReader() + + return new ReadableStream({ + async start(controller) { + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + controller.close() + break + } + + const processed = await processor(value, controller) + if (processed !== undefined) { + controller.enqueue(processed) + } + } + } catch (error) { + controller.error(error) + } finally { + reader.releaseLock() + } + } + }) +} diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 30f36d5..126cf47 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -15,6 +15,7 @@ import JSON5 from "json5"; import { IAgent, ITool } from "./agents/type"; import agentsManager from "./agents"; import { EventEmitter } from "node:events"; +import { pluginManager, tokenSpeedPlugin } from "@musistudio/llms"; const event = new EventEmitter() @@ -43,6 +44,44 @@ interface RunOptions { logger?: any; } +/** + * Plugin configuration from config file + */ +interface PluginConfig { + name: string; + enabled?: boolean; + options?: Record; +} + +/** + * Register plugins from configuration + * @param serverInstance Server instance + * @param config Application configuration + */ +async function registerPluginsFromConfig(serverInstance: any, config: any): Promise { + // Get plugins configuration from config file + const pluginsConfig: PluginConfig[] = config.plugins || config.Plugins || []; + + for (const pluginConfig of pluginsConfig) { + const { name, enabled = false, options = {} } = pluginConfig; + + switch (name) { + case 'token-speed': + pluginManager.registerPlugin(tokenSpeedPlugin, { + enabled, + ...options + }); + break; + + default: + console.warn(`Unknown plugin: ${name}`); + break; + } + } + // Enable all registered plugins + await pluginManager.enablePlugins(serverInstance); +} + async function getServer(options: RunOptions = {}) { await initializeClaudeConfig(); await initDir(); @@ -141,6 +180,9 @@ async function getServer(options: RunOptions = {}) { presets.map(async preset => await serverInstance.registerNamespace(`/preset/${preset.name}`, preset.config)) ) + // Register and configure plugins from config + await registerPluginsFromConfig(serverInstance, config); + // Add async preHandler hook for authentication serverInstance.addHook("preHandler", async (req: any, reply: any) => { return new Promise((resolve, reject) => { @@ -404,6 +446,7 @@ export { getServer }; export type { RunOptions }; export type { IAgent, ITool } from "./agents/type"; export { initDir, initConfig, readConfigFile, writeConfigFile, backupConfigFile } from "./utils"; +export { pluginManager, tokenSpeedPlugin } from "@musistudio/llms"; // Start service if this file is run directly if (require.main === module) { diff --git a/packages/server/src/types.d.ts b/packages/server/src/types.d.ts index ad50686..885ad0b 100644 --- a/packages/server/src/types.d.ts +++ b/packages/server/src/types.d.ts @@ -8,6 +8,15 @@ declare module "@musistudio/llms" { logger?: any; } + /** + * Plugin configuration from config file + */ + export interface PluginConfig { + name: string; + enabled?: boolean; + options?: Record; + } + export interface Server { app: FastifyInstance; logger: FastifyBaseLogger; diff --git a/packages/server/src/types/llms-plugin.d.ts b/packages/server/src/types/llms-plugin.d.ts new file mode 100644 index 0000000..dd40711 --- /dev/null +++ b/packages/server/src/types/llms-plugin.d.ts @@ -0,0 +1,62 @@ +import { FastifyPluginAsync } from 'fastify'; + +declare module '@musistudio/llms' { + + export interface CCRPluginOptions { + enabled?: boolean; + [key: string]: any; + } + + + export interface CCRPlugin { + name: string; + version?: string; + description?: string; + register: FastifyPluginAsync; + } + + + export interface PluginMetadata { + name: string; + enabled: boolean; + options?: any; + } + + + export class PluginManager { + private plugins; + private pluginInstances; + registerPlugin(plugin: CCRPlugin, options?: any): void; + enablePlugin(name: string, fastify: import('fastify').FastifyInstance): Promise; + enablePlugins(fastify: import('fastify').FastifyInstance): Promise; + getPlugins(): PluginMetadata[]; + getPlugin(name: string): CCRPlugin | undefined; + hasPlugin(name: string): boolean; + isPluginEnabled(name: string): boolean; + setPluginEnabled(name: string, enabled: boolean): void; + removePlugin(name: string): void; + clear(): void; + } + + + export const pluginManager: PluginManager; + + + export const tokenSpeedPlugin: CCRPlugin; + + + export class SSEParserTransform extends TransformStream { + constructor(); + } + + + export class SSESerializerTransform extends TransformStream { + constructor(); + } + + + export function rewriteStream( + stream: ReadableStream, + processor: (data: any, controller: ReadableStreamController) => Promise + ): ReadableStream; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6951edf..061a9c9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -124,6 +124,9 @@ importers: fastify: specifier: ^5.4.0 version: 5.6.1 + fastify-plugin: + specifier: ^5.1.0 + version: 5.1.0 google-auth-library: specifier: ^10.1.0 version: 10.4.0