mirror of
https://github.com/leonvanzyl/autocoder.git
synced 2026-01-30 06:12:06 +00:00
fix: prevent orchestrator early exit due to stale session cache
The parallel orchestrator was exiting prematurely with "All features complete!" while pending features remained. This was caused by SQLAlchemy session caching not seeing commits made by agent subprocesses. Changes: - Add session.expire_all() to get_resumable_features() to force fresh reads - Add session.expire_all() to get_ready_features() to force fresh reads - Add session.expire_all() to get_all_complete() to force fresh reads - Add defensive retry logic in run_loop() when no features are ready but nothing is running - now forces a fresh check before declaring blocked - Add debug logging to get_all_complete() and get_ready_features() to track passing/pending/in_progress counts for easier diagnosis The root cause was cross-process database visibility: when an agent subprocess committed feature completion, the orchestrator's session had cached the old state and didn't see the update. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -140,6 +140,10 @@ class ParallelOrchestrator:
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
# Force fresh read from database to avoid stale cached data
|
||||
# This is critical when agent subprocesses have committed changes
|
||||
session.expire_all()
|
||||
|
||||
# Find features that are in_progress but not complete
|
||||
stale = session.query(Feature).filter(
|
||||
Feature.in_progress == True,
|
||||
@@ -169,6 +173,10 @@ class ParallelOrchestrator:
|
||||
"""Get features with satisfied dependencies, not already running."""
|
||||
session = self.get_session()
|
||||
try:
|
||||
# Force fresh read from database to avoid stale cached data
|
||||
# This is critical when agent subprocesses have committed changes
|
||||
session.expire_all()
|
||||
|
||||
all_features = session.query(Feature).all()
|
||||
all_dicts = [f.to_dict() for f in all_features]
|
||||
|
||||
@@ -190,6 +198,15 @@ class ParallelOrchestrator:
|
||||
# Sort by scheduling score (higher = first), then priority, then id
|
||||
scores = compute_scheduling_scores(all_dicts)
|
||||
ready.sort(key=lambda f: (-scores.get(f["id"], 0), f["priority"], f["id"]))
|
||||
|
||||
# Debug logging
|
||||
passing = sum(1 for f in all_features if f.passes)
|
||||
in_progress = sum(1 for f in all_features if f.in_progress and not f.passes)
|
||||
print(
|
||||
f"[DEBUG] get_ready_features: {len(ready)} ready, "
|
||||
f"{passing} passing, {in_progress} in_progress, {len(all_features)} total",
|
||||
flush=True
|
||||
)
|
||||
return ready
|
||||
finally:
|
||||
session.close()
|
||||
@@ -198,14 +215,31 @@ class ParallelOrchestrator:
|
||||
"""Check if all features are complete or permanently failed."""
|
||||
session = self.get_session()
|
||||
try:
|
||||
# Force fresh read from database to avoid stale cached data
|
||||
# This is critical when agent subprocesses have committed changes
|
||||
session.expire_all()
|
||||
|
||||
all_features = session.query(Feature).all()
|
||||
passing_count = 0
|
||||
failed_count = 0
|
||||
pending_count = 0
|
||||
for f in all_features:
|
||||
if f.passes:
|
||||
passing_count += 1
|
||||
continue # Completed successfully
|
||||
if self._failure_counts.get(f.id, 0) >= MAX_FEATURE_RETRIES:
|
||||
failed_count += 1
|
||||
continue # Permanently failed, count as "done"
|
||||
return False # Still workable
|
||||
return True
|
||||
pending_count += 1
|
||||
|
||||
total = len(all_features)
|
||||
is_complete = pending_count == 0
|
||||
print(
|
||||
f"[DEBUG] get_all_complete: {passing_count}/{total} passing, "
|
||||
f"{failed_count} failed, {pending_count} pending -> {is_complete}",
|
||||
flush=True
|
||||
)
|
||||
return is_complete
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@@ -429,7 +463,21 @@ class ParallelOrchestrator:
|
||||
await asyncio.sleep(POLL_INTERVAL)
|
||||
continue
|
||||
else:
|
||||
# No ready features and nothing running - might be blocked
|
||||
# No ready features and nothing running
|
||||
# Force a fresh database check before declaring blocked
|
||||
# This handles the case where subprocess commits weren't visible yet
|
||||
session = self.get_session()
|
||||
try:
|
||||
session.expire_all()
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
# Recheck if all features are now complete
|
||||
if self.get_all_complete():
|
||||
print("\nAll features complete!", flush=True)
|
||||
break
|
||||
|
||||
# Still have pending features but all are blocked by dependencies
|
||||
print("No ready features available. All remaining features may be blocked by dependencies.", flush=True)
|
||||
await asyncio.sleep(POLL_INTERVAL * 2)
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user