diff --git a/apps/server/src/services/auto-mode-service.ts b/apps/server/src/services/auto-mode-service.ts index ed4834a6..30caeea7 100644 --- a/apps/server/src/services/auto-mode-service.ts +++ b/apps/server/src/services/auto-mode-service.ts @@ -1426,23 +1426,53 @@ Format your response as a structured markdown document.`; /** * Wait for plan approval from the user. * Returns a promise that resolves when the user approves/rejects the plan. + * Times out after 30 minutes to prevent indefinite memory retention. */ waitForPlanApproval( featureId: string, projectPath: string ): Promise<{ approved: boolean; editedPlan?: string; feedback?: string }> { + const APPROVAL_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes + logger.info(`Registering pending approval for feature ${featureId}`); logger.info( `Current pending approvals: ${Array.from(this.pendingApprovals.keys()).join(', ') || 'none'}` ); return new Promise((resolve, reject) => { + // Set up timeout to prevent indefinite waiting and memory leaks + const timeoutId = setTimeout(() => { + const pending = this.pendingApprovals.get(featureId); + if (pending) { + logger.warn(`Plan approval for feature ${featureId} timed out after 30 minutes`); + this.pendingApprovals.delete(featureId); + reject( + new Error('Plan approval timed out after 30 minutes - feature execution cancelled') + ); + } + }, APPROVAL_TIMEOUT_MS); + + // Wrap resolve/reject to clear timeout when approval is resolved + const wrappedResolve = (result: { + approved: boolean; + editedPlan?: string; + feedback?: string; + }) => { + clearTimeout(timeoutId); + resolve(result); + }; + + const wrappedReject = (error: Error) => { + clearTimeout(timeoutId); + reject(error); + }; + this.pendingApprovals.set(featureId, { - resolve, - reject, + resolve: wrappedResolve, + reject: wrappedReject, featureId, projectPath, }); - logger.info(`Pending approval registered for feature ${featureId}`); + logger.info(`Pending approval registered for feature ${featureId} (timeout: 30 minutes)`); }); } @@ -2178,183 +2208,185 @@ This mock response was generated because AUTOMAKER_MOCK_AGENT=true was set. }, WRITE_DEBOUNCE_MS); }; - streamLoop: for await (const msg of stream) { - // Log raw stream event for debugging - appendRawEvent(msg); + // Wrap stream processing in try/finally to ensure timeout cleanup on any error/abort + try { + streamLoop: for await (const msg of stream) { + // Log raw stream event for debugging + appendRawEvent(msg); - logger.info(`Stream message received:`, msg.type, msg.subtype || ''); - if (msg.type === 'assistant' && msg.message?.content) { - for (const block of msg.message.content) { - if (block.type === 'text') { - const newText = block.text || ''; + logger.info(`Stream message received:`, msg.type, msg.subtype || ''); + if (msg.type === 'assistant' && msg.message?.content) { + for (const block of msg.message.content) { + if (block.type === 'text') { + const newText = block.text || ''; - // Skip empty text - if (!newText) continue; + // Skip empty text + if (!newText) continue; - // Note: Cursor-specific dedup (duplicate blocks, accumulated text) is now - // handled in CursorProvider.deduplicateTextBlocks() for cleaner separation + // Note: Cursor-specific dedup (duplicate blocks, accumulated text) is now + // handled in CursorProvider.deduplicateTextBlocks() for cleaner separation - // Only add separator when we're at a natural paragraph break: - // - Previous text ends with sentence terminator AND new text starts a new thought - // - Don't add separators mid-word or mid-sentence (for streaming providers like Cursor) - if (responseText.length > 0 && newText.length > 0) { - const lastChar = responseText.slice(-1); - const endsWithSentence = /[.!?:]\s*$/.test(responseText); - const endsWithNewline = /\n\s*$/.test(responseText); - const startsNewParagraph = /^[\n#\-*>]/.test(newText); + // Only add separator when we're at a natural paragraph break: + // - Previous text ends with sentence terminator AND new text starts a new thought + // - Don't add separators mid-word or mid-sentence (for streaming providers like Cursor) + if (responseText.length > 0 && newText.length > 0) { + const lastChar = responseText.slice(-1); + const endsWithSentence = /[.!?:]\s*$/.test(responseText); + const endsWithNewline = /\n\s*$/.test(responseText); + const startsNewParagraph = /^[\n#\-*>]/.test(newText); - // Add paragraph break only at natural boundaries + // Add paragraph break only at natural boundaries + if ( + !endsWithNewline && + (endsWithSentence || startsNewParagraph) && + !/[a-zA-Z0-9]/.test(lastChar) // Not mid-word + ) { + responseText += '\n\n'; + } + } + responseText += newText; + + // Check for authentication errors in the response if ( - !endsWithNewline && - (endsWithSentence || startsNewParagraph) && - !/[a-zA-Z0-9]/.test(lastChar) // Not mid-word + block.text && + (block.text.includes('Invalid API key') || + block.text.includes('authentication_failed') || + block.text.includes('Fix external API key')) ) { - responseText += '\n\n'; - } - } - responseText += newText; - - // Check for authentication errors in the response - if ( - block.text && - (block.text.includes('Invalid API key') || - block.text.includes('authentication_failed') || - block.text.includes('Fix external API key')) - ) { - throw new Error( - 'Authentication failed: Invalid or expired API key. ' + - "Please check your ANTHROPIC_API_KEY, or run 'claude login' to re-authenticate." - ); - } - - // Schedule incremental file write (debounced) - scheduleWrite(); - - // Check for [SPEC_GENERATED] marker in planning modes (spec or full) - if ( - planningModeRequiresApproval && - !specDetected && - responseText.includes('[SPEC_GENERATED]') - ) { - specDetected = true; - - // Extract plan content (everything before the marker) - const markerIndex = responseText.indexOf('[SPEC_GENERATED]'); - const planContent = responseText.substring(0, markerIndex).trim(); - - // Parse tasks from the generated spec (for spec and full modes) - // Use let since we may need to update this after plan revision - let parsedTasks = parseTasksFromSpec(planContent); - const tasksTotal = parsedTasks.length; - - logger.info(`Parsed ${tasksTotal} tasks from spec for feature ${featureId}`); - if (parsedTasks.length > 0) { - logger.info(`Tasks: ${parsedTasks.map((t) => t.id).join(', ')}`); + throw new Error( + 'Authentication failed: Invalid or expired API key. ' + + "Please check your ANTHROPIC_API_KEY, or run 'claude login' to re-authenticate." + ); } - // Update planSpec status to 'generated' and save content with parsed tasks - await this.updateFeaturePlanSpec(projectPath, featureId, { - status: 'generated', - content: planContent, - version: 1, - generatedAt: new Date().toISOString(), - reviewedByUser: false, - tasks: parsedTasks, - tasksTotal, - tasksCompleted: 0, - }); + // Schedule incremental file write (debounced) + scheduleWrite(); - let approvedPlanContent = planContent; - let userFeedback: string | undefined; - let currentPlanContent = planContent; - let planVersion = 1; + // Check for [SPEC_GENERATED] marker in planning modes (spec or full) + if ( + planningModeRequiresApproval && + !specDetected && + responseText.includes('[SPEC_GENERATED]') + ) { + specDetected = true; - // Only pause for approval if requirePlanApproval is true - if (requiresApproval) { - // ======================================== - // PLAN REVISION LOOP - // Keep regenerating plan until user approves - // ======================================== - let planApproved = false; + // Extract plan content (everything before the marker) + const markerIndex = responseText.indexOf('[SPEC_GENERATED]'); + const planContent = responseText.substring(0, markerIndex).trim(); - while (!planApproved) { - logger.info( - `Spec v${planVersion} generated for feature ${featureId}, waiting for approval` - ); + // Parse tasks from the generated spec (for spec and full modes) + // Use let since we may need to update this after plan revision + let parsedTasks = parseTasksFromSpec(planContent); + const tasksTotal = parsedTasks.length; - // CRITICAL: Register pending approval BEFORE emitting event - const approvalPromise = this.waitForPlanApproval(featureId, projectPath); + logger.info(`Parsed ${tasksTotal} tasks from spec for feature ${featureId}`); + if (parsedTasks.length > 0) { + logger.info(`Tasks: ${parsedTasks.map((t) => t.id).join(', ')}`); + } - // Emit plan_approval_required event - this.emitAutoModeEvent('plan_approval_required', { - featureId, - projectPath, - planContent: currentPlanContent, - planningMode, - planVersion, - }); + // Update planSpec status to 'generated' and save content with parsed tasks + await this.updateFeaturePlanSpec(projectPath, featureId, { + status: 'generated', + content: planContent, + version: 1, + generatedAt: new Date().toISOString(), + reviewedByUser: false, + tasks: parsedTasks, + tasksTotal, + tasksCompleted: 0, + }); - // Wait for user response - try { - const approvalResult = await approvalPromise; + let approvedPlanContent = planContent; + let userFeedback: string | undefined; + let currentPlanContent = planContent; + let planVersion = 1; - if (approvalResult.approved) { - // User approved the plan - logger.info(`Plan v${planVersion} approved for feature ${featureId}`); - planApproved = true; + // Only pause for approval if requirePlanApproval is true + if (requiresApproval) { + // ======================================== + // PLAN REVISION LOOP + // Keep regenerating plan until user approves + // ======================================== + let planApproved = false; - // If user provided edits, use the edited version - if (approvalResult.editedPlan) { - approvedPlanContent = approvalResult.editedPlan; - await this.updateFeaturePlanSpec(projectPath, featureId, { - content: approvalResult.editedPlan, + while (!planApproved) { + logger.info( + `Spec v${planVersion} generated for feature ${featureId}, waiting for approval` + ); + + // CRITICAL: Register pending approval BEFORE emitting event + const approvalPromise = this.waitForPlanApproval(featureId, projectPath); + + // Emit plan_approval_required event + this.emitAutoModeEvent('plan_approval_required', { + featureId, + projectPath, + planContent: currentPlanContent, + planningMode, + planVersion, + }); + + // Wait for user response + try { + const approvalResult = await approvalPromise; + + if (approvalResult.approved) { + // User approved the plan + logger.info(`Plan v${planVersion} approved for feature ${featureId}`); + planApproved = true; + + // If user provided edits, use the edited version + if (approvalResult.editedPlan) { + approvedPlanContent = approvalResult.editedPlan; + await this.updateFeaturePlanSpec(projectPath, featureId, { + content: approvalResult.editedPlan, + }); + } else { + approvedPlanContent = currentPlanContent; + } + + // Capture any additional feedback for implementation + userFeedback = approvalResult.feedback; + + // Emit approval event + this.emitAutoModeEvent('plan_approved', { + featureId, + projectPath, + hasEdits: !!approvalResult.editedPlan, + planVersion, }); } else { - approvedPlanContent = currentPlanContent; - } + // User rejected - check if they provided feedback for revision + const hasFeedback = + approvalResult.feedback && approvalResult.feedback.trim().length > 0; + const hasEdits = + approvalResult.editedPlan && approvalResult.editedPlan.trim().length > 0; - // Capture any additional feedback for implementation - userFeedback = approvalResult.feedback; + if (!hasFeedback && !hasEdits) { + // No feedback or edits = explicit cancel + logger.info( + `Plan rejected without feedback for feature ${featureId}, cancelling` + ); + throw new Error('Plan cancelled by user'); + } - // Emit approval event - this.emitAutoModeEvent('plan_approved', { - featureId, - projectPath, - hasEdits: !!approvalResult.editedPlan, - planVersion, - }); - } else { - // User rejected - check if they provided feedback for revision - const hasFeedback = - approvalResult.feedback && approvalResult.feedback.trim().length > 0; - const hasEdits = - approvalResult.editedPlan && approvalResult.editedPlan.trim().length > 0; - - if (!hasFeedback && !hasEdits) { - // No feedback or edits = explicit cancel + // User wants revisions - regenerate the plan logger.info( - `Plan rejected without feedback for feature ${featureId}, cancelling` + `Plan v${planVersion} rejected with feedback for feature ${featureId}, regenerating...` ); - throw new Error('Plan cancelled by user'); - } + planVersion++; - // User wants revisions - regenerate the plan - logger.info( - `Plan v${planVersion} rejected with feedback for feature ${featureId}, regenerating...` - ); - planVersion++; + // Emit revision event + this.emitAutoModeEvent('plan_revision_requested', { + featureId, + projectPath, + feedback: approvalResult.feedback, + hasEdits: !!hasEdits, + planVersion, + }); - // Emit revision event - this.emitAutoModeEvent('plan_revision_requested', { - featureId, - projectPath, - feedback: approvalResult.feedback, - hasEdits: !!hasEdits, - planVersion, - }); - - // Build revision prompt - let revisionPrompt = `The user has requested revisions to the plan/specification. + // Build revision prompt + let revisionPrompt = `The user has requested revisions to the plan/specification. ## Previous Plan (v${planVersion - 1}) ${hasEdits ? approvalResult.editedPlan : currentPlanContent} @@ -2369,155 +2401,247 @@ After generating the revised spec, output: "[SPEC_GENERATED] Please review the revised specification above." `; - // Update status to regenerating - await this.updateFeaturePlanSpec(projectPath, featureId, { - status: 'generating', - version: planVersion, - }); + // Update status to regenerating + await this.updateFeaturePlanSpec(projectPath, featureId, { + status: 'generating', + version: planVersion, + }); - // Make revision call - const revisionStream = provider.executeQuery({ - prompt: revisionPrompt, - model: finalModel, - maxTurns: maxTurns || 100, - cwd: workDir, - allowedTools: allowedTools, - abortController, - mcpServers: Object.keys(mcpServers).length > 0 ? mcpServers : undefined, - mcpAutoApproveTools: mcpPermissions.mcpAutoApproveTools, - mcpUnrestrictedTools: mcpPermissions.mcpUnrestrictedTools, - }); + // Make revision call + const revisionStream = provider.executeQuery({ + prompt: revisionPrompt, + model: finalModel, + maxTurns: maxTurns || 100, + cwd: workDir, + allowedTools: allowedTools, + abortController, + mcpServers: Object.keys(mcpServers).length > 0 ? mcpServers : undefined, + mcpAutoApproveTools: mcpPermissions.mcpAutoApproveTools, + mcpUnrestrictedTools: mcpPermissions.mcpUnrestrictedTools, + }); - let revisionText = ''; - for await (const msg of revisionStream) { - if (msg.type === 'assistant' && msg.message?.content) { - for (const block of msg.message.content) { - if (block.type === 'text') { - revisionText += block.text || ''; - this.emitAutoModeEvent('auto_mode_progress', { - featureId, - content: block.text, - }); + let revisionText = ''; + for await (const msg of revisionStream) { + if (msg.type === 'assistant' && msg.message?.content) { + for (const block of msg.message.content) { + if (block.type === 'text') { + revisionText += block.text || ''; + this.emitAutoModeEvent('auto_mode_progress', { + featureId, + content: block.text, + }); + } } + } else if (msg.type === 'error') { + throw new Error(msg.error || 'Error during plan revision'); + } else if (msg.type === 'result' && msg.subtype === 'success') { + revisionText += msg.result || ''; } - } else if (msg.type === 'error') { - throw new Error(msg.error || 'Error during plan revision'); - } else if (msg.type === 'result' && msg.subtype === 'success') { - revisionText += msg.result || ''; } + + // Extract new plan content + const markerIndex = revisionText.indexOf('[SPEC_GENERATED]'); + if (markerIndex > 0) { + currentPlanContent = revisionText.substring(0, markerIndex).trim(); + } else { + currentPlanContent = revisionText.trim(); + } + + // Re-parse tasks from revised plan + const revisedTasks = parseTasksFromSpec(currentPlanContent); + logger.info(`Revised plan has ${revisedTasks.length} tasks`); + + // Update planSpec with revised content + await this.updateFeaturePlanSpec(projectPath, featureId, { + status: 'generated', + content: currentPlanContent, + version: planVersion, + tasks: revisedTasks, + tasksTotal: revisedTasks.length, + tasksCompleted: 0, + }); + + // Update parsedTasks for implementation + parsedTasks = revisedTasks; + + responseText += revisionText; } - - // Extract new plan content - const markerIndex = revisionText.indexOf('[SPEC_GENERATED]'); - if (markerIndex > 0) { - currentPlanContent = revisionText.substring(0, markerIndex).trim(); - } else { - currentPlanContent = revisionText.trim(); + } catch (error) { + if ((error as Error).message.includes('cancelled')) { + throw error; } - - // Re-parse tasks from revised plan - const revisedTasks = parseTasksFromSpec(currentPlanContent); - logger.info(`Revised plan has ${revisedTasks.length} tasks`); - - // Update planSpec with revised content - await this.updateFeaturePlanSpec(projectPath, featureId, { - status: 'generated', - content: currentPlanContent, - version: planVersion, - tasks: revisedTasks, - tasksTotal: revisedTasks.length, - tasksCompleted: 0, - }); - - // Update parsedTasks for implementation - parsedTasks = revisedTasks; - - responseText += revisionText; + throw new Error(`Plan approval failed: ${(error as Error).message}`); } - } catch (error) { - if ((error as Error).message.includes('cancelled')) { - throw error; - } - throw new Error(`Plan approval failed: ${(error as Error).message}`); } - } - } else { - // Auto-approve: requirePlanApproval is false, just continue without pausing - logger.info( - `Spec generated for feature ${featureId}, auto-approving (requirePlanApproval=false)` - ); - - // Emit info event for frontend - this.emitAutoModeEvent('plan_auto_approved', { - featureId, - projectPath, - planContent, - planningMode, - }); - - approvedPlanContent = planContent; - } - - // CRITICAL: After approval, we need to make a second call to continue implementation - // The agent is waiting for "approved" - we need to send it and continue - logger.info(`Making continuation call after plan approval for feature ${featureId}`); - - // Update planSpec status to approved (handles both manual and auto-approval paths) - await this.updateFeaturePlanSpec(projectPath, featureId, { - status: 'approved', - approvedAt: new Date().toISOString(), - reviewedByUser: requiresApproval, - }); - - // ======================================== - // MULTI-AGENT TASK EXECUTION - // Each task gets its own focused agent call - // ======================================== - - if (parsedTasks.length > 0) { - logger.info( - `Starting multi-agent execution: ${parsedTasks.length} tasks for feature ${featureId}` - ); - - // Execute each task with a separate agent - for (let taskIndex = 0; taskIndex < parsedTasks.length; taskIndex++) { - const task = parsedTasks[taskIndex]; - - // Check for abort - if (abortController.signal.aborted) { - throw new Error('Feature execution aborted'); - } - - // Emit task started - logger.info(`Starting task ${task.id}: ${task.description}`); - this.emitAutoModeEvent('auto_mode_task_started', { - featureId, - projectPath, - taskId: task.id, - taskDescription: task.description, - taskIndex, - tasksTotal: parsedTasks.length, - }); - - // Update planSpec with current task - await this.updateFeaturePlanSpec(projectPath, featureId, { - currentTaskId: task.id, - }); - - // Build focused prompt for this specific task - const taskPrompt = this.buildTaskPrompt( - task, - parsedTasks, - taskIndex, - approvedPlanContent, - userFeedback + } else { + // Auto-approve: requirePlanApproval is false, just continue without pausing + logger.info( + `Spec generated for feature ${featureId}, auto-approving (requirePlanApproval=false)` ); - // Execute task with dedicated agent - const taskStream = provider.executeQuery({ - prompt: taskPrompt, + // Emit info event for frontend + this.emitAutoModeEvent('plan_auto_approved', { + featureId, + projectPath, + planContent, + planningMode, + }); + + approvedPlanContent = planContent; + } + + // CRITICAL: After approval, we need to make a second call to continue implementation + // The agent is waiting for "approved" - we need to send it and continue + logger.info( + `Making continuation call after plan approval for feature ${featureId}` + ); + + // Update planSpec status to approved (handles both manual and auto-approval paths) + await this.updateFeaturePlanSpec(projectPath, featureId, { + status: 'approved', + approvedAt: new Date().toISOString(), + reviewedByUser: requiresApproval, + }); + + // ======================================== + // MULTI-AGENT TASK EXECUTION + // Each task gets its own focused agent call + // ======================================== + + if (parsedTasks.length > 0) { + logger.info( + `Starting multi-agent execution: ${parsedTasks.length} tasks for feature ${featureId}` + ); + + // Execute each task with a separate agent + for (let taskIndex = 0; taskIndex < parsedTasks.length; taskIndex++) { + const task = parsedTasks[taskIndex]; + + // Check for abort + if (abortController.signal.aborted) { + throw new Error('Feature execution aborted'); + } + + // Emit task started + logger.info(`Starting task ${task.id}: ${task.description}`); + this.emitAutoModeEvent('auto_mode_task_started', { + featureId, + projectPath, + taskId: task.id, + taskDescription: task.description, + taskIndex, + tasksTotal: parsedTasks.length, + }); + + // Update planSpec with current task + await this.updateFeaturePlanSpec(projectPath, featureId, { + currentTaskId: task.id, + }); + + // Build focused prompt for this specific task + const taskPrompt = this.buildTaskPrompt( + task, + parsedTasks, + taskIndex, + approvedPlanContent, + userFeedback + ); + + // Execute task with dedicated agent + const taskStream = provider.executeQuery({ + prompt: taskPrompt, + model: finalModel, + maxTurns: Math.min(maxTurns || 100, 50), // Limit turns per task + cwd: workDir, + allowedTools: allowedTools, + abortController, + mcpServers: Object.keys(mcpServers).length > 0 ? mcpServers : undefined, + mcpAutoApproveTools: mcpPermissions.mcpAutoApproveTools, + mcpUnrestrictedTools: mcpPermissions.mcpUnrestrictedTools, + }); + + let taskOutput = ''; + + // Process task stream + for await (const msg of taskStream) { + if (msg.type === 'assistant' && msg.message?.content) { + for (const block of msg.message.content) { + if (block.type === 'text') { + taskOutput += block.text || ''; + responseText += block.text || ''; + this.emitAutoModeEvent('auto_mode_progress', { + featureId, + content: block.text, + }); + } else if (block.type === 'tool_use') { + this.emitAutoModeEvent('auto_mode_tool', { + featureId, + tool: block.name, + input: block.input, + }); + } + } + } else if (msg.type === 'error') { + throw new Error(msg.error || `Error during task ${task.id}`); + } else if (msg.type === 'result' && msg.subtype === 'success') { + taskOutput += msg.result || ''; + responseText += msg.result || ''; + } + } + + // Emit task completed + logger.info(`Task ${task.id} completed for feature ${featureId}`); + this.emitAutoModeEvent('auto_mode_task_complete', { + featureId, + projectPath, + taskId: task.id, + tasksCompleted: taskIndex + 1, + tasksTotal: parsedTasks.length, + }); + + // Update planSpec with progress + await this.updateFeaturePlanSpec(projectPath, featureId, { + tasksCompleted: taskIndex + 1, + }); + + // Check for phase completion (group tasks by phase) + if (task.phase) { + const nextTask = parsedTasks[taskIndex + 1]; + if (!nextTask || nextTask.phase !== task.phase) { + // Phase changed, emit phase complete + const phaseMatch = task.phase.match(/Phase\s*(\d+)/i); + if (phaseMatch) { + this.emitAutoModeEvent('auto_mode_phase_complete', { + featureId, + projectPath, + phaseNumber: parseInt(phaseMatch[1], 10), + }); + } + } + } + } + + logger.info(`All ${parsedTasks.length} tasks completed for feature ${featureId}`); + } else { + // No parsed tasks - fall back to single-agent execution + logger.info( + `No parsed tasks, using single-agent execution for feature ${featureId}` + ); + + const continuationPrompt = `The plan/specification has been approved. Now implement it. +${userFeedback ? `\n## User Feedback\n${userFeedback}\n` : ''} +## Approved Plan + +${approvedPlanContent} + +## Instructions + +Implement all the changes described in the plan above.`; + + const continuationStream = provider.executeQuery({ + prompt: continuationPrompt, model: finalModel, - maxTurns: Math.min(maxTurns || 100, 50), // Limit turns per task + maxTurns: maxTurns, cwd: workDir, allowedTools: allowedTools, abortController, @@ -2526,14 +2650,10 @@ After generating the revised spec, output: mcpUnrestrictedTools: mcpPermissions.mcpUnrestrictedTools, }); - let taskOutput = ''; - - // Process task stream - for await (const msg of taskStream) { + for await (const msg of continuationStream) { if (msg.type === 'assistant' && msg.message?.content) { for (const block of msg.message.content) { if (block.type === 'text') { - taskOutput += block.text || ''; responseText += block.text || ''; this.emitAutoModeEvent('auto_mode_progress', { featureId, @@ -2548,157 +2668,63 @@ After generating the revised spec, output: } } } else if (msg.type === 'error') { - throw new Error(msg.error || `Error during task ${task.id}`); + throw new Error(msg.error || 'Unknown error during implementation'); } else if (msg.type === 'result' && msg.subtype === 'success') { - taskOutput += msg.result || ''; responseText += msg.result || ''; } } - - // Emit task completed - logger.info(`Task ${task.id} completed for feature ${featureId}`); - this.emitAutoModeEvent('auto_mode_task_complete', { - featureId, - projectPath, - taskId: task.id, - tasksCompleted: taskIndex + 1, - tasksTotal: parsedTasks.length, - }); - - // Update planSpec with progress - await this.updateFeaturePlanSpec(projectPath, featureId, { - tasksCompleted: taskIndex + 1, - }); - - // Check for phase completion (group tasks by phase) - if (task.phase) { - const nextTask = parsedTasks[taskIndex + 1]; - if (!nextTask || nextTask.phase !== task.phase) { - // Phase changed, emit phase complete - const phaseMatch = task.phase.match(/Phase\s*(\d+)/i); - if (phaseMatch) { - this.emitAutoModeEvent('auto_mode_phase_complete', { - featureId, - projectPath, - phaseNumber: parseInt(phaseMatch[1], 10), - }); - } - } - } } - logger.info(`All ${parsedTasks.length} tasks completed for feature ${featureId}`); - } else { - // No parsed tasks - fall back to single-agent execution - logger.info( - `No parsed tasks, using single-agent execution for feature ${featureId}` - ); - - const continuationPrompt = `The plan/specification has been approved. Now implement it. -${userFeedback ? `\n## User Feedback\n${userFeedback}\n` : ''} -## Approved Plan - -${approvedPlanContent} - -## Instructions - -Implement all the changes described in the plan above.`; - - const continuationStream = provider.executeQuery({ - prompt: continuationPrompt, - model: finalModel, - maxTurns: maxTurns, - cwd: workDir, - allowedTools: allowedTools, - abortController, - mcpServers: Object.keys(mcpServers).length > 0 ? mcpServers : undefined, - mcpAutoApproveTools: mcpPermissions.mcpAutoApproveTools, - mcpUnrestrictedTools: mcpPermissions.mcpUnrestrictedTools, - }); - - for await (const msg of continuationStream) { - if (msg.type === 'assistant' && msg.message?.content) { - for (const block of msg.message.content) { - if (block.type === 'text') { - responseText += block.text || ''; - this.emitAutoModeEvent('auto_mode_progress', { - featureId, - content: block.text, - }); - } else if (block.type === 'tool_use') { - this.emitAutoModeEvent('auto_mode_tool', { - featureId, - tool: block.name, - input: block.input, - }); - } - } - } else if (msg.type === 'error') { - throw new Error(msg.error || 'Unknown error during implementation'); - } else if (msg.type === 'result' && msg.subtype === 'success') { - responseText += msg.result || ''; - } - } + logger.info(`Implementation completed for feature ${featureId}`); + // Exit the original stream loop since continuation is done + break streamLoop; } - logger.info(`Implementation completed for feature ${featureId}`); - // Exit the original stream loop since continuation is done - break streamLoop; - } - - // Only emit progress for non-marker text (marker was already handled above) - if (!specDetected) { - logger.info( - `Emitting progress event for ${featureId}, content length: ${block.text?.length || 0}` - ); - this.emitAutoModeEvent('auto_mode_progress', { + // Only emit progress for non-marker text (marker was already handled above) + if (!specDetected) { + logger.info( + `Emitting progress event for ${featureId}, content length: ${block.text?.length || 0}` + ); + this.emitAutoModeEvent('auto_mode_progress', { + featureId, + content: block.text, + }); + } + } else if (block.type === 'tool_use') { + // Emit event for real-time UI + this.emitAutoModeEvent('auto_mode_tool', { featureId, - content: block.text, + tool: block.name, + input: block.input, }); - } - } else if (block.type === 'tool_use') { - // Emit event for real-time UI - this.emitAutoModeEvent('auto_mode_tool', { - featureId, - tool: block.name, - input: block.input, - }); - // Also add to file output for persistence - if (responseText.length > 0 && !responseText.endsWith('\n')) { - responseText += '\n'; + // Also add to file output for persistence + if (responseText.length > 0 && !responseText.endsWith('\n')) { + responseText += '\n'; + } + responseText += `\n🔧 Tool: ${block.name}\n`; + if (block.input) { + responseText += `Input: ${JSON.stringify(block.input, null, 2)}\n`; + } + scheduleWrite(); } - responseText += `\n🔧 Tool: ${block.name}\n`; - if (block.input) { - responseText += `Input: ${JSON.stringify(block.input, null, 2)}\n`; - } - scheduleWrite(); } + } else if (msg.type === 'error') { + // Handle error messages + throw new Error(msg.error || 'Unknown error'); + } else if (msg.type === 'result' && msg.subtype === 'success') { + // Don't replace responseText - the accumulated content is the full history + // The msg.result is just a summary which would lose all tool use details + // Just ensure final write happens + scheduleWrite(); } - } else if (msg.type === 'error') { - // Handle error messages - throw new Error(msg.error || 'Unknown error'); - } else if (msg.type === 'result' && msg.subtype === 'success') { - // Don't replace responseText - the accumulated content is the full history - // The msg.result is just a summary which would lose all tool use details - // Just ensure final write happens - scheduleWrite(); } - } - // Clear any pending timeout and do a final write to ensure all content is saved - if (writeTimeout) { - clearTimeout(writeTimeout); - } - // Final write - ensure all accumulated content is saved - await writeToFile(); + // Final write - ensure all accumulated content is saved (on success path) + await writeToFile(); - // Flush remaining raw output (only if enabled) - if (enableRawOutput) { - if (rawWriteTimeout) { - clearTimeout(rawWriteTimeout); - } - if (rawOutputLines.length > 0) { + // Flush remaining raw output (only if enabled, on success path) + if (enableRawOutput && rawOutputLines.length > 0) { try { await secureFs.mkdir(path.dirname(rawOutputPath), { recursive: true }); await secureFs.appendFile(rawOutputPath, rawOutputLines.join('\n') + '\n'); @@ -2706,6 +2732,17 @@ Implement all the changes described in the plan above.`; logger.error(`Failed to write final raw output for ${featureId}:`, error); } } + } finally { + // ALWAYS clear pending timeouts to prevent memory leaks + // This runs on success, error, or abort + if (writeTimeout) { + clearTimeout(writeTimeout); + writeTimeout = null; + } + if (rawWriteTimeout) { + clearTimeout(rawWriteTimeout); + rawWriteTimeout = null; + } } } diff --git a/libs/platform/src/subprocess.ts b/libs/platform/src/subprocess.ts index ba0c2c42..ddfa220e 100644 --- a/libs/platform/src/subprocess.ts +++ b/libs/platform/src/subprocess.ts @@ -87,17 +87,27 @@ export async function* spawnJSONLProcess(options: SubprocessOptions): AsyncGener resetTimeout(); - // Setup abort handling + // Setup abort handling with cleanup + let abortHandler: (() => void) | null = null; if (abortController) { - abortController.signal.addEventListener('abort', () => { + abortHandler = () => { console.log('[SubprocessManager] Abort signal received, killing process'); if (timeoutHandle) { clearTimeout(timeoutHandle); } childProcess.kill('SIGTERM'); - }); + }; + abortController.signal.addEventListener('abort', abortHandler); } + // Helper to clean up abort listener + const cleanupAbortListener = () => { + if (abortController && abortHandler) { + abortController.signal.removeEventListener('abort', abortHandler); + abortHandler = null; + } + }; + // Parse stdout as JSONL (one JSON object per line) if (childProcess.stdout) { const rl = readline.createInterface({ @@ -130,7 +140,12 @@ export async function* spawnJSONLProcess(options: SubprocessOptions): AsyncGener if (timeoutHandle) { clearTimeout(timeoutHandle); } + rl.close(); + cleanupAbortListener(); } + } else { + // No stdout - still need to cleanup abort listener when process exits + cleanupAbortListener(); } // Wait for process to exit @@ -195,19 +210,31 @@ export async function spawnProcess(options: SubprocessOptions): Promise void) | null = null; + const cleanupAbortListener = () => { + if (abortController && abortHandler) { + abortController.signal.removeEventListener('abort', abortHandler); + abortHandler = null; + } + }; + if (abortController) { - abortController.signal.addEventListener('abort', () => { + abortHandler = () => { + cleanupAbortListener(); childProcess.kill('SIGTERM'); reject(new Error('Process aborted')); - }); + }; + abortController.signal.addEventListener('abort', abortHandler); } childProcess.on('exit', (code) => { + cleanupAbortListener(); resolve({ stdout, stderr, exitCode: code }); }); childProcess.on('error', (error) => { + cleanupAbortListener(); reject(error); }); });