fix stream handler error

This commit is contained in:
musistudio
2025-09-02 21:23:21 +08:00
parent 5d53571fe6
commit 8c4fec4f5f

View File

@@ -168,10 +168,11 @@ async function run(options: RunOptions = {}) {
await router(req, reply, config); await router(req, reply, config);
} }
}); });
server.addHook("onSend", async (req, reply, payload) => { server.addHook("onSend", (req, reply, payload, done) => {
if (req.sessionId && req.url.startsWith("/v1/messages")) { if (req.sessionId && req.url.startsWith("/v1/messages")) {
if (payload instanceof ReadableStream) { if (payload instanceof ReadableStream) {
if (req.agents) { if (req.agents) {
const abortController = new AbortController();
const eventStream = payload.pipeThrough(new SSEParserTransform()) const eventStream = payload.pipeThrough(new SSEParserTransform())
let currentAgent: undefined | IAgent; let currentAgent: undefined | IAgent;
let currentToolIndex = -1 let currentToolIndex = -1
@@ -181,118 +182,162 @@ async function run(options: RunOptions = {}) {
const toolMessages: any[] = [] const toolMessages: any[] = []
const assistantMessages: any[] = [] const assistantMessages: any[] = []
// 存储Anthropic格式的消息体区分文本和工具类型 // 存储Anthropic格式的消息体区分文本和工具类型
return rewriteStream(eventStream, async (data, controller) => { return done(null, rewriteStream(eventStream, async (data, controller) => {
// 检测工具调用开始 try {
if (data.event === 'content_block_start' && data?.data?.content_block?.name) { // 检测工具调用开始
const agent = req.agents.find((name: string) => agentsManager.getAgent(name)?.tools.get(data.data.content_block.name)) if (data.event === 'content_block_start' && data?.data?.content_block?.name) {
if (agent) { const agent = req.agents.find((name: string) => agentsManager.getAgent(name)?.tools.get(data.data.content_block.name))
currentAgent = agentsManager.getAgent(agent) if (agent) {
currentToolIndex = data.data.index currentAgent = agentsManager.getAgent(agent)
currentToolName = data.data.content_block.name currentToolIndex = data.data.index
currentToolId = data.data.content_block.id currentToolName = data.data.content_block.name
currentToolId = data.data.content_block.id
return undefined;
}
}
// 收集工具参数
if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data?.delta?.type === 'input_json_delta') {
currentToolArgs += data.data?.delta?.partial_json;
return undefined; return undefined;
} }
}
// 收集工具参数 // 工具调用完成处理agent调用
if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data?.delta?.type === 'input_json_delta') { if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data.type === 'content_block_stop') {
currentToolArgs += data.data?.delta?.partial_json; try {
return undefined; const args = JSON5.parse(currentToolArgs);
} assistantMessages.push({
type: "tool_use",
// 工具调用完成处理agent调用 id: currentToolId,
if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data.type === 'content_block_stop') { name: currentToolName,
try { input: args
const args = JSON5.parse(currentToolArgs); })
assistantMessages.push({ const toolResult = await currentAgent?.tools.get(currentToolName)?.handler(args, {
type: "tool_use", req,
id: currentToolId, config
name: currentToolName, });
input: args console.log('result', toolResult)
}) toolMessages.push({
const toolResult = await currentAgent?.tools.get(currentToolName)?.handler(args, { "tool_use_id": currentToolId,
req, "type": "tool_result",
config "content": toolResult
}); })
console.log('result', toolResult) currentAgent = undefined
toolMessages.push({ currentToolIndex = -1
"tool_use_id": currentToolId, currentToolName = ''
"type": "tool_result", currentToolArgs = ''
"content": toolResult currentToolId = ''
}) } catch (e) {
currentAgent = undefined console.log(e);
currentToolIndex = -1 }
currentToolName = ''
currentToolArgs = ''
currentToolId = ''
} catch (e) {
console.log(e);
}
return undefined;
}
if (data.event === 'message_delta' && toolMessages.length) {
req.body.messages.push({
role: 'assistant',
content: assistantMessages
})
req.body.messages.push({
role: 'user',
content: toolMessages
})
const response = await fetch(`http://127.0.0.1:${config.PORT}/v1/messages`, {
method: "POST",
headers: {
'x-api-key': config.APIKEY,
'content-type': 'application/json',
},
body: JSON.stringify(req.body),
})
if (!response.ok) {
return undefined; return undefined;
} }
const stream = response.body!.pipeThrough(new SSEParserTransform())
const reader = stream.getReader() if (data.event === 'message_delta' && toolMessages.length) {
while (true) { req.body.messages.push({
const {value, done} = await reader.read(); role: 'assistant',
if (done) { content: assistantMessages
break; })
req.body.messages.push({
role: 'user',
content: toolMessages
})
const response = await fetch(`http://127.0.0.1:${config.PORT}/v1/messages`, {
method: "POST",
headers: {
'x-api-key': config.APIKEY,
'content-type': 'application/json',
},
body: JSON.stringify(req.body),
})
if (!response.ok) {
return undefined;
} }
if (['message_start', 'message_stop'].includes(value.event)) { const stream = response.body!.pipeThrough(new SSEParserTransform())
continue const reader = stream.getReader()
while (true) {
try {
const {value, done} = await reader.read();
if (done) {
break;
}
if (['message_start', 'message_stop'].includes(value.event)) {
continue
}
// 检查流是否仍然可写
if (!controller.desiredSize) {
console.log('Stream backpressure detected');
break;
}
controller.enqueue(value)
}catch (readError: any) {
if (readError.name === 'AbortError' || readError.code === 'ERR_STREAM_PREMATURE_CLOSE') {
console.log('Stream reading aborted due to client disconnect');
abortController.abort(); // 中止所有相关操作
break;
}
throw readError;
}
} }
controller.enqueue(value) return undefined
} }
return undefined return data
}catch (error: any) {
console.error('Unexpected error in stream processing:', error);
// 处理流提前关闭的错误
if (error.code === 'ERR_STREAM_PREMATURE_CLOSE') {
console.log('Stream prematurely closed, aborting operations');
abortController.abort();
return undefined;
}
// 其他错误仍然抛出
throw error;
} }
return data }).pipeThrough(new SSESerializerTransform()))
}).pipeThrough(new SSESerializerTransform())
} }
const [originalStream, clonedStream] = payload.tee(); const [originalStream, clonedStream] = payload.tee();
const read = async (stream: ReadableStream) => { const read = async (stream: ReadableStream) => {
const reader = stream.getReader(); const reader = stream.getReader();
while (true) { try {
const { done, value } = await reader.read(); while (true) {
if (done) break; const { done, value } = await reader.read();
// Process the value if needed if (done) break;
const dataStr = new TextDecoder().decode(value); // Process the value if needed
if (!dataStr.startsWith("event: message_delta")) { const dataStr = new TextDecoder().decode(value);
continue; if (!dataStr.startsWith("event: message_delta")) {
continue;
}
const str = dataStr.slice(27);
try {
const message = JSON.parse(str);
sessionUsageCache.put(req.sessionId, message.usage);
} catch {}
} }
const str = dataStr.slice(27); } catch (readError: any) {
try { if (readError.name === 'AbortError' || readError.code === 'ERR_STREAM_PREMATURE_CLOSE') {
const message = JSON.parse(str); console.log('Background read stream closed prematurely');
sessionUsageCache.put(req.sessionId, message.usage); } else {
} catch {} console.error('Error in background stream reading:', readError);
}
} finally {
reader.releaseLock();
} }
} }
read(clonedStream); read(clonedStream);
return originalStream return done(null, originalStream)
} }
sessionUsageCache.put(req.sessionId, payload.usage); sessionUsageCache.put(req.sessionId, payload.usage);
} }
return payload; if (typeof payload ==='object' && payload.error) {
done(payload.error, null)
}
done(null, payload)
}); });
server.start(); server.start();