From 8c4fec4f5f7732707c8ae3350fa9956e3c2971aa Mon Sep 17 00:00:00 2001 From: musistudio Date: Tue, 2 Sep 2025 21:23:21 +0800 Subject: [PATCH] fix stream handler error --- src/index.ts | 229 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 137 insertions(+), 92 deletions(-) diff --git a/src/index.ts b/src/index.ts index 1b72f01..a971758 100644 --- a/src/index.ts +++ b/src/index.ts @@ -168,10 +168,11 @@ async function run(options: RunOptions = {}) { await router(req, reply, config); } }); - server.addHook("onSend", async (req, reply, payload) => { + server.addHook("onSend", (req, reply, payload, done) => { if (req.sessionId && req.url.startsWith("/v1/messages")) { if (payload instanceof ReadableStream) { if (req.agents) { + const abortController = new AbortController(); const eventStream = payload.pipeThrough(new SSEParserTransform()) let currentAgent: undefined | IAgent; let currentToolIndex = -1 @@ -181,118 +182,162 @@ async function run(options: RunOptions = {}) { const toolMessages: any[] = [] const assistantMessages: any[] = [] // 存储Anthropic格式的消息体,区分文本和工具类型 - return rewriteStream(eventStream, async (data, controller) => { - // 检测工具调用开始 - if (data.event === 'content_block_start' && data?.data?.content_block?.name) { - const agent = req.agents.find((name: string) => agentsManager.getAgent(name)?.tools.get(data.data.content_block.name)) - if (agent) { - currentAgent = agentsManager.getAgent(agent) - currentToolIndex = data.data.index - currentToolName = data.data.content_block.name - currentToolId = data.data.content_block.id + return done(null, rewriteStream(eventStream, async (data, controller) => { + try { + // 检测工具调用开始 + if (data.event === 'content_block_start' && data?.data?.content_block?.name) { + const agent = req.agents.find((name: string) => agentsManager.getAgent(name)?.tools.get(data.data.content_block.name)) + if (agent) { + currentAgent = agentsManager.getAgent(agent) + currentToolIndex = data.data.index + currentToolName = data.data.content_block.name + currentToolId = data.data.content_block.id + return undefined; + } + } + + // 收集工具参数 + if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data?.delta?.type === 'input_json_delta') { + currentToolArgs += data.data?.delta?.partial_json; return undefined; } - } - // 收集工具参数 - if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data?.delta?.type === 'input_json_delta') { - currentToolArgs += data.data?.delta?.partial_json; - return undefined; - } - - // 工具调用完成,处理agent调用 - if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data.type === 'content_block_stop') { - try { - const args = JSON5.parse(currentToolArgs); - assistantMessages.push({ - type: "tool_use", - id: currentToolId, - name: currentToolName, - input: args - }) - const toolResult = await currentAgent?.tools.get(currentToolName)?.handler(args, { - req, - config - }); - console.log('result', toolResult) - toolMessages.push({ - "tool_use_id": currentToolId, - "type": "tool_result", - "content": toolResult - }) - currentAgent = undefined - currentToolIndex = -1 - currentToolName = '' - currentToolArgs = '' - currentToolId = '' - } catch (e) { - console.log(e); - } - return undefined; - } - - if (data.event === 'message_delta' && toolMessages.length) { - req.body.messages.push({ - role: 'assistant', - content: assistantMessages - }) - req.body.messages.push({ - role: 'user', - content: toolMessages - }) - const response = await fetch(`http://127.0.0.1:${config.PORT}/v1/messages`, { - method: "POST", - headers: { - 'x-api-key': config.APIKEY, - 'content-type': 'application/json', - }, - body: JSON.stringify(req.body), - }) - if (!response.ok) { + // 工具调用完成,处理agent调用 + if (currentToolIndex > -1 && data.data.index === currentToolIndex && data.data.type === 'content_block_stop') { + try { + const args = JSON5.parse(currentToolArgs); + assistantMessages.push({ + type: "tool_use", + id: currentToolId, + name: currentToolName, + input: args + }) + const toolResult = await currentAgent?.tools.get(currentToolName)?.handler(args, { + req, + config + }); + console.log('result', toolResult) + toolMessages.push({ + "tool_use_id": currentToolId, + "type": "tool_result", + "content": toolResult + }) + currentAgent = undefined + currentToolIndex = -1 + currentToolName = '' + currentToolArgs = '' + currentToolId = '' + } catch (e) { + console.log(e); + } return undefined; } - const stream = response.body!.pipeThrough(new SSEParserTransform()) - const reader = stream.getReader() - while (true) { - const {value, done} = await reader.read(); - if (done) { - break; + + if (data.event === 'message_delta' && toolMessages.length) { + req.body.messages.push({ + role: 'assistant', + content: assistantMessages + }) + req.body.messages.push({ + role: 'user', + content: toolMessages + }) + const response = await fetch(`http://127.0.0.1:${config.PORT}/v1/messages`, { + method: "POST", + headers: { + 'x-api-key': config.APIKEY, + 'content-type': 'application/json', + }, + body: JSON.stringify(req.body), + }) + if (!response.ok) { + return undefined; } - if (['message_start', 'message_stop'].includes(value.event)) { - continue + const stream = response.body!.pipeThrough(new SSEParserTransform()) + const reader = stream.getReader() + while (true) { + try { + const {value, done} = await reader.read(); + if (done) { + break; + } + if (['message_start', 'message_stop'].includes(value.event)) { + continue + } + + // 检查流是否仍然可写 + if (!controller.desiredSize) { + console.log('Stream backpressure detected'); + break; + } + + controller.enqueue(value) + }catch (readError: any) { + if (readError.name === 'AbortError' || readError.code === 'ERR_STREAM_PREMATURE_CLOSE') { + console.log('Stream reading aborted due to client disconnect'); + abortController.abort(); // 中止所有相关操作 + break; + } + throw readError; + } + } - controller.enqueue(value) + return undefined } - return undefined + return data + }catch (error: any) { + console.error('Unexpected error in stream processing:', error); + + // 处理流提前关闭的错误 + if (error.code === 'ERR_STREAM_PREMATURE_CLOSE') { + console.log('Stream prematurely closed, aborting operations'); + abortController.abort(); + return undefined; + } + + // 其他错误仍然抛出 + throw error; } - return data - }).pipeThrough(new SSESerializerTransform()) + }).pipeThrough(new SSESerializerTransform())) } const [originalStream, clonedStream] = payload.tee(); const read = async (stream: ReadableStream) => { const reader = stream.getReader(); - while (true) { - const { done, value } = await reader.read(); - if (done) break; - // Process the value if needed - const dataStr = new TextDecoder().decode(value); - if (!dataStr.startsWith("event: message_delta")) { - continue; + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + // Process the value if needed + const dataStr = new TextDecoder().decode(value); + if (!dataStr.startsWith("event: message_delta")) { + continue; + } + const str = dataStr.slice(27); + try { + const message = JSON.parse(str); + sessionUsageCache.put(req.sessionId, message.usage); + } catch {} } - const str = dataStr.slice(27); - try { - const message = JSON.parse(str); - sessionUsageCache.put(req.sessionId, message.usage); - } catch {} + } catch (readError: any) { + if (readError.name === 'AbortError' || readError.code === 'ERR_STREAM_PREMATURE_CLOSE') { + console.log('Background read stream closed prematurely'); + } else { + console.error('Error in background stream reading:', readError); + } + } finally { + reader.releaseLock(); } } read(clonedStream); - return originalStream + return done(null, originalStream) } sessionUsageCache.put(req.sessionId, payload.usage); } - return payload; + if (typeof payload ==='object' && payload.error) { + done(payload.error, null) + } + done(null, payload) }); server.start();