feat: add streaming support to callClaude for large PRDs
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "claude-task-master",
|
||||
"version": "1.1.0",
|
||||
"version": "1.2.0",
|
||||
"description": "A task management system for AI-driven development with Claude",
|
||||
"main": "index.js",
|
||||
"type": "module",
|
||||
|
||||
186
scripts/dev.js
186
scripts/dev.js
@@ -198,6 +198,14 @@ async function callClaude(prdContent, prdPath, numTasks, retryCount = 0) {
|
||||
}
|
||||
}
|
||||
|
||||
// Determine if we should use streaming based on PRD size
|
||||
// For PRDs larger than 20,000 characters (roughly 5,000 tokens), use streaming
|
||||
const useStreaming = prdContent.length > 20000;
|
||||
|
||||
if (useStreaming) {
|
||||
log('info', `Large PRD detected (${prdContent.length} characters). Using streaming API...`);
|
||||
return await handleStreamingRequest(prdContent, prdPath, numTasks, maxTokens, systemPrompt, loadingIndicator);
|
||||
} else {
|
||||
log('info', "Sending request to Claude API...");
|
||||
|
||||
const response = await anthropic.messages.create({
|
||||
@@ -219,59 +227,33 @@ async function callClaude(prdContent, prdPath, numTasks, retryCount = 0) {
|
||||
|
||||
// Extract the text content from the response
|
||||
const textContent = response.content[0].text;
|
||||
log('debug', `Response length: ${textContent.length} characters`);
|
||||
|
||||
try {
|
||||
// Check if the response is wrapped in a Markdown code block and extract the JSON
|
||||
log('info', "Parsing response as JSON...");
|
||||
let jsonText = textContent;
|
||||
const codeBlockMatch = textContent.match(/```(?:json)?\s*([\s\S]*?)\s*```/);
|
||||
if (codeBlockMatch) {
|
||||
log('debug', "Detected JSON wrapped in Markdown code block, extracting...");
|
||||
jsonText = codeBlockMatch[1];
|
||||
}
|
||||
|
||||
// Try to parse the response as JSON
|
||||
const parsedJson = JSON.parse(jsonText);
|
||||
|
||||
// Check if the response seems incomplete (e.g., missing closing brackets)
|
||||
if (!parsedJson.tasks || parsedJson.tasks.length === 0) {
|
||||
log('warn', "Parsed JSON has no tasks. Response may be incomplete.");
|
||||
|
||||
// If we have a numTasks parameter and it's greater than 5, try again with fewer tasks
|
||||
if (numTasks && numTasks > 5 && retryCount < MAX_RETRIES) {
|
||||
const reducedTasks = Math.max(5, Math.floor(numTasks * 0.7)); // Reduce by 30%, minimum 5
|
||||
log('info', `Retrying with reduced task count: ${reducedTasks} (was ${numTasks})`);
|
||||
return callClaude(prdContent, prdPath, reducedTasks, retryCount + 1);
|
||||
}
|
||||
}
|
||||
|
||||
log('info', `Successfully parsed JSON with ${parsedJson.tasks?.length || 0} tasks`);
|
||||
return parsedJson;
|
||||
} catch (error) {
|
||||
log('error', "Failed to parse Claude's response as JSON:", error);
|
||||
log('debug', "Raw response:", textContent);
|
||||
|
||||
// Check if we should retry with different parameters
|
||||
if (retryCount < MAX_RETRIES) {
|
||||
// If we have a numTasks parameter, try again with fewer tasks
|
||||
if (numTasks && numTasks > 3) {
|
||||
const reducedTasks = Math.max(3, Math.floor(numTasks * 0.6)); // Reduce by 40%, minimum 3
|
||||
log('info', `Retrying with reduced task count: ${reducedTasks} (was ${numTasks})`);
|
||||
return callClaude(prdContent, prdPath, reducedTasks, retryCount + 1);
|
||||
} else {
|
||||
// Otherwise, just retry with the same parameters
|
||||
log('info', `Retrying Claude API call (attempt ${retryCount + 1}/${MAX_RETRIES})...`);
|
||||
return callClaude(prdContent, prdPath, numTasks, retryCount + 1);
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error("Failed to parse Claude's response as JSON after multiple attempts. See console for details.");
|
||||
return processClaudeResponse(textContent, numTasks, retryCount, prdContent, prdPath);
|
||||
}
|
||||
} catch (error) {
|
||||
// Stop loading indicator
|
||||
stopLoadingIndicator(loadingIndicator);
|
||||
|
||||
// Check if this is the streaming recommendation error
|
||||
if (error.message && error.message.includes("Streaming is strongly recommended")) {
|
||||
log('info', "Claude recommends streaming for this large PRD. Switching to streaming mode...");
|
||||
try {
|
||||
// Calculate appropriate max tokens based on PRD size
|
||||
let maxTokens = CONFIG.maxTokens;
|
||||
const estimatedPrdTokens = Math.ceil(prdContent.length / 4);
|
||||
const suggestedMaxTokens = Math.min(32000, estimatedPrdTokens * 2);
|
||||
if (suggestedMaxTokens > maxTokens) {
|
||||
maxTokens = suggestedMaxTokens;
|
||||
}
|
||||
|
||||
// Restart the loading indicator
|
||||
const newLoadingIndicator = startLoadingIndicator(loadingMessage);
|
||||
return await handleStreamingRequest(prdContent, prdPath, numTasks, maxTokens, systemPrompt, newLoadingIndicator);
|
||||
} catch (streamingError) {
|
||||
log('error', "Error with streaming API call:", streamingError);
|
||||
throw streamingError;
|
||||
}
|
||||
}
|
||||
|
||||
log('error', "Error calling Claude API:", error);
|
||||
|
||||
// Implement exponential backoff for retries
|
||||
@@ -334,6 +316,112 @@ async function callClaude(prdContent, prdPath, numTasks, retryCount = 0) {
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to handle streaming requests to Claude API
|
||||
async function handleStreamingRequest(prdContent, prdPath, numTasks, maxTokens, systemPrompt, loadingIndicator) {
|
||||
log('info', "Sending streaming request to Claude API...");
|
||||
|
||||
let fullResponse = '';
|
||||
let streamComplete = false;
|
||||
let streamError = null;
|
||||
|
||||
try {
|
||||
const stream = await anthropic.messages.create({
|
||||
max_tokens: maxTokens,
|
||||
model: CONFIG.model,
|
||||
temperature: CONFIG.temperature,
|
||||
messages: [
|
||||
{
|
||||
role: "user",
|
||||
content: prdContent
|
||||
}
|
||||
],
|
||||
system: systemPrompt,
|
||||
stream: true
|
||||
});
|
||||
|
||||
// Update loading indicator to show streaming progress
|
||||
let dotCount = 0;
|
||||
const streamingInterval = setInterval(() => {
|
||||
readline.cursorTo(process.stdout, 0);
|
||||
process.stdout.write(`Receiving streaming response from Claude${'.'.repeat(dotCount)}`);
|
||||
dotCount = (dotCount + 1) % 4;
|
||||
}, 500);
|
||||
|
||||
// Process the stream
|
||||
for await (const chunk of stream) {
|
||||
if (chunk.type === 'content_block_delta' && chunk.delta.text) {
|
||||
fullResponse += chunk.delta.text;
|
||||
}
|
||||
}
|
||||
|
||||
clearInterval(streamingInterval);
|
||||
streamComplete = true;
|
||||
|
||||
// Stop loading indicator
|
||||
stopLoadingIndicator(loadingIndicator);
|
||||
log('info', "Completed streaming response from Claude API!");
|
||||
log('debug', `Streaming response length: ${fullResponse.length} characters`);
|
||||
|
||||
return processClaudeResponse(fullResponse, numTasks, 0, prdContent, prdPath);
|
||||
} catch (error) {
|
||||
clearInterval(streamingInterval);
|
||||
stopLoadingIndicator(loadingIndicator);
|
||||
log('error', "Error during streaming response:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to process Claude's response text
|
||||
function processClaudeResponse(textContent, numTasks, retryCount, prdContent, prdPath) {
|
||||
try {
|
||||
// Check if the response is wrapped in a Markdown code block and extract the JSON
|
||||
log('info', "Parsing response as JSON...");
|
||||
let jsonText = textContent;
|
||||
const codeBlockMatch = textContent.match(/```(?:json)?\s*([\s\S]*?)\s*```/);
|
||||
if (codeBlockMatch) {
|
||||
log('debug', "Detected JSON wrapped in Markdown code block, extracting...");
|
||||
jsonText = codeBlockMatch[1];
|
||||
}
|
||||
|
||||
// Try to parse the response as JSON
|
||||
const parsedJson = JSON.parse(jsonText);
|
||||
|
||||
// Check if the response seems incomplete (e.g., missing closing brackets)
|
||||
if (!parsedJson.tasks || parsedJson.tasks.length === 0) {
|
||||
log('warn', "Parsed JSON has no tasks. Response may be incomplete.");
|
||||
|
||||
// If we have a numTasks parameter and it's greater than 5, try again with fewer tasks
|
||||
if (numTasks && numTasks > 5 && retryCount < MAX_RETRIES) {
|
||||
const reducedTasks = Math.max(5, Math.floor(numTasks * 0.7)); // Reduce by 30%, minimum 5
|
||||
log('info', `Retrying with reduced task count: ${reducedTasks} (was ${numTasks})`);
|
||||
return callClaude(prdContent, prdPath, reducedTasks, retryCount + 1);
|
||||
}
|
||||
}
|
||||
|
||||
log('info', `Successfully parsed JSON with ${parsedJson.tasks?.length || 0} tasks`);
|
||||
return parsedJson;
|
||||
} catch (error) {
|
||||
log('error', "Failed to parse Claude's response as JSON:", error);
|
||||
log('debug', "Raw response:", textContent);
|
||||
|
||||
// Check if we should retry with different parameters
|
||||
if (retryCount < MAX_RETRIES) {
|
||||
// If we have a numTasks parameter, try again with fewer tasks
|
||||
if (numTasks && numTasks > 3) {
|
||||
const reducedTasks = Math.max(3, Math.floor(numTasks * 0.6)); // Reduce by 40%, minimum 3
|
||||
log('info', `Retrying with reduced task count: ${reducedTasks} (was ${numTasks})`);
|
||||
return callClaude(prdContent, prdPath, reducedTasks, retryCount + 1);
|
||||
} else {
|
||||
// Otherwise, just retry with the same parameters
|
||||
log('info', `Retrying Claude API call (attempt ${retryCount + 1}/${MAX_RETRIES})...`);
|
||||
return callClaude(prdContent, prdPath, numTasks, retryCount + 1);
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error("Failed to parse Claude's response as JSON after multiple attempts. See console for details.");
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// 1) parse-prd
|
||||
//
|
||||
@@ -412,7 +500,7 @@ function generateTaskFiles(tasksPath, outputDir) {
|
||||
log('info', `Reading tasks from ${tasksPath}...`);
|
||||
const data = readJSON(tasksPath);
|
||||
if (!data || !data.tasks) {
|
||||
log('error', "No valid tasks to generate. Please run parse-prd first.");
|
||||
log('error', "No valid tasks to generate files for.");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user