fix(core): Fix race condition in workflow state persistence (#1339)

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Ralph Khreish
2025-10-31 15:41:21 +01:00
committed by GitHub
parent 3e70edfa3a
commit 3b09b5da2a
8 changed files with 113 additions and 89 deletions

View File

@@ -33,6 +33,7 @@
"@supabase/supabase-js": "^2.57.4",
"fs-extra": "^11.3.2",
"simple-git": "^3.28.0",
"steno": "^4.0.2",
"zod": "^4.1.11"
},
"devDependencies": {

View File

@@ -9,6 +9,7 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import os from 'node:os';
import { Writer } from 'steno';
import type { WorkflowState } from '../types.js';
import { getLogger } from '../../../common/logger/index.js';
@@ -28,6 +29,8 @@ export class WorkflowStateManager {
private readonly sessionDir: string;
private maxBackups: number;
private readonly logger = getLogger('WorkflowStateManager');
private writer: Writer | null = null;
private writerInitPromise: Promise<void> | null = null;
constructor(projectRoot: string, maxBackups = 5) {
this.projectRoot = path.resolve(projectRoot);
@@ -69,6 +72,31 @@ export class WorkflowStateManager {
return sanitized;
}
/**
* Ensure the steno Writer is initialized
* This ensures the session directory exists before creating the writer
*/
private async ensureWriter(): Promise<void> {
if (this.writer) {
return;
}
// If another call is already initializing, wait for it
if (this.writerInitPromise) {
await this.writerInitPromise;
return;
}
this.writerInitPromise = (async () => {
// Ensure session directory exists before creating writer
await fs.mkdir(this.sessionDir, { recursive: true });
this.writer = new Writer(this.statePath);
})();
await this.writerInitPromise;
this.writerInitPromise = null;
}
/**
* Check if workflow state exists
*/
@@ -98,11 +126,12 @@ export class WorkflowStateManager {
/**
* Save workflow state to disk
* Uses steno for atomic writes and automatic queueing of concurrent saves
*/
async save(state: WorkflowState): Promise<void> {
try {
// Ensure session directory exists
await fs.mkdir(this.sessionDir, { recursive: true });
// Ensure writer is initialized (creates directory if needed)
await this.ensureWriter();
// Serialize and validate JSON
const jsonContent = JSON.stringify(state, null, 2);
@@ -115,10 +144,8 @@ export class WorkflowStateManager {
throw new Error('Failed to generate valid JSON from workflow state');
}
// Write state atomically with newline at end
const tempPath = `${this.statePath}.tmp`;
await fs.writeFile(tempPath, jsonContent + '\n', 'utf-8');
await fs.rename(tempPath, this.statePath);
// Write using steno (handles queuing and atomic writes automatically)
await this.writer!.write(jsonContent + '\n');
this.logger.debug(`Saved workflow state (${jsonContent.length} bytes)`);
} catch (error: any) {

View File

@@ -98,7 +98,7 @@ export class WorkflowOrchestrator {
/**
* Transition to next state based on event
*/
transition(event: WorkflowEvent): void {
async transition(event: WorkflowEvent): Promise<void> {
// Check if workflow is aborted
if (this.aborted && event.type !== 'ABORT') {
throw new Error('Workflow has been aborted');
@@ -107,26 +107,26 @@ export class WorkflowOrchestrator {
// Handle special events that work across all phases
if (event.type === 'ERROR') {
this.handleError(event.error);
void this.triggerAutoPersist();
await this.triggerAutoPersist();
return;
}
if (event.type === 'ABORT') {
this.aborted = true;
void this.triggerAutoPersist();
await this.triggerAutoPersist();
return;
}
if (event.type === 'RETRY') {
this.handleRetry();
void this.triggerAutoPersist();
await this.triggerAutoPersist();
return;
}
// Handle TDD phase transitions within SUBTASK_LOOP
if (this.currentPhase === 'SUBTASK_LOOP') {
this.handleTDDPhaseTransition(event);
void this.triggerAutoPersist();
await this.handleTDDPhaseTransition(event);
await this.triggerAutoPersist();
return;
}
@@ -143,13 +143,13 @@ export class WorkflowOrchestrator {
// Execute transition
this.executeTransition(validTransition, event);
void this.triggerAutoPersist();
await this.triggerAutoPersist();
}
/**
* Handle TDD phase transitions (RED -> GREEN -> COMMIT)
*/
private handleTDDPhaseTransition(event: WorkflowEvent): void {
private async handleTDDPhaseTransition(event: WorkflowEvent): Promise<void> {
const currentTDD = this.context.currentTDDPhase || 'RED';
switch (event.type) {
@@ -201,7 +201,7 @@ export class WorkflowOrchestrator {
this.emit('subtask:started');
} else {
// All subtasks complete, transition to FINALIZE
this.transition({ type: 'ALL_SUBTASKS_COMPLETE' });
await this.transition({ type: 'ALL_SUBTASKS_COMPLETE' });
}
break;
}
@@ -272,7 +272,7 @@ export class WorkflowOrchestrator {
this.emit('subtask:started');
} else {
// All subtasks complete, transition to FINALIZE
this.transition({ type: 'ALL_SUBTASKS_COMPLETE' });
await this.transition({ type: 'ALL_SUBTASKS_COMPLETE' });
}
break;

View File

@@ -168,7 +168,7 @@ export class WorkflowService {
this.activityLogger.start();
// Transition through PREFLIGHT and BRANCH_SETUP phases
this.orchestrator.transition({ type: 'PREFLIGHT_COMPLETE' });
await this.orchestrator.transition({ type: 'PREFLIGHT_COMPLETE' });
// Create git branch with descriptive name
const branchName = this.generateBranchName(taskId, taskTitle, tag);
@@ -181,7 +181,7 @@ export class WorkflowService {
}
// Transition to SUBTASK_LOOP with RED phase
this.orchestrator.transition({
await this.orchestrator.transition({
type: 'BRANCH_CREATED',
branchName
});
@@ -363,13 +363,13 @@ export class WorkflowService {
// Transition based on current phase
switch (tddPhase) {
case 'RED':
this.orchestrator.transition({
await this.orchestrator.transition({
type: 'RED_PHASE_COMPLETE',
testResults
});
break;
case 'GREEN':
this.orchestrator.transition({
await this.orchestrator.transition({
type: 'GREEN_PHASE_COMPLETE',
testResults
});
@@ -402,17 +402,17 @@ export class WorkflowService {
}
// Transition COMMIT phase complete
this.orchestrator.transition({
await this.orchestrator.transition({
type: 'COMMIT_COMPLETE'
});
// Check if should advance to next subtask
const progress = this.orchestrator.getProgress();
if (progress.current < progress.total) {
this.orchestrator.transition({ type: 'SUBTASK_COMPLETE' });
await this.orchestrator.transition({ type: 'SUBTASK_COMPLETE' });
} else {
// All subtasks complete
this.orchestrator.transition({ type: 'ALL_SUBTASKS_COMPLETE' });
await this.orchestrator.transition({ type: 'ALL_SUBTASKS_COMPLETE' });
}
return this.getStatus();
@@ -448,7 +448,7 @@ export class WorkflowService {
}
// Transition to COMPLETE
this.orchestrator.transition({ type: 'FINALIZE_COMPLETE' });
await this.orchestrator.transition({ type: 'FINALIZE_COMPLETE' });
return this.getStatus();
}
@@ -458,7 +458,7 @@ export class WorkflowService {
*/
async abortWorkflow(): Promise<void> {
if (this.orchestrator) {
this.orchestrator.transition({ type: 'ABORT' });
await this.orchestrator.transition({ type: 'ABORT' });
}
// Delete state file