feat: implement timeout for plan approval and enhance error handling

- Added a 30-minute timeout for user plan approval to prevent indefinite waiting and memory leaks.
- Wrapped resolve/reject functions in the waitForPlanApproval method to ensure timeout is cleared upon resolution.
- Enhanced error handling in the stream processing loop to ensure proper cleanup and logging of errors.
- Improved the handling of task execution and phase completion events for better tracking and user feedback.
This commit is contained in:
Shirone
2026-01-04 03:45:21 +01:00
parent 3ed3a90bf6
commit ef06c13c1a
2 changed files with 501 additions and 437 deletions

View File

@@ -1426,23 +1426,53 @@ Format your response as a structured markdown document.`;
/** /**
* Wait for plan approval from the user. * Wait for plan approval from the user.
* Returns a promise that resolves when the user approves/rejects the plan. * Returns a promise that resolves when the user approves/rejects the plan.
* Times out after 30 minutes to prevent indefinite memory retention.
*/ */
waitForPlanApproval( waitForPlanApproval(
featureId: string, featureId: string,
projectPath: string projectPath: string
): Promise<{ approved: boolean; editedPlan?: string; feedback?: string }> { ): Promise<{ approved: boolean; editedPlan?: string; feedback?: string }> {
const APPROVAL_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes
logger.info(`Registering pending approval for feature ${featureId}`); logger.info(`Registering pending approval for feature ${featureId}`);
logger.info( logger.info(
`Current pending approvals: ${Array.from(this.pendingApprovals.keys()).join(', ') || 'none'}` `Current pending approvals: ${Array.from(this.pendingApprovals.keys()).join(', ') || 'none'}`
); );
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
// Set up timeout to prevent indefinite waiting and memory leaks
const timeoutId = setTimeout(() => {
const pending = this.pendingApprovals.get(featureId);
if (pending) {
logger.warn(`Plan approval for feature ${featureId} timed out after 30 minutes`);
this.pendingApprovals.delete(featureId);
reject(
new Error('Plan approval timed out after 30 minutes - feature execution cancelled')
);
}
}, APPROVAL_TIMEOUT_MS);
// Wrap resolve/reject to clear timeout when approval is resolved
const wrappedResolve = (result: {
approved: boolean;
editedPlan?: string;
feedback?: string;
}) => {
clearTimeout(timeoutId);
resolve(result);
};
const wrappedReject = (error: Error) => {
clearTimeout(timeoutId);
reject(error);
};
this.pendingApprovals.set(featureId, { this.pendingApprovals.set(featureId, {
resolve, resolve: wrappedResolve,
reject, reject: wrappedReject,
featureId, featureId,
projectPath, projectPath,
}); });
logger.info(`Pending approval registered for feature ${featureId}`); logger.info(`Pending approval registered for feature ${featureId} (timeout: 30 minutes)`);
}); });
} }
@@ -2178,6 +2208,8 @@ This mock response was generated because AUTOMAKER_MOCK_AGENT=true was set.
}, WRITE_DEBOUNCE_MS); }, WRITE_DEBOUNCE_MS);
}; };
// Wrap stream processing in try/finally to ensure timeout cleanup on any error/abort
try {
streamLoop: for await (const msg of stream) { streamLoop: for await (const msg of stream) {
// Log raw stream event for debugging // Log raw stream event for debugging
appendRawEvent(msg); appendRawEvent(msg);
@@ -2460,7 +2492,9 @@ After generating the revised spec, output:
// CRITICAL: After approval, we need to make a second call to continue implementation // CRITICAL: After approval, we need to make a second call to continue implementation
// The agent is waiting for "approved" - we need to send it and continue // The agent is waiting for "approved" - we need to send it and continue
logger.info(`Making continuation call after plan approval for feature ${featureId}`); logger.info(
`Making continuation call after plan approval for feature ${featureId}`
);
// Update planSpec status to approved (handles both manual and auto-approval paths) // Update planSpec status to approved (handles both manual and auto-approval paths)
await this.updateFeaturePlanSpec(projectPath, featureId, { await this.updateFeaturePlanSpec(projectPath, featureId, {
@@ -2686,19 +2720,11 @@ Implement all the changes described in the plan above.`;
} }
} }
// Clear any pending timeout and do a final write to ensure all content is saved // Final write - ensure all accumulated content is saved (on success path)
if (writeTimeout) {
clearTimeout(writeTimeout);
}
// Final write - ensure all accumulated content is saved
await writeToFile(); await writeToFile();
// Flush remaining raw output (only if enabled) // Flush remaining raw output (only if enabled, on success path)
if (enableRawOutput) { if (enableRawOutput && rawOutputLines.length > 0) {
if (rawWriteTimeout) {
clearTimeout(rawWriteTimeout);
}
if (rawOutputLines.length > 0) {
try { try {
await secureFs.mkdir(path.dirname(rawOutputPath), { recursive: true }); await secureFs.mkdir(path.dirname(rawOutputPath), { recursive: true });
await secureFs.appendFile(rawOutputPath, rawOutputLines.join('\n') + '\n'); await secureFs.appendFile(rawOutputPath, rawOutputLines.join('\n') + '\n');
@@ -2706,6 +2732,17 @@ Implement all the changes described in the plan above.`;
logger.error(`Failed to write final raw output for ${featureId}:`, error); logger.error(`Failed to write final raw output for ${featureId}:`, error);
} }
} }
} finally {
// ALWAYS clear pending timeouts to prevent memory leaks
// This runs on success, error, or abort
if (writeTimeout) {
clearTimeout(writeTimeout);
writeTimeout = null;
}
if (rawWriteTimeout) {
clearTimeout(rawWriteTimeout);
rawWriteTimeout = null;
}
} }
} }

View File

@@ -87,17 +87,27 @@ export async function* spawnJSONLProcess(options: SubprocessOptions): AsyncGener
resetTimeout(); resetTimeout();
// Setup abort handling // Setup abort handling with cleanup
let abortHandler: (() => void) | null = null;
if (abortController) { if (abortController) {
abortController.signal.addEventListener('abort', () => { abortHandler = () => {
console.log('[SubprocessManager] Abort signal received, killing process'); console.log('[SubprocessManager] Abort signal received, killing process');
if (timeoutHandle) { if (timeoutHandle) {
clearTimeout(timeoutHandle); clearTimeout(timeoutHandle);
} }
childProcess.kill('SIGTERM'); childProcess.kill('SIGTERM');
}); };
abortController.signal.addEventListener('abort', abortHandler);
} }
// Helper to clean up abort listener
const cleanupAbortListener = () => {
if (abortController && abortHandler) {
abortController.signal.removeEventListener('abort', abortHandler);
abortHandler = null;
}
};
// Parse stdout as JSONL (one JSON object per line) // Parse stdout as JSONL (one JSON object per line)
if (childProcess.stdout) { if (childProcess.stdout) {
const rl = readline.createInterface({ const rl = readline.createInterface({
@@ -130,7 +140,12 @@ export async function* spawnJSONLProcess(options: SubprocessOptions): AsyncGener
if (timeoutHandle) { if (timeoutHandle) {
clearTimeout(timeoutHandle); clearTimeout(timeoutHandle);
} }
rl.close();
cleanupAbortListener();
} }
} else {
// No stdout - still need to cleanup abort listener when process exits
cleanupAbortListener();
} }
// Wait for process to exit // Wait for process to exit
@@ -195,19 +210,31 @@ export async function spawnProcess(options: SubprocessOptions): Promise<Subproce
}); });
} }
// Setup abort handling // Setup abort handling with cleanup
let abortHandler: (() => void) | null = null;
const cleanupAbortListener = () => {
if (abortController && abortHandler) {
abortController.signal.removeEventListener('abort', abortHandler);
abortHandler = null;
}
};
if (abortController) { if (abortController) {
abortController.signal.addEventListener('abort', () => { abortHandler = () => {
cleanupAbortListener();
childProcess.kill('SIGTERM'); childProcess.kill('SIGTERM');
reject(new Error('Process aborted')); reject(new Error('Process aborted'));
}); };
abortController.signal.addEventListener('abort', abortHandler);
} }
childProcess.on('exit', (code) => { childProcess.on('exit', (code) => {
cleanupAbortListener();
resolve({ stdout, stderr, exitCode: code }); resolve({ stdout, stderr, exitCode: code });
}); });
childProcess.on('error', (error) => { childProcess.on('error', (error) => {
cleanupAbortListener();
reject(error); reject(error);
}); });
}); });