add plugin system

This commit is contained in:
musistudio
2025-12-31 08:42:37 +08:00
parent 3cb1275e0c
commit 38cc5feadb
20 changed files with 1544 additions and 0 deletions

View File

@@ -245,6 +245,8 @@ ui (standalone frontend application)
- cli/server/shared: esbuild
- ui: Vite + TypeScript
5. **@musistudio/llms**: This is an external dependency package providing the core server framework and transformer functionality, type definitions in `packages/server/src/types.d.ts`
6. **Code comments**: All comments in code MUST be written in English
7. **Documentation**: When implementing new features, add documentation to the docs project instead of creating standalone md files
## Configuration Example Locations

View File

@@ -4,9 +4,11 @@
"description": "A universal LLM API transformation server",
"main": "dist/cjs/server.cjs",
"module": "dist/esm/server.mjs",
"types": "dist/plugins.d.ts",
"type": "module",
"exports": {
".": {
"types": "./dist/plugins.d.ts",
"import": "./dist/esm/server.mjs",
"require": "./dist/cjs/server.cjs"
}
@@ -36,6 +38,7 @@
"@google/genai": "^1.7.0",
"dotenv": "^16.5.0",
"fastify": "^5.4.0",
"fastify-plugin": "^5.1.0",
"google-auth-library": "^10.1.0",
"json5": "^2.2.3",
"jsonrepair": "^3.13.0",

View File

@@ -0,0 +1,4 @@
export * from './types';
export { pluginManager } from './plugin-manager';
export { tokenSpeedPlugin } from './token-speed';
export * from './output';

View File

@@ -0,0 +1,150 @@
import { OutputHandler, OutputOptions, ConsoleOutputConfig } from './types';
/**
* Console output handler
* Supports colored output and multiple log levels
*/
export class ConsoleOutputHandler implements OutputHandler {
type = 'console' as const;
private config: ConsoleOutputConfig;
// ANSI color codes
private colors = {
reset: '\x1b[0m',
bright: '\x1b[1m',
dim: '\x1b[2m',
red: '\x1b[31m',
green: '\x1b[32m',
yellow: '\x1b[33m',
blue: '\x1b[34m',
magenta: '\x1b[35m',
cyan: '\x1b[36m',
white: '\x1b[37m'
};
constructor(config: ConsoleOutputConfig = {}) {
this.config = {
colors: true,
level: 'log',
...config
};
}
/**
* Format output data
*/
private formatData(data: any, options: OutputOptions): string {
const { format = 'text', timestamp = true, prefix, metadata } = options || {};
// Build prefix
let output = '';
if (timestamp) {
const time = new Date().toISOString();
output += this.config.colors
? `${this.colors.cyan}[${time}]${this.colors.reset} `
: `[${time}] `;
}
if (prefix) {
output += this.config.colors
? `${this.colors.bright}${prefix}${this.colors.reset} `
: `${prefix} `;
}
// Format data
switch (format) {
case 'json':
output += JSON.stringify(data, null, 2);
break;
case 'markdown':
if (typeof data === 'object') {
output += this.toMarkdown(data);
} else {
output += String(data);
}
break;
case 'text':
default:
if (typeof data === 'object') {
output += JSON.stringify(data, null, 2);
} else {
output += String(data);
}
break;
}
// Add metadata
if (metadata && Object.keys(metadata).length > 0) {
output += '\n' + (this.config.colors ? `${this.colors.dim}` : '');
output += 'Metadata: ' + JSON.stringify(metadata, null, 2);
if (this.config.colors) output += this.colors.reset;
}
return output;
}
/**
* Convert object to Markdown format
*/
private toMarkdown(data: any, indent = 0): string {
const padding = ' '.repeat(indent);
if (Array.isArray(data)) {
return data.map(item => {
if (typeof item === 'object') {
return `${padding}-\n${this.toMarkdown(item, indent + 1)}`;
}
return `${padding}- ${item}`;
}).join('\n');
}
if (typeof data === 'object' && data !== null) {
return Object.entries(data).map(([key, value]) => {
if (typeof value === 'object' && value !== null) {
return `${padding}${key}:\n${this.toMarkdown(value, indent + 1)}`;
}
return `${padding}${key}: ${value}`;
}).join('\n');
}
return `${padding}${data}`;
}
/**
* Output data
*/
async output(data: any, options: OutputOptions = {}): Promise<boolean> {
try {
const formatted = this.formatData(data, options);
const logMethod = this.config.level || 'log';
// Output based on configured log level
switch (logMethod) {
case 'info':
console.info(formatted);
break;
case 'warn':
console.warn(formatted);
break;
case 'error':
console.error(formatted);
break;
case 'debug':
console.debug(formatted);
break;
case 'log':
default:
console.log(formatted);
break;
}
return true;
} catch (error) {
console.error('[ConsoleOutputHandler] Output failed:', error);
return false;
}
}
}

View File

@@ -0,0 +1,48 @@
// Type definitions
export * from './types';
// Output handler implementations
export { ConsoleOutputHandler } from './console-handler';
export { WebhookOutputHandler } from './webhook-handler';
// Output manager
export { outputManager, output, outputTo } from './output-manager';
/**
* Convenience function: Create and register a Console output handler
* @param config Console output handler configuration
* @returns Output manager instance
*/
export function registerConsoleOutput(config?: import('./types').ConsoleOutputConfig) {
const { ConsoleOutputHandler } = require('./console-handler');
const handler = new ConsoleOutputHandler(config);
const { outputManager } = require('./output-manager');
const name = 'console_' + Date.now();
outputManager.registerHandler(name, handler);
return outputManager;
}
/**
* Convenience function: Create and register a Webhook output handler
* @param config Webhook output handler configuration
* @returns Output manager instance
*/
export function registerWebhookOutput(config: import('./types').WebhookOutputConfig) {
const { WebhookOutputHandler } = require('./webhook-handler');
const handler = new WebhookOutputHandler(config);
const { outputManager } = require('./output-manager');
const name = 'webhook_' + Date.now();
outputManager.registerHandler(name, handler);
return outputManager;
}
/**
* Convenience function: Register output handlers in batch
* @param configs Output handler configuration array
* @returns Output manager instance
*/
export function registerOutputHandlers(configs: import('./types').OutputHandlerConfig[]) {
const { outputManager } = require('./output-manager');
outputManager.registerHandlers(configs);
return outputManager;
}

View File

@@ -0,0 +1,224 @@
import { OutputHandler, OutputOptions, OutputHandlerConfig } from './types';
import { ConsoleOutputHandler } from './console-handler';
import { WebhookOutputHandler } from './webhook-handler';
/**
* Output manager
* Manages multiple output handlers and provides unified output interface
*/
class OutputManager {
private handlers: Map<string, OutputHandler> = new Map();
private defaultOptions: OutputOptions = {};
/**
* Register output handler
* @param name Output handler name
* @param handler Output handler instance
*/
registerHandler(name: string, handler: OutputHandler): void {
this.handlers.set(name, handler);
}
/**
* Register output handlers in batch
* @param configs Output handler configuration array
*/
registerHandlers(configs: OutputHandlerConfig[]): void {
for (const config of configs) {
if (config.enabled === false) {
continue;
}
try {
const handler = this.createHandler(config);
const name = config.type + '_' + Date.now();
this.registerHandler(name, handler);
} catch (error) {
console.error(`[OutputManager] Failed to register ${config.type} handler:`, error);
}
}
}
/**
* Create output handler instance
* @param config Output handler configuration
*/
private createHandler(config: OutputHandlerConfig): OutputHandler {
switch (config.type) {
case 'console':
return new ConsoleOutputHandler(config.config as any);
case 'webhook':
return new WebhookOutputHandler(config.config as any);
// Reserved for other output handler types
// case 'websocket':
// return new WebSocketOutputHandler(config.config as any);
default:
throw new Error(`Unknown output handler type: ${config.type}`);
}
}
/**
* Remove output handler
* @param name Output handler name
*/
unregisterHandler(name: string): boolean {
return this.handlers.delete(name);
}
/**
* Get output handler
* @param name Output handler name
*/
getHandler(name: string): OutputHandler | undefined {
return this.handlers.get(name);
}
/**
* Get all output handlers
*/
getAllHandlers(): Map<string, OutputHandler> {
return new Map(this.handlers);
}
/**
* Clear all output handlers
*/
clearHandlers(): void {
this.handlers.clear();
}
/**
* Set default output options
* @param options Output options
*/
setDefaultOptions(options: OutputOptions): void {
this.defaultOptions = { ...this.defaultOptions, ...options };
}
/**
* Get default output options
*/
getDefaultOptions(): OutputOptions {
return { ...this.defaultOptions };
}
/**
* Output data to all registered output handlers
* @param data Data to output
* @param options Output options
* @returns Promise<{success: string[], failed: string[]}> Names of successful and failed handlers
*/
async output(
data: any,
options?: OutputOptions
): Promise<{ success: string[]; failed: string[] }> {
const mergedOptions = { ...this.defaultOptions, ...options };
const results = { success: [] as string[], failed: [] as string[] };
// Send data to all handlers in parallel
const promises = Array.from(this.handlers.entries()).map(
async ([name, handler]) => {
try {
const success = await handler.output(data, mergedOptions);
if (success) {
results.success.push(name);
} else {
results.failed.push(name);
}
} catch (error) {
console.error(`[OutputManager] Handler ${name} failed:`, error);
results.failed.push(name);
}
}
);
await Promise.all(promises);
return results;
}
/**
* Output data to specified output handlers
* @param handlerNames Array of output handler names
* @param data Data to output
* @param options Output options
* @returns Promise<{success: string[], failed: string[]}> Names of successful and failed handlers
*/
async outputTo(
handlerNames: string[],
data: any,
options?: OutputOptions
): Promise<{ success: string[]; failed: string[] }> {
const mergedOptions = { ...this.defaultOptions, ...options };
const results = { success: [] as string[], failed: [] as string[] };
const promises = handlerNames.map(async name => {
const handler = this.handlers.get(name);
if (!handler) {
console.warn(`[OutputManager] Handler ${name} not found`);
results.failed.push(name);
return;
}
try {
const success = await handler.output(data, mergedOptions);
if (success) {
results.success.push(name);
} else {
results.failed.push(name);
}
} catch (error) {
console.error(`[OutputManager] Handler ${name} failed:`, error);
results.failed.push(name);
}
});
await Promise.all(promises);
return results;
}
/**
* Output data to specified type of output handlers
* @param type Output handler type
* @param data Data to output
* @param options Output options
* @returns Promise<{success: string[], failed: string[]}> Names of successful and failed handlers
*/
async outputToType(
type: string,
data: any,
options?: OutputOptions
): Promise<{ success: string[]; failed: string[] }> {
const targetHandlers = Array.from(this.handlers.entries())
.filter(([_, handler]) => handler.type === type)
.map(([name]) => name);
return this.outputTo(targetHandlers, data, options);
}
}
/**
* Global output manager instance
*/
export const outputManager = new OutputManager();
/**
* Convenience method: Quickly output data to all registered handlers
* @param data Data to output
* @param options Output options
*/
export async function output(data: any, options?: OutputOptions) {
return outputManager.output(data, options);
}
/**
* Convenience method: Quickly output data to specified type of handlers
* @param type Output handler type ('console' | 'webhook' | 'websocket')
* @param data Data to output
* @param options Output options
*/
export async function outputTo(type: string, data: any, options?: OutputOptions) {
return outputManager.outputToType(type, data, options);
}

View File

@@ -0,0 +1,156 @@
/**
* Output handler interface
* All output handlers must implement this interface
*/
export interface OutputHandler {
/**
* Output handler type name
*/
type: string;
/**
* Output data
* @param data Data to output
* @param options Output options
* @returns Promise<boolean> Whether output was successful
*/
output(data: any, options?: OutputOptions): Promise<boolean>;
}
/**
* Output options
*/
export interface OutputOptions {
/**
* Output format
*/
format?: 'json' | 'text' | 'markdown';
/**
* Whether to include timestamp
*/
timestamp?: boolean;
/**
* Custom prefix
*/
prefix?: string;
/**
* Additional metadata
*/
metadata?: Record<string, any>;
/**
* Timeout (milliseconds)
*/
timeout?: number;
}
/**
* Console output handler configuration
*/
export interface ConsoleOutputConfig {
/**
* Whether to use colored output
*/
colors?: boolean;
/**
* Log level
*/
level?: 'log' | 'info' | 'warn' | 'error' | 'debug';
}
/**
* Webhook output handler configuration
*/
export interface WebhookOutputConfig {
/**
* Webhook URL
*/
url: string;
/**
* HTTP request method
*/
method?: 'POST' | 'PUT' | 'PATCH';
/**
* Request headers
*/
headers?: Record<string, string>;
/**
* Authentication information
*/
auth?: {
type: 'bearer' | 'basic' | 'custom';
token?: string;
username?: string;
password?: string;
custom?: {
header: string;
value: string;
};
};
/**
* Retry configuration
*/
retry?: {
maxAttempts: number;
backoffMs: number;
};
/**
* Whether to handle failures silently (only log, don't throw)
*/
silent?: boolean;
}
/**
* WebSocket output handler configuration (reserved for future use)
*/
export interface WebSocketOutputConfig {
/**
* WebSocket URL
*/
url: string;
/**
* Reconnection configuration
*/
reconnect?: {
maxAttempts: number;
intervalMs: number;
};
/**
* Heartbeat configuration
*/
heartbeat?: {
intervalMs: number;
message?: string;
};
}
/**
* Output handler registration configuration
*/
export interface OutputHandlerConfig {
/**
* Output handler type
*/
type: 'console' | 'webhook' | 'websocket';
/**
* Whether enabled
*/
enabled?: boolean;
/**
* Configuration options
*/
config?: ConsoleOutputConfig | WebhookOutputConfig | WebSocketOutputConfig;
}

View File

@@ -0,0 +1,202 @@
import { OutputHandler, OutputOptions, WebhookOutputConfig } from './types';
/**
* Webhook output handler
* Supports sending data to HTTP endpoints with retry and authentication
*/
export class WebhookOutputHandler implements OutputHandler {
type = 'webhook' as const;
private config: WebhookOutputConfig;
private defaultTimeout = 30000; // 30 second default timeout
constructor(config: WebhookOutputConfig) {
if (!config.url) {
throw new Error('Webhook URL is required');
}
this.config = {
method: 'POST',
retry: {
maxAttempts: 3,
backoffMs: 1000
},
silent: false,
...config
};
}
/**
* Build request headers
*/
private buildHeaders(): Record<string, string> {
const headers: Record<string, string> = {
'Content-Type': 'application/json',
...(this.config.headers || {})
};
// Add authentication headers
if (this.config.auth) {
switch (this.config.auth.type) {
case 'bearer':
if (this.config.auth.token) {
headers['Authorization'] = `Bearer ${this.config.auth.token}`;
}
break;
case 'basic':
if (this.config.auth.username && this.config.auth.password) {
const credentials = Buffer.from(
`${this.config.auth.username}:${this.config.auth.password}`
).toString('base64');
headers['Authorization'] = `Basic ${credentials}`;
}
break;
case 'custom':
if (this.config.auth.custom) {
headers[this.config.auth.custom.header] = this.config.auth.custom.value;
}
break;
}
}
return headers;
}
/**
* Build request body
*/
private buildBody(data: any, options: OutputOptions): any {
const { format = 'json', timestamp = true, prefix, metadata } = options || {};
const body: any = {
data
};
if (timestamp) {
body.timestamp = new Date().toISOString();
}
if (prefix) {
body.prefix = prefix;
}
if (metadata && Object.keys(metadata).length > 0) {
body.metadata = metadata;
}
return body;
}
/**
* Send HTTP request
*/
private async sendRequest(
url: string,
method: string,
headers: Record<string, string>,
body: any,
timeout: number
): Promise<Response> {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeout);
try {
const response = await fetch(url, {
method,
headers,
body: JSON.stringify(body),
signal: controller.signal
});
clearTimeout(timeoutId);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
return response;
} catch (error) {
clearTimeout(timeoutId);
throw error;
}
}
/**
* Delay function (for retry backoff)
*/
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
/**
* Send request with retry
*/
private async sendWithRetry(
url: string,
method: string,
headers: Record<string, string>,
body: any,
timeout: number,
retry: { maxAttempts: number; backoffMs: number }
): Promise<Response> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= retry.maxAttempts; attempt++) {
try {
return await this.sendRequest(url, method, headers, body, timeout);
} catch (error) {
lastError = error as Error;
// If this is the last attempt, throw error directly
if (attempt === retry.maxAttempts) {
break;
}
// Calculate backoff time (exponential backoff)
const backoffTime = retry.backoffMs * Math.pow(2, attempt - 1);
console.warn(
`[WebhookOutputHandler] Request failed (attempt ${attempt}/${retry.maxAttempts}), ` +
`retrying in ${backoffTime}ms...`,
(error as Error).message
);
await this.delay(backoffTime);
}
}
throw lastError;
}
/**
* Output data to Webhook
*/
async output(data: any, options: OutputOptions = {}): Promise<boolean> {
const timeout = options.timeout || this.defaultTimeout;
try {
const headers = this.buildHeaders();
const body = this.buildBody(data, options);
const response = await this.sendWithRetry(
this.config.url,
this.config.method!,
headers,
body,
timeout,
this.config.retry!
);
return true;
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
if (this.config.silent) {
console.error(`[WebhookOutputHandler] Failed to send data: ${errorMessage}`);
return false;
}
throw new Error(`Webhook output failed: ${errorMessage}`);
}
}
}

View File

@@ -0,0 +1,121 @@
import { FastifyInstance } from 'fastify';
import { CCRPlugin, PluginMetadata } from './types';
/**
* Plugin manager
*/
class PluginManager {
private plugins: Map<string, PluginMetadata> = new Map();
private pluginInstances: Map<string, CCRPlugin> = new Map();
/**
* Register a plugin
* @param plugin Plugin instance
* @param options Plugin configuration options
*/
registerPlugin(plugin: CCRPlugin, options: any = {}): void {
this.pluginInstances.set(plugin.name, plugin);
this.plugins.set(plugin.name, {
name: plugin.name,
enabled: options.enabled !== false,
options
});
}
/**
* Enable a single plugin
* @param name Plugin name
* @param fastify Fastify instance
*/
async enablePlugin(name: string, fastify: FastifyInstance): Promise<void> {
const metadata = this.plugins.get(name);
const plugin = this.pluginInstances.get(name);
if (!metadata || !plugin) {
throw new Error(`Plugin ${name} not found`);
}
if (metadata.enabled) {
await fastify.register(plugin.register, metadata.options);
}
}
/**
* Enable all registered plugins in batch
* @param fastify Fastify instance
*/
async enablePlugins(fastify: FastifyInstance): Promise<void> {
for (const [name, metadata] of this.plugins) {
if (metadata.enabled) {
try {
await this.enablePlugin(name, fastify);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
fastify.log?.error(`Failed to enable plugin ${name}: ${errorMessage}`);
}
}
}
}
/**
* Get list of registered plugins
*/
getPlugins(): PluginMetadata[] {
return Array.from(this.plugins.values());
}
/**
* Get plugin instance
* @param name Plugin name
*/
getPlugin(name: string): CCRPlugin | undefined {
return this.pluginInstances.get(name);
}
/**
* Check if plugin is registered
* @param name Plugin name
*/
hasPlugin(name: string): boolean {
return this.pluginInstances.has(name);
}
/**
* Check if plugin is enabled
* @param name Plugin name
*/
isPluginEnabled(name: string): boolean {
const metadata = this.plugins.get(name);
return metadata?.enabled || false;
}
/**
* Dynamically enable/disable plugin
* @param name Plugin name
* @param enabled Whether to enable
*/
setPluginEnabled(name: string, enabled: boolean): void {
const metadata = this.plugins.get(name);
if (metadata) {
metadata.enabled = enabled;
}
}
/**
* Remove plugin
* @param name Plugin name
*/
removePlugin(name: string): void {
this.plugins.delete(name);
this.pluginInstances.delete(name);
}
/**
* Clear all plugins
*/
clear(): void {
this.plugins.clear();
this.pluginInstances.clear();
}
}
export const pluginManager = new PluginManager();

View File

@@ -0,0 +1,350 @@
import fp from 'fastify-plugin';
import { CCRPlugin, CCRPluginOptions } from './types';
import { SSEParserTransform } from '../utils/sse';
import { Tiktoken } from 'tiktoken';
import { OutputHandlerConfig, OutputOptions, outputManager } from './output';
/**
* Token statistics interface
*/
interface TokenStats {
requestId: string;
startTime: number;
firstTokenTime?: number;
lastTokenTime: number;
tokenCount: number;
tokensPerSecond: number;
timeToFirstToken?: number;
contentBlocks: {
index: number;
tokenCount: number;
speed: number;
}[];
}
/**
* Plugin options
*/
interface TokenSpeedOptions extends CCRPluginOptions {
logInterval?: number; // Log every N tokens
enableCrossRequestStats?: boolean; // Enable cross-request statistics
statsWindow?: number; // Statistics window size (last N requests)
/**
* Output handler configurations
* Supports console, webhook, and other output handlers
*/
outputHandlers?: OutputHandlerConfig[];
/**
* Default output options (format, prefix, etc.)
*/
outputOptions?: OutputOptions;
}
// Store request-level statistics
const requestStats = new Map<string, TokenStats>();
// Cross-request statistics
const globalStats = {
totalRequests: 0,
totalTokens: 0,
totalTime: 0,
avgTokensPerSecond: 0,
minTokensPerSecond: Infinity,
maxTokensPerSecond: 0,
avgTimeToFirstToken: 0,
allSpeeds: [] as number[] // Used for calculating percentiles
};
/**
* Token speed measurement plugin
*/
export const tokenSpeedPlugin: CCRPlugin = {
name: 'token-speed',
version: '1.0.0',
description: 'Statistics for streaming response token generation speed',
// Use fp() to break encapsulation and apply hooks globally
register: fp(async (fastify, options: TokenSpeedOptions) => {
const opts = {
logInterval: 10,
enableCrossRequestStats: true,
statsWindow: 100,
...options
};
// Initialize output handlers
if (opts.outputHandlers && opts.outputHandlers.length > 0) {
outputManager.registerHandlers(opts.outputHandlers);
} else {
// Default to console output if no handlers configured
outputManager.registerHandlers([{
type: 'console',
enabled: true,
config: {
colors: true,
level: 'log'
}
}]);
}
// Set default output options
if (opts.outputOptions) {
outputManager.setDefaultOptions(opts.outputOptions);
}
// Initialize tiktoken encoder
let encoding: Tiktoken | null = null;
try {
const { get_encoding } = await import('tiktoken');
encoding = get_encoding('cl100k_base');
} catch (error) {
fastify.log?.warn('Failed to load tiktoken, falling back to estimation');
}
// Add onSend hook to intercept streaming responses
fastify.addHook('onSend', async (request, reply, payload) => {
// Only handle streaming responses
if (!(payload instanceof ReadableStream)) {
return payload;
}
const requestId = (request as any).id || Date.now().toString();
const startTime = Date.now();
// Initialize statistics
requestStats.set(requestId, {
requestId,
startTime,
lastTokenTime: startTime,
tokenCount: 0,
tokensPerSecond: 0,
contentBlocks: []
});
// Tee the stream: one for stats, one for the client
const [originalStream, statsStream] = payload.tee();
// Process stats in background
const processStats = async () => {
let currentBlockIndex = -1;
let blockStartTime = 0;
let blockTokenCount = 0;
try {
// Decode byte stream to text, then parse SSE events
const eventStream = statsStream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new SSEParserTransform());
const reader = eventStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const data = value;
const stats = requestStats.get(requestId);
if (!stats) continue;
// Detect content_block_start event
if (data.event === 'content_block_start' && data.data?.content_block?.type === 'text') {
currentBlockIndex = data.data.index;
blockStartTime = Date.now();
blockTokenCount = 0;
}
// Detect content_block_delta event (incremental tokens)
if (data.event === 'content_block_delta' && data.data?.delta?.type === 'text_delta') {
const text = data.data.delta.text;
const tokenCount = encoding
? encoding.encode(text).length
: estimateTokens(text);
stats.tokenCount += tokenCount;
stats.lastTokenTime = Date.now();
// Record first token time
if (!stats.firstTokenTime) {
stats.firstTokenTime = stats.lastTokenTime;
stats.timeToFirstToken = stats.firstTokenTime - stats.startTime;
}
// Calculate current block token count
if (currentBlockIndex >= 0) {
blockTokenCount += tokenCount;
}
// Calculate speed
const elapsed = (stats.lastTokenTime - stats.startTime) / 1000;
stats.tokensPerSecond = stats.tokenCount / elapsed;
// Log periodically
if (stats.tokenCount % opts.logInterval === 0) {
await outputStats(stats, opts.outputOptions);
}
}
// Detect content_block_stop event
if (data.event === 'content_block_stop' && currentBlockIndex >= 0) {
const blockElapsed = (Date.now() - blockStartTime) / 1000;
const blockSpeed = blockElapsed > 0 ? blockTokenCount / blockElapsed : 0;
stats.contentBlocks.push({
index: currentBlockIndex,
tokenCount: blockTokenCount,
speed: blockSpeed
});
currentBlockIndex = -1;
}
// Output final statistics when message ends
if (data.event === 'message_stop') {
// Update global statistics
if (opts.enableCrossRequestStats) {
updateGlobalStats(stats, opts.statsWindow);
}
await outputStats(stats, opts.outputOptions, true);
if (opts.enableCrossRequestStats) {
await outputGlobalStats(opts.outputOptions);
}
requestStats.delete(requestId);
}
}
} catch (error: any) {
console.error(error);
if (error.name !== 'AbortError' && error.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
fastify.log?.warn(`Error processing token stats: ${error.message}`);
}
}
};
// Start background processing without blocking
processStats().catch((error) => {
console.log(error);
fastify.log?.warn(`Background stats processing failed: ${error.message}`);
});
// Return original stream to client
return originalStream;
});
}),
};
/**
* Update global statistics
*/
function updateGlobalStats(stats: TokenStats, windowSize: number) {
globalStats.totalRequests++;
globalStats.totalTokens += stats.tokenCount;
globalStats.totalTime += (stats.lastTokenTime - stats.startTime) / 1000;
if (stats.tokensPerSecond < globalStats.minTokensPerSecond) {
globalStats.minTokensPerSecond = stats.tokensPerSecond;
}
if (stats.tokensPerSecond > globalStats.maxTokensPerSecond) {
globalStats.maxTokensPerSecond = stats.tokensPerSecond;
}
if (stats.timeToFirstToken) {
globalStats.avgTimeToFirstToken =
(globalStats.avgTimeToFirstToken * (globalStats.totalRequests - 1) + stats.timeToFirstToken) /
globalStats.totalRequests;
}
globalStats.allSpeeds.push(stats.tokensPerSecond);
// Maintain window size
if (globalStats.allSpeeds.length > windowSize) {
globalStats.allSpeeds.shift();
}
globalStats.avgTokensPerSecond = globalStats.totalTokens / globalStats.totalTime;
}
/**
* Calculate percentile
*/
function calculatePercentile(data: number[], percentile: number): number {
if (data.length === 0) return 0;
const sorted = [...data].sort((a, b) => a - b);
const index = Math.ceil((percentile / 100) * sorted.length) - 1;
return sorted[index];
}
/**
* Estimate token count (fallback method)
*/
function estimateTokens(text: string): number {
// Rough estimation: English ~4 chars/token, Chinese ~1.5 chars/token
const chineseChars = (text.match(/[\u4e00-\u9fa5]/g) || []).length;
const otherChars = text.length - chineseChars;
return Math.ceil(chineseChars / 1.5 + otherChars / 4);
}
/**
* Output single request statistics
*/
async function outputStats(stats: TokenStats, options?: OutputOptions, isFinal = false) {
const prefix = isFinal ? '[Token Speed Final]' : '[Token Speed]';
// Calculate average speed of each block
const avgBlockSpeed = stats.contentBlocks.length > 0
? stats.contentBlocks.reduce((sum, b) => sum + b.speed, 0) / stats.contentBlocks.length
: 0;
const logData = {
requestId: stats.requestId.substring(0, 8),
tokenCount: stats.tokenCount,
tokensPerSecond: stats.tokensPerSecond.toFixed(2),
timeToFirstToken: stats.timeToFirstToken ? `${stats.timeToFirstToken}ms` : 'N/A',
duration: `${((stats.lastTokenTime - stats.startTime) / 1000).toFixed(2)}s`,
contentBlocks: stats.contentBlocks.length,
avgBlockSpeed: avgBlockSpeed.toFixed(2),
...(isFinal && stats.contentBlocks.length > 1 ? {
blocks: stats.contentBlocks.map(b => ({
index: b.index,
tokenCount: b.tokenCount,
speed: b.speed.toFixed(2)
}))
} : {})
};
// Output through output manager
await outputManager.output(logData, {
prefix,
...options
});
}
/**
* Output global statistics
*/
async function outputGlobalStats(options?: OutputOptions) {
const p50 = calculatePercentile(globalStats.allSpeeds, 50);
const p95 = calculatePercentile(globalStats.allSpeeds, 95);
const p99 = calculatePercentile(globalStats.allSpeeds, 99);
const logData = {
totalRequests: globalStats.totalRequests,
totalTokens: globalStats.totalTokens,
avgTokensPerSecond: globalStats.avgTokensPerSecond.toFixed(2),
minSpeed: globalStats.minTokensPerSecond === Infinity ? 0 : globalStats.minTokensPerSecond.toFixed(2),
maxSpeed: globalStats.maxTokensPerSecond.toFixed(2),
avgTimeToFirstToken: `${globalStats.avgTimeToFirstToken.toFixed(0)}ms`,
percentiles: {
p50: p50.toFixed(2),
p95: p95.toFixed(2),
p99: p99.toFixed(2)
}
};
// Output through output manager
await outputManager.output(logData, {
prefix: '[Token Speed Global Stats]',
...options
});
}

View File

@@ -0,0 +1,28 @@
import { FastifyPluginAsync } from 'fastify';
/**
* Plugin configuration interface
*/
export interface CCRPluginOptions {
enabled?: boolean;
[key: string]: any;
}
/**
* Plugin interface
*/
export interface CCRPlugin {
name: string;
version?: string;
description?: string;
register: FastifyPluginAsync<CCRPluginOptions>;
}
/**
* Plugin metadata
*/
export interface PluginMetadata {
name: string;
enabled: boolean;
options?: any;
}

View File

@@ -248,3 +248,5 @@ export { searchProjectBySession };
export { ConfigService } from "./services/config";
export { ProviderService } from "./services/provider";
export { TransformerService } from "./services/transformer";
export { pluginManager, tokenSpeedPlugin, CCRPlugin, CCRPluginOptions, PluginMetadata } from "./plugins";
export { SSEParserTransform, SSESerializerTransform, rewriteStream } from "./utils/sse";

View File

@@ -0,0 +1,71 @@
export class SSEParserTransform extends TransformStream<string, any> {
private buffer = '';
private currentEvent: Record<string, any> = {};
constructor() {
super({
transform: (chunk: string, controller) => {
this.buffer += chunk;
const lines = this.buffer.split('\n');
// Keep last line (may be incomplete)
this.buffer = lines.pop() || '';
for (const line of lines) {
const event = this.processLine(line);
if (event) {
controller.enqueue(event);
}
}
},
flush: (controller) => {
// Process remaining content in buffer
if (this.buffer.trim()) {
const events: any[] = [];
this.processLine(this.buffer.trim(), events);
events.forEach(event => controller.enqueue(event));
}
// Push last event (if any)
if (Object.keys(this.currentEvent).length > 0) {
controller.enqueue(this.currentEvent);
}
}
});
}
private processLine(line: string, events?: any[]): any | null {
if (!line.trim()) {
if (Object.keys(this.currentEvent).length > 0) {
const event = { ...this.currentEvent };
this.currentEvent = {};
if (events) {
events.push(event);
return null;
}
return event;
}
return null;
}
if (line.startsWith('event:')) {
this.currentEvent.event = line.slice(6).trim();
} else if (line.startsWith('data:')) {
const data = line.slice(5).trim();
if (data === '[DONE]') {
this.currentEvent.data = { type: 'done' };
} else {
try {
this.currentEvent.data = JSON.parse(data);
} catch (e) {
this.currentEvent.data = { raw: data, error: 'JSON parse failed' };
}
}
} else if (line.startsWith('id:')) {
this.currentEvent.id = line.slice(3).trim();
} else if (line.startsWith('retry:')) {
this.currentEvent.retry = parseInt(line.slice(6).trim());
}
return null;
}
}

View File

@@ -0,0 +1,29 @@
export class SSESerializerTransform extends TransformStream<any, string> {
constructor() {
super({
transform: (event, controller) => {
let output = '';
if (event.event) {
output += `event: ${event.event}\n`;
}
if (event.id) {
output += `id: ${event.id}\n`;
}
if (event.retry) {
output += `retry: ${event.retry}\n`;
}
if (event.data) {
if (event.data.type === 'done') {
output += 'data: [DONE]\n';
} else {
output += `data: ${JSON.stringify(event.data)}\n`;
}
}
output += '\n';
controller.enqueue(output);
}
});
}
}

View File

@@ -0,0 +1,3 @@
export { SSEParserTransform } from './SSEParser.transform';
export { SSESerializerTransform } from './SSESerializer.transform';
export { rewriteStream } from './rewriteStream';

View File

@@ -0,0 +1,34 @@
/**
* Rewrite stream utility
* Reads source readablestream and returns a new readablestream,
* processor processes source data and pushes returned new value to new stream,
* no push if no return value
* @param stream
* @param processor
*/
export const rewriteStream = (stream: ReadableStream, processor: (data: any, controller: ReadableStreamController<any>) => Promise<any>): ReadableStream => {
const reader = stream.getReader()
return new ReadableStream({
async start(controller) {
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
controller.close()
break
}
const processed = await processor(value, controller)
if (processed !== undefined) {
controller.enqueue(processed)
}
}
} catch (error) {
controller.error(error)
} finally {
reader.releaseLock()
}
}
})
}

View File

@@ -15,6 +15,7 @@ import JSON5 from "json5";
import { IAgent, ITool } from "./agents/type";
import agentsManager from "./agents";
import { EventEmitter } from "node:events";
import { pluginManager, tokenSpeedPlugin } from "@musistudio/llms";
const event = new EventEmitter()
@@ -43,6 +44,44 @@ interface RunOptions {
logger?: any;
}
/**
* Plugin configuration from config file
*/
interface PluginConfig {
name: string;
enabled?: boolean;
options?: Record<string, any>;
}
/**
* Register plugins from configuration
* @param serverInstance Server instance
* @param config Application configuration
*/
async function registerPluginsFromConfig(serverInstance: any, config: any): Promise<void> {
// Get plugins configuration from config file
const pluginsConfig: PluginConfig[] = config.plugins || config.Plugins || [];
for (const pluginConfig of pluginsConfig) {
const { name, enabled = false, options = {} } = pluginConfig;
switch (name) {
case 'token-speed':
pluginManager.registerPlugin(tokenSpeedPlugin, {
enabled,
...options
});
break;
default:
console.warn(`Unknown plugin: ${name}`);
break;
}
}
// Enable all registered plugins
await pluginManager.enablePlugins(serverInstance);
}
async function getServer(options: RunOptions = {}) {
await initializeClaudeConfig();
await initDir();
@@ -141,6 +180,9 @@ async function getServer(options: RunOptions = {}) {
presets.map(async preset => await serverInstance.registerNamespace(`/preset/${preset.name}`, preset.config))
)
// Register and configure plugins from config
await registerPluginsFromConfig(serverInstance, config);
// Add async preHandler hook for authentication
serverInstance.addHook("preHandler", async (req: any, reply: any) => {
return new Promise<void>((resolve, reject) => {
@@ -404,6 +446,7 @@ export { getServer };
export type { RunOptions };
export type { IAgent, ITool } from "./agents/type";
export { initDir, initConfig, readConfigFile, writeConfigFile, backupConfigFile } from "./utils";
export { pluginManager, tokenSpeedPlugin } from "@musistudio/llms";
// Start service if this file is run directly
if (require.main === module) {

View File

@@ -8,6 +8,15 @@ declare module "@musistudio/llms" {
logger?: any;
}
/**
* Plugin configuration from config file
*/
export interface PluginConfig {
name: string;
enabled?: boolean;
options?: Record<string, any>;
}
export interface Server {
app: FastifyInstance;
logger: FastifyBaseLogger;

View File

@@ -0,0 +1,62 @@
import { FastifyPluginAsync } from 'fastify';
declare module '@musistudio/llms' {
export interface CCRPluginOptions {
enabled?: boolean;
[key: string]: any;
}
export interface CCRPlugin {
name: string;
version?: string;
description?: string;
register: FastifyPluginAsync<CCRPluginOptions>;
}
export interface PluginMetadata {
name: string;
enabled: boolean;
options?: any;
}
export class PluginManager {
private plugins;
private pluginInstances;
registerPlugin(plugin: CCRPlugin, options?: any): void;
enablePlugin(name: string, fastify: import('fastify').FastifyInstance): Promise<void>;
enablePlugins(fastify: import('fastify').FastifyInstance): Promise<void>;
getPlugins(): PluginMetadata[];
getPlugin(name: string): CCRPlugin | undefined;
hasPlugin(name: string): boolean;
isPluginEnabled(name: string): boolean;
setPluginEnabled(name: string, enabled: boolean): void;
removePlugin(name: string): void;
clear(): void;
}
export const pluginManager: PluginManager;
export const tokenSpeedPlugin: CCRPlugin;
export class SSEParserTransform extends TransformStream<string, any> {
constructor();
}
export class SSESerializerTransform extends TransformStream<any, string> {
constructor();
}
export function rewriteStream(
stream: ReadableStream,
processor: (data: any, controller: ReadableStreamController<any>) => Promise<any>
): ReadableStream;
}

3
pnpm-lock.yaml generated
View File

@@ -124,6 +124,9 @@ importers:
fastify:
specifier: ^5.4.0
version: 5.6.1
fastify-plugin:
specifier: ^5.1.0
version: 5.1.0
google-auth-library:
specifier: ^10.1.0
version: 10.4.0