add token speed block

This commit is contained in:
musistudio
2025-12-31 22:44:16 +08:00
parent e7073790b3
commit 10c69a586b
14 changed files with 900 additions and 369 deletions

View File

@@ -17,20 +17,18 @@
], ],
"author": "musistudio", "author": "musistudio",
"license": "MIT", "license": "MIT",
"dependencies": { "devDependencies": {
"@CCR/server": "workspace:*", "@CCR/server": "workspace:*",
"@CCR/shared": "workspace:*", "@CCR/shared": "workspace:*",
"@inquirer/prompts": "^5.0.0", "@inquirer/prompts": "^5.0.0",
"adm-zip": "^0.5.16",
"archiver": "^7.0.1",
"find-process": "^2.0.0",
"minimist": "^1.2.8",
"openurl": "^1.1.1"
},
"devDependencies": {
"@types/archiver": "^7.0.0", "@types/archiver": "^7.0.0",
"@types/node": "^24.0.15", "@types/node": "^24.0.15",
"adm-zip": "^0.5.16",
"archiver": "^7.0.1",
"esbuild": "^0.25.1", "esbuild": "^0.25.1",
"find-process": "^2.0.0",
"minimist": "^1.2.8",
"openurl": "^1.1.1",
"ts-node": "^10.9.2", "ts-node": "^10.9.2",
"typescript": "^5.8.2" "typescript": "^5.8.2"
} }

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
export * from './types'; export * from './types';
export { pluginManager } from './plugin-manager'; export { pluginManager } from './plugin-manager';
export { tokenSpeedPlugin } from './token-speed'; export { tokenSpeedPlugin, getTokenSpeedStats, getGlobalTokenSpeedStats } from './token-speed';
export * from './output'; export * from './output';

View File

@@ -4,6 +4,7 @@ export * from './types';
// Output handler implementations // Output handler implementations
export { ConsoleOutputHandler } from './console-handler'; export { ConsoleOutputHandler } from './console-handler';
export { WebhookOutputHandler } from './webhook-handler'; export { WebhookOutputHandler } from './webhook-handler';
export { TempFileOutputHandler } from './temp-file-handler';
// Output manager // Output manager
export { outputManager, output, outputTo } from './output-manager'; export { outputManager, output, outputTo } from './output-manager';
@@ -36,6 +37,20 @@ export function registerWebhookOutput(config: import('./types').WebhookOutputCon
return outputManager; return outputManager;
} }
/**
 * Convenience function: build a Temp File output handler and register it
 * with the output manager exported by './output-manager'.
 *
 * The handler is registered under a name of the form `temp-file_<epoch ms>`.
 *
 * @param config Optional temp file output handler configuration
 * @returns The output manager the handler was registered with
 */
export function registerTempFileOutput(config?: import('./types').TempFileOutputConfig) {
  // Modules are resolved lazily, in the same order as before: handler first, manager second.
  const handlerModule = require('./temp-file-handler');
  const tempFileHandler = new handlerModule.TempFileOutputHandler(config);
  const manager = require('./output-manager').outputManager;
  manager.registerHandler(`temp-file_${Date.now()}`, tempFileHandler);
  return manager;
}
/** /**
* Convenience function: Register output handlers in batch * Convenience function: Register output handlers in batch
* @param configs Output handler configuration array * @param configs Output handler configuration array

View File

@@ -1,6 +1,7 @@
import { OutputHandler, OutputOptions, OutputHandlerConfig } from './types'; import { OutputHandler, OutputOptions, OutputHandlerConfig } from './types';
import { ConsoleOutputHandler } from './console-handler'; import { ConsoleOutputHandler } from './console-handler';
import { WebhookOutputHandler } from './webhook-handler'; import { WebhookOutputHandler } from './webhook-handler';
import { TempFileOutputHandler } from './temp-file-handler';
/** /**
* Output manager * Output manager
@@ -51,6 +52,9 @@ class OutputManager {
case 'webhook': case 'webhook':
return new WebhookOutputHandler(config.config as any); return new WebhookOutputHandler(config.config as any);
case 'temp-file':
return new TempFileOutputHandler(config.config as any);
// Reserved for other output handler types // Reserved for other output handler types
// case 'websocket': // case 'websocket':
// return new WebSocketOutputHandler(config.config as any); // return new WebSocketOutputHandler(config.config as any);

View File

@@ -0,0 +1,140 @@
import { OutputHandler, OutputOptions } from './types';
import { writeFileSync, existsSync, mkdirSync } from 'fs';
import { join } from 'path';
import { tmpdir } from 'os';
/**
 * Options accepted by the temp file output handler.
 * Every field is optional; the documented default applies when a field is omitted.
 */
export interface TempFileOutputConfig {
  /** Leading part of each generated file name (default: 'session'). */
  prefix?: string;

  /** Directory created beneath the system temp directory (default: 'claude-code-router'). */
  subdirectory?: string;

  /** File extension, written without the leading dot (default: 'json'). */
  extension?: string;

  /** When true, a timestamp is included in the file name (default: true). */
  includeTimestamp?: boolean;
}
/**
 * Temp file output handler.
 *
 * Serializes each payload as pretty-printed JSON and writes it to a file under
 * `<os.tmpdir()>/<subdirectory>`. The file name is built from the session ID
 * passed in `options.metadata.sessionId`; payloads without one are skipped.
 *
 * Failures never throw out of `output()`: they are reported through its
 * `false` return value so a broken temp directory cannot disrupt the caller.
 */
export class TempFileOutputHandler implements OutputHandler {
  type = 'temp-file' as const;
  private config: TempFileOutputConfig;
  private baseDir: string;

  /**
   * @param config Handler configuration; omitted fields fall back to the
   *               defaults listed on `TempFileOutputConfig`.
   */
  constructor(config: TempFileOutputConfig = {}) {
    this.config = {
      subdirectory: 'claude-code-router',
      extension: 'json',
      includeTimestamp: true,
      prefix: 'session',
      ...config
    };

    // An explicit `subdirectory: undefined` survives the spread above, so fall
    // back to the default here instead of handing `undefined` to join().
    this.baseDir = join(tmpdir(), this.config.subdirectory ?? 'claude-code-router');

    // Create the directory up front (best effort).
    this.ensureDir();
  }

  /**
   * Create the base directory if it is missing.
   * Errors are deliberately swallowed: a later write fails and is reported
   * through the return value of `output()`.
   */
  private ensureDir(): void {
    try {
      if (!existsSync(this.baseDir)) {
        mkdirSync(this.baseDir, { recursive: true });
      }
    } catch {
      // Best effort only.
    }
  }

  /**
   * Extract the session ID from a user_id string.
   * Expected format: "user_..._session_<uuid>".
   * Not called from within this class.
   *
   * @returns The captured ID, or null when the pattern does not match.
   */
  private extractSessionId(userId: string): string | null {
    try {
      const match = userId.match(/_session_([a-f0-9-]+)/i);
      return match ? match[1] : null;
    } catch {
      return null;
    }
  }

  /**
   * Build the destination path for a session.
   *
   * File name shape: `prefix-sessionId[-timestamp][.ext]`.
   * Any character outside `[A-Za-z0-9._-]` in the session ID is replaced with
   * `_`, so path separators in the ID cannot move the file outside `baseDir`.
   * IDs made only of those characters (e.g. UUIDs) are used unchanged.
   */
  private getFilePath(sessionId: string): string {
    const prefix = this.config.prefix || 'session';
    const ext = this.config.extension ? `.${this.config.extension}` : '';
    const safeId = String(sessionId).replace(/[^A-Za-z0-9._-]/g, '_');
    const stamp = this.config.includeTimestamp ? `-${Date.now()}` : '';

    return join(this.baseDir, `${prefix}-${safeId}${stamp}${ext}`);
  }

  /**
   * Write `data` (plus `timestamp` and `sessionId` fields) to a temp file.
   *
   * @param data    Payload whose own enumerable properties are copied into the output object.
   * @param options Output options; `metadata.sessionId` selects the target file.
   * @returns true when the file was written; false when no session ID was
   *          supplied or the write failed.
   */
  async output(data: any, options: OutputOptions = {}): Promise<boolean> {
    try {
      const sessionId = options.metadata?.sessionId;
      if (!sessionId) {
        // Nothing to key the file on, so skip.
        return false;
      }

      const outputData = {
        ...data,
        timestamp: Date.now(),
        sessionId
      };

      // The temp directory may have been removed since construction
      // (e.g. by OS temp cleanup), so re-create it before every write.
      this.ensureDir();

      writeFileSync(this.getFilePath(sessionId), JSON.stringify(outputData, null, 2), 'utf-8');
      return true;
    } catch {
      // Swallow the error so the caller's main flow is not disrupted.
      return false;
    }
  }

  /**
   * Get the base directory where temp files are stored.
   */
  getBaseDir(): string {
    return this.baseDir;
  }
}

View File

@@ -135,6 +135,31 @@ export interface WebSocketOutputConfig {
}; };
} }
/**
 * Configuration for the temp file output handler.
 * All fields are optional; each documented default applies when the field is left out.
 */
export interface TempFileOutputConfig {
  /** Custom prefix for temp file names (default: 'session'). */
  prefix?: string;

  /** Subdirectory under the system temp directory (default: 'claude-code-router'). */
  subdirectory?: string;

  /** File extension (default: 'json'). */
  extension?: string;

  /** Whether the file name includes a timestamp (default: true). */
  includeTimestamp?: boolean;
}
/** /**
* Output handler registration configuration * Output handler registration configuration
*/ */
@@ -142,7 +167,7 @@ export interface OutputHandlerConfig {
/** /**
* Output handler type * Output handler type
*/ */
type: 'console' | 'webhook' | 'websocket'; type: 'console' | 'webhook' | 'websocket' | 'temp-file';
/** /**
* Whether enabled * Whether enabled
@@ -152,5 +177,5 @@ export interface OutputHandlerConfig {
/** /**
* Configuration options * Configuration options
*/ */
config?: ConsoleOutputConfig | WebhookOutputConfig | WebSocketOutputConfig; config?: ConsoleOutputConfig | WebhookOutputConfig | WebSocketOutputConfig | TempFileOutputConfig;
} }

View File

@@ -9,26 +9,27 @@ import { ITokenizer, TokenizerConfig } from '../types/tokenizer';
*/ */
interface TokenStats { interface TokenStats {
requestId: string; requestId: string;
sessionId?: string;
startTime: number; startTime: number;
firstTokenTime?: number; firstTokenTime?: number;
lastTokenTime: number; lastTokenTime: number;
tokenCount: number; tokenCount: number;
tokensPerSecond: number; tokensPerSecond: number;
timeToFirstToken?: number; timeToFirstToken?: number;
contentBlocks: { stream: boolean; // Whether this is a streaming request
index: number; tokenTimestamps: number[]; // Store timestamps of each token for per-second calculation
tokenCount: number;
speed: number;
}[];
} }
/** /**
* Plugin options * Plugin options
*/ */
interface TokenSpeedOptions extends CCRPluginOptions { interface TokenSpeedOptions extends CCRPluginOptions {
logInterval?: number; // Log every N tokens /**
enableCrossRequestStats?: boolean; // Enable cross-request statistics * Reporter type(s) to use for output
statsWindow?: number; // Statistics window size (last N requests) * Can be a single type or an array of types: 'console' | 'temp-file' | 'webhook'
* Default: ['console', 'temp-file']
*/
reporter?: string | string[];
/** /**
* Output handler configurations * Output handler configurations
@@ -48,18 +49,6 @@ const requestStats = new Map<string, TokenStats>();
// Cache tokenizers by provider and model to avoid repeated initialization // Cache tokenizers by provider and model to avoid repeated initialization
const tokenizerCache = new Map<string, ITokenizer>(); const tokenizerCache = new Map<string, ITokenizer>();
// Cross-request statistics
const globalStats = {
totalRequests: 0,
totalTokens: 0,
totalTime: 0,
avgTokensPerSecond: 0,
minTokensPerSecond: Infinity,
maxTokensPerSecond: 0,
avgTimeToFirstToken: 0,
allSpeeds: [] as number[] // Used for calculating percentiles
};
/** /**
* Token speed measurement plugin * Token speed measurement plugin
*/ */
@@ -71,25 +60,50 @@ export const tokenSpeedPlugin: CCRPlugin = {
// Use fp() to break encapsulation and apply hooks globally // Use fp() to break encapsulation and apply hooks globally
register: fp(async (fastify, options: TokenSpeedOptions) => { register: fp(async (fastify, options: TokenSpeedOptions) => {
const opts = { const opts = {
logInterval: 10, reporter: ['console', 'temp-file'],
enableCrossRequestStats: true,
statsWindow: 100,
...options ...options
}; };
// Initialize output handlers // Normalize reporter to array
const reporters = Array.isArray(opts.reporter) ? opts.reporter : [opts.reporter];
// Initialize output handlers based on reporters if not explicitly configured
if (opts.outputHandlers && opts.outputHandlers.length > 0) { if (opts.outputHandlers && opts.outputHandlers.length > 0) {
outputManager.registerHandlers(opts.outputHandlers); outputManager.registerHandlers(opts.outputHandlers);
} else { } else {
// Default to console output if no handlers configured // Auto-register handlers based on reporter types
outputManager.registerHandlers([{ const handlersToRegister: OutputHandlerConfig[] = [];
type: 'console',
enabled: true, for (const reporter of reporters) {
config: { if (reporter === 'console') {
colors: true, handlersToRegister.push({
level: 'log' type: 'console',
enabled: true,
config: {
colors: true,
level: 'log'
}
});
} else if (reporter === 'temp-file') {
handlersToRegister.push({
type: 'temp-file',
enabled: true,
config: {
subdirectory: 'claude-code-router',
extension: 'json',
includeTimestamp: true,
prefix: 'session'
}
});
} else if (reporter === 'webhook') {
// Webhook requires explicit config, skip auto-registration
console.warn(`[TokenSpeedPlugin] Webhook reporter requires explicit configuration in outputHandlers`);
} }
}]); }
if (handlersToRegister.length > 0) {
outputManager.registerHandlers(handlersToRegister);
}
} }
// Set default output options // Set default output options
@@ -144,182 +158,243 @@ export const tokenSpeedPlugin: CCRPlugin = {
} }
}; };
// Add onSend hook to intercept streaming responses // Add onRequest hook to capture actual request start time (before processing)
fastify.addHook('onSend', async (request, reply, payload) => { fastify.addHook('onRequest', async (request) => {
// Only handle streaming responses (request as any).requestStartTime = performance.now();
if (!(payload instanceof ReadableStream)) { });
return payload;
}
// Add onSend hook to intercept both streaming and non-streaming responses
fastify.addHook('onSend', async (request, _reply, payload) => {
const requestId = (request as any).id || Date.now().toString(); const requestId = (request as any).id || Date.now().toString();
const startTime = Date.now(); const startTime = (request as any).requestStartTime || performance.now();
// Initialize statistics // Extract session ID from request body metadata
requestStats.set(requestId, { let sessionId: string | undefined;
requestId, try {
startTime, const userId = (request.body as any)?.metadata?.user_id;
lastTokenTime: startTime, if (userId && typeof userId === 'string') {
tokenCount: 0, const match = userId.match(/_session_([a-f0-9-]+)/i);
tokensPerSecond: 0, sessionId = match ? match[1] : undefined;
contentBlocks: [] }
}); } catch (error) {
// Ignore errors extracting session ID
}
// Get tokenizer for this specific request // Get tokenizer for this specific request
const tokenizer = await getTokenizerForRequest(request); const tokenizer = await getTokenizerForRequest(request);
// Tee the stream: one for stats, one for the client // Handle streaming responses
const [originalStream, statsStream] = payload.tee(); if (payload instanceof ReadableStream) {
// Mark this request as streaming
requestStats.set(requestId, {
requestId,
sessionId,
startTime,
lastTokenTime: startTime,
tokenCount: 0,
tokensPerSecond: 0,
tokenTimestamps: [],
stream: true
});
// Process stats in background // Tee the stream: one for stats, one for the client
const processStats = async () => { const [originalStream, statsStream] = payload.tee();
let currentBlockIndex = -1;
let blockStartTime = 0;
let blockTokenCount = 0;
try { // Process stats in background
// Decode byte stream to text, then parse SSE events const processStats = async () => {
const eventStream = statsStream let outputTimer: NodeJS.Timeout | null = null;
.pipeThrough(new TextDecoderStream())
.pipeThrough(new SSEParserTransform());
const reader = eventStream.getReader();
while (true) { // Output stats function - calculate current speed using sliding window
const { done, value } = await reader.read(); const doOutput = async (isFinal: boolean) => {
if (done) break;
const data = value;
const stats = requestStats.get(requestId); const stats = requestStats.get(requestId);
if (!stats) continue; if (!stats) return;
// Detect content_block_start event const now = performance.now();
if (data.event === 'content_block_start' && data.data?.content_block?.type === 'text') {
currentBlockIndex = data.data.index;
blockStartTime = Date.now();
blockTokenCount = 0;
}
// Detect content_block_delta event (incremental tokens) if (!isFinal) {
if (data.event === 'content_block_delta' && data.data?.delta?.type === 'text_delta') { // For streaming output, use sliding window: count tokens in last 1 second
const text = data.data.delta.text; const oneSecondAgo = now - 1000;
const tokenCount = tokenizer stats.tokenTimestamps = stats.tokenTimestamps.filter(ts => ts > oneSecondAgo);
? (tokenizer.encodeText ? tokenizer.encodeText(text).length : estimateTokens(text)) stats.tokensPerSecond = stats.tokenTimestamps.length;
: estimateTokens(text); } else {
// For final output, use average speed over entire request
stats.tokenCount += tokenCount; const duration = (stats.lastTokenTime - stats.startTime) / 1000; // seconds
stats.lastTokenTime = Date.now(); if (duration > 0) {
stats.tokensPerSecond = Math.round(stats.tokenCount / duration);
// Record first token time
if (!stats.firstTokenTime) {
stats.firstTokenTime = stats.lastTokenTime;
stats.timeToFirstToken = stats.firstTokenTime - stats.startTime;
}
// Calculate current block token count
if (currentBlockIndex >= 0) {
blockTokenCount += tokenCount;
}
// Calculate speed
const elapsed = (stats.lastTokenTime - stats.startTime) / 1000;
stats.tokensPerSecond = stats.tokenCount / elapsed;
// Log periodically
if (stats.tokenCount % opts.logInterval === 0) {
await outputStats(stats, opts.outputOptions);
} }
} }
// Detect content_block_stop event await outputStats(stats, reporters, opts.outputOptions, isFinal).catch(err => {
if (data.event === 'content_block_stop' && currentBlockIndex >= 0) { fastify.log?.warn(`Failed to output streaming stats: ${err.message}`);
const blockElapsed = (Date.now() - blockStartTime) / 1000; });
const blockSpeed = blockElapsed > 0 ? blockTokenCount / blockElapsed : 0; };
stats.contentBlocks.push({ try {
index: currentBlockIndex, // Decode byte stream to text, then parse SSE events
tokenCount: blockTokenCount, const eventStream = statsStream
speed: blockSpeed .pipeThrough(new TextDecoderStream())
}); .pipeThrough(new SSEParserTransform());
const reader = eventStream.getReader();
currentBlockIndex = -1; // Start timer immediately - output every 1 second
outputTimer = setInterval(async () => {
const stats = requestStats.get(requestId);
if (stats) {
await doOutput(false);
}
}, 1000);
while (true) {
const { done, value } = await reader.read();
if (done) break;
const data = value;
const stats = requestStats.get(requestId);
if (!stats) continue;
const now = performance.now();
// Record first token time when we receive any content-related event
// This includes: content_block_start, content_block_delta, text_block
if (!stats.firstTokenTime && (
data.event === 'content_block_start' ||
data.event === 'content_block_delta' ||
data.event === 'text_block' ||
data.event === 'content_block'
)) {
stats.firstTokenTime = now;
stats.timeToFirstToken = Math.round(now - stats.startTime);
}
// Detect content_block_delta event (incremental tokens)
// Support multiple delta types: text_delta, input_json_delta, thinking_delta
if (data.event === 'content_block_delta' && data.data?.delta) {
const deltaType = data.data.delta.type;
let text = '';
// Extract text based on delta type
if (deltaType === 'text_delta') {
text = data.data.delta.text || '';
} else if (deltaType === 'input_json_delta') {
text = data.data.delta.partial_json || '';
} else if (deltaType === 'thinking_delta') {
text = data.data.delta.thinking || '';
}
// Calculate tokens if we have text content
if (text) {
const tokenCount = tokenizer
? (tokenizer.encodeText ? tokenizer.encodeText(text).length : estimateTokens(text))
: estimateTokens(text);
stats.tokenCount += tokenCount;
stats.lastTokenTime = now;
// Record timestamps for each token (for sliding window calculation)
for (let i = 0; i < tokenCount; i++) {
stats.tokenTimestamps.push(now);
}
}
}
// Output final statistics when message ends
if (data.event === 'message_stop') {
// Clear timer
if (outputTimer) {
clearInterval(outputTimer);
outputTimer = null;
}
await doOutput(true);
requestStats.delete(requestId);
}
} }
} catch (error: any) {
// Output final statistics when message ends // Clean up timer on error
if (data.event === 'message_stop') { if (outputTimer) {
// Update global statistics clearInterval(outputTimer);
if (opts.enableCrossRequestStats) { }
updateGlobalStats(stats, opts.statsWindow); if (error.name !== 'AbortError' && error.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
} fastify.log?.warn(`Error processing token stats: ${error.message}`);
await outputStats(stats, opts.outputOptions, true);
if (opts.enableCrossRequestStats) {
await outputGlobalStats(opts.outputOptions);
}
requestStats.delete(requestId);
} }
} }
} catch (error: any) { };
console.error(error);
if (error.name !== 'AbortError' && error.code !== 'ERR_STREAM_PREMATURE_CLOSE') { // Start background processing without blocking
fastify.log?.warn(`Error processing token stats: ${error.message}`); processStats().catch((error) => {
console.log(error);
fastify.log?.warn(`Background stats processing failed: ${error.message}`);
});
// Return original stream to client
return originalStream;
}
// Handle non-streaming responses
// Try to extract token count from the response payload
const endTime = performance.now();
let tokenCount = 0;
// Payload should be a string or object for non-streaming responses
if (payload && typeof payload === 'string') {
try {
const response = JSON.parse(payload);
// Prefer usage.output_tokens if available (most accurate)
if (response.usage?.output_tokens) {
tokenCount = response.usage.output_tokens;
} else {
// Fallback: calculate from content
const content = response.content || response.message?.content || '';
if (tokenizer) {
if (Array.isArray(content)) {
tokenCount = content.reduce((sum: number, block: any) => {
if (block.type === 'text') {
const text = block.text || '';
return sum + (tokenizer.encodeText ? tokenizer.encodeText(text).length : estimateTokens(text));
}
return sum;
}, 0);
} else if (typeof content === 'string') {
tokenCount = tokenizer.encodeText ? tokenizer.encodeText(content).length : estimateTokens(content);
}
} else {
const text = Array.isArray(content) ? content.map((c: any) => c.text).join('') : content;
tokenCount = estimateTokens(text);
}
} }
} catch (error) {
// Could not parse or extract tokens
} }
}; }
// Start background processing without blocking // Only output stats if we found tokens
processStats().catch((error) => { if (tokenCount > 0) {
console.log(error); const duration = (endTime - startTime) / 1000; // seconds
fastify.log?.warn(`Background stats processing failed: ${error.message}`);
});
// Return original stream to client const stats: TokenStats = {
return originalStream; requestId,
sessionId,
startTime,
lastTokenTime: endTime,
tokenCount,
tokensPerSecond: duration > 0 ? Math.round(tokenCount / duration) : 0,
timeToFirstToken: Math.round(endTime - startTime),
stream: false,
tokenTimestamps: []
};
await outputStats(stats, reporters, opts.outputOptions, true);
}
// Return payload as-is
return payload;
}); });
}), }),
}; };
/**
* Update global statistics
*/
function updateGlobalStats(stats: TokenStats, windowSize: number) {
globalStats.totalRequests++;
globalStats.totalTokens += stats.tokenCount;
globalStats.totalTime += (stats.lastTokenTime - stats.startTime) / 1000;
if (stats.tokensPerSecond < globalStats.minTokensPerSecond) {
globalStats.minTokensPerSecond = stats.tokensPerSecond;
}
if (stats.tokensPerSecond > globalStats.maxTokensPerSecond) {
globalStats.maxTokensPerSecond = stats.tokensPerSecond;
}
if (stats.timeToFirstToken) {
globalStats.avgTimeToFirstToken =
(globalStats.avgTimeToFirstToken * (globalStats.totalRequests - 1) + stats.timeToFirstToken) /
globalStats.totalRequests;
}
globalStats.allSpeeds.push(stats.tokensPerSecond);
// Maintain window size
if (globalStats.allSpeeds.length > windowSize) {
globalStats.allSpeeds.shift();
}
globalStats.avgTokensPerSecond = globalStats.totalTokens / globalStats.totalTime;
}
/**
* Calculate percentile
*/
function calculatePercentile(data: number[], percentile: number): number {
if (data.length === 0) return 0;
const sorted = [...data].sort((a, b) => a - b);
const index = Math.ceil((percentile / 100) * sorted.length) - 1;
return sorted[index];
}
/** /**
* Estimate token count (fallback method) * Estimate token count (fallback method)
*/ */
@@ -333,63 +408,39 @@ function estimateTokens(text: string): number {
/** /**
* Output single request statistics * Output single request statistics
*/ */
async function outputStats(stats: TokenStats, options?: OutputOptions, isFinal = false) { async function outputStats(
stats: TokenStats,
reporters: string[],
options?: OutputOptions,
isFinal = false
) {
const prefix = isFinal ? '[Token Speed Final]' : '[Token Speed]'; const prefix = isFinal ? '[Token Speed Final]' : '[Token Speed]';
// Calculate average speed of each block
const avgBlockSpeed = stats.contentBlocks.length > 0
? stats.contentBlocks.reduce((sum, b) => sum + b.speed, 0) / stats.contentBlocks.length
: 0;
const logData = { const logData = {
requestId: stats.requestId.substring(0, 8), requestId: stats.requestId.substring(0, 8),
sessionId: stats.sessionId,
stream: stats.stream,
tokenCount: stats.tokenCount, tokenCount: stats.tokenCount,
tokensPerSecond: stats.tokensPerSecond.toFixed(2), tokensPerSecond: stats.tokensPerSecond,
timeToFirstToken: stats.timeToFirstToken ? `${stats.timeToFirstToken}ms` : 'N/A', timeToFirstToken: stats.timeToFirstToken ? `${stats.timeToFirstToken}ms` : 'N/A',
duration: `${((stats.lastTokenTime - stats.startTime) / 1000).toFixed(2)}s`, duration: `${((stats.lastTokenTime - stats.startTime) / 1000).toFixed(2)}s`,
contentBlocks: stats.contentBlocks.length, timestamp: Date.now()
avgBlockSpeed: avgBlockSpeed.toFixed(2),
...(isFinal && stats.contentBlocks.length > 1 ? {
blocks: stats.contentBlocks.map(b => ({
index: b.index,
tokenCount: b.tokenCount,
speed: b.speed.toFixed(2)
}))
} : {})
}; };
// Output through output manager const outputOptions = {
await outputManager.output(logData, {
prefix, prefix,
metadata: {
sessionId: stats.sessionId
},
...options ...options
});
}
/**
* Output global statistics
*/
async function outputGlobalStats(options?: OutputOptions) {
const p50 = calculatePercentile(globalStats.allSpeeds, 50);
const p95 = calculatePercentile(globalStats.allSpeeds, 95);
const p99 = calculatePercentile(globalStats.allSpeeds, 99);
const logData = {
totalRequests: globalStats.totalRequests,
totalTokens: globalStats.totalTokens,
avgTokensPerSecond: globalStats.avgTokensPerSecond.toFixed(2),
minSpeed: globalStats.minTokensPerSecond === Infinity ? 0 : globalStats.minTokensPerSecond.toFixed(2),
maxSpeed: globalStats.maxTokensPerSecond.toFixed(2),
avgTimeToFirstToken: `${globalStats.avgTimeToFirstToken.toFixed(0)}ms`,
percentiles: {
p50: p50.toFixed(2),
p95: p95.toFixed(2),
p99: p99.toFixed(2)
}
}; };
// Output through output manager // Output to each specified reporter type
await outputManager.output(logData, { for (const reporter of reporters) {
prefix: '[Token Speed Global Stats]', try {
...options await outputManager.outputToType(reporter, logData, outputOptions);
}); } catch (error) {
console.error(`[TokenSpeedPlugin] Failed to output to ${reporter}:`, error);
}
}
} }

View File

@@ -270,5 +270,5 @@ export { ConfigService } from "./services/config";
export { ProviderService } from "./services/provider"; export { ProviderService } from "./services/provider";
export { TransformerService } from "./services/transformer"; export { TransformerService } from "./services/transformer";
export { TokenizerService } from "./services/tokenizer"; export { TokenizerService } from "./services/tokenizer";
export { pluginManager, tokenSpeedPlugin, CCRPlugin, CCRPluginOptions, PluginMetadata } from "./plugins"; export { pluginManager, tokenSpeedPlugin, getTokenSpeedStats, getGlobalTokenSpeedStats, CCRPlugin, CCRPluginOptions, PluginMetadata } from "./plugins";
export { SSEParserTransform, SSESerializerTransform, rewriteStream } from "./utils/sse"; export { SSEParserTransform, SSESerializerTransform, rewriteStream } from "./utils/sse";

View File

@@ -69,6 +69,12 @@ async function registerPluginsFromConfig(serverInstance: any, config: any): Prom
case 'token-speed': case 'token-speed':
pluginManager.registerPlugin(tokenSpeedPlugin, { pluginManager.registerPlugin(tokenSpeedPlugin, {
enabled, enabled,
outputHandlers: [
{
type: 'temp-file',
enabled: true
}
],
...options ...options
}); });
break; break;

View File

@@ -4,7 +4,8 @@ export const apiKeyAuth =
(config: any) => (config: any) =>
async (req: FastifyRequest, reply: FastifyReply, done: () => void) => { async (req: FastifyRequest, reply: FastifyReply, done: () => void) => {
// Public endpoints that don't require authentication // Public endpoints that don't require authentication
if (["/", "/health"].includes(req.url) || req.url.startsWith("/ui")) { const publicPaths = ["/", "/health"];
if (publicPaths.includes(req.url) || req.url.startsWith("/ui")) {
return done(); return done();
} }

View File

@@ -118,4 +118,46 @@ declare module "@musistudio/llms" {
clearCache(): void; clearCache(): void;
dispose(): void; dispose(): void;
} }
// Token speed statistics types
export interface TokenStats {
requestId: string;
startTime: number;
firstTokenTime?: number;
lastTokenTime: number;
tokenCount: number;
tokensPerSecond: number;
timeToFirstToken?: number;
contentBlocks: {
index: number;
tokenCount: number;
speed: number;
}[];
}
export function getTokenSpeedStats(): {
current: TokenStats | null;
global: {
totalRequests: number;
totalTokens: number;
totalTime: number;
avgTokensPerSecond: number;
minTokensPerSecond: number;
maxTokensPerSecond: number;
avgTimeToFirstToken: number;
allSpeeds: number[];
};
lastUpdate: number;
};
export function getGlobalTokenSpeedStats(): {
totalRequests: number;
totalTokens: number;
totalTime: number;
avgTokensPerSecond: number;
minTokensPerSecond: number;
maxTokensPerSecond: number;
avgTimeToFirstToken: number;
allSpeeds: number[];
};
} }

View File

@@ -51,6 +51,7 @@ const MODULE_TYPES = [
{ label: "gitBranch", value: "gitBranch" }, { label: "gitBranch", value: "gitBranch" },
{ label: "model", value: "model" }, { label: "model", value: "model" },
{ label: "usage", value: "usage" }, { label: "usage", value: "usage" },
{ label: "speed", value: "speed" },
{ label: "script", value: "script" }, { label: "script", value: "script" },
]; ];
@@ -936,6 +937,14 @@ export function StatusLineConfigDialog({
color: "bright_magenta", color: "bright_magenta",
}; };
break; break;
case "speed":
newModule = {
type: "speed",
icon: "⚡",
text: "{{tokenSpeed}}",
color: "bright_green",
};
break;
case "script": case "script":
newModule = { newModule = {
type: "script", type: "script",

View File

@@ -46,7 +46,7 @@ try {
// Step 4: Build the CLI application // Step 4: Build the CLI application
console.log('Building CLI application...'); console.log('Building CLI application...');
execSync('esbuild src/cli.ts --bundle --platform=node --outfile=dist/cli.js', { execSync('esbuild src/cli.ts --bundle --platform=node --minify --tree-shaking=true --outfile=dist/cli.js', {
stdio: 'inherit', stdio: 'inherit',
cwd: cliDir cwd: cliDir
}); });