From 5f786078faf4b227de23f8c86116b888c6978486 Mon Sep 17 00:00:00 2001
From: Auto
Date: Sat, 17 Jan 2026 15:25:12 +0200
Subject: [PATCH] fix: prevent orchestrator early exit due to stale session cache

The parallel orchestrator was exiting prematurely with "All features
complete!" while pending features remained. This was caused by SQLAlchemy
session caching not seeing commits made by agent subprocesses.

Changes:
- Add session.expire_all() to get_resumable_features() to force fresh reads
- Add session.expire_all() to get_ready_features() to force fresh reads
- Add session.expire_all() to get_all_complete() to force fresh reads
- Add defensive retry logic in run_loop() when no features are ready but
  nothing is running - now forces a fresh check before declaring blocked
- Add debug logging to get_all_complete() and get_ready_features() to track
  passing/pending/in_progress counts for easier diagnosis

The root cause was cross-process database visibility: when an agent
subprocess committed feature completion, the orchestrator's session had
cached the old state and didn't see the update.

Co-Authored-By: Claude Opus 4.5
---
 parallel_orchestrator.py | 54 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 51 insertions(+), 3 deletions(-)

diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py
index cf79e8e..da348c8 100644
--- a/parallel_orchestrator.py
+++ b/parallel_orchestrator.py
@@ -140,6 +140,10 @@ class ParallelOrchestrator:
         """
         session = self.get_session()
         try:
+            # Force fresh read from database to avoid stale cached data
+            # This is critical when agent subprocesses have committed changes
+            session.expire_all()
+
             # Find features that are in_progress but not complete
             stale = session.query(Feature).filter(
                 Feature.in_progress == True,
@@ -169,6 +173,10 @@ class ParallelOrchestrator:
         """Get features with satisfied dependencies, not already running."""
         session = self.get_session()
         try:
+            # Force fresh read from database to avoid stale cached data
+            # This is critical when agent subprocesses have committed changes
+            session.expire_all()
+
             all_features = session.query(Feature).all()
             all_dicts = [f.to_dict() for f in all_features]
 
@@ -190,6 +198,15 @@ class ParallelOrchestrator:
             # Sort by scheduling score (higher = first), then priority, then id
             scores = compute_scheduling_scores(all_dicts)
             ready.sort(key=lambda f: (-scores.get(f["id"], 0), f["priority"], f["id"]))
+
+            # Debug logging
+            passing = sum(1 for f in all_features if f.passes)
+            in_progress = sum(1 for f in all_features if f.in_progress and not f.passes)
+            print(
+                f"[DEBUG] get_ready_features: {len(ready)} ready, "
+                f"{passing} passing, {in_progress} in_progress, {len(all_features)} total",
+                flush=True
+            )
             return ready
         finally:
             session.close()
@@ -198,14 +215,31 @@ class ParallelOrchestrator:
         """Check if all features are complete or permanently failed."""
         session = self.get_session()
         try:
+            # Force fresh read from database to avoid stale cached data
+            # This is critical when agent subprocesses have committed changes
+            session.expire_all()
+
             all_features = session.query(Feature).all()
+            passing_count = 0
+            failed_count = 0
+            pending_count = 0
             for f in all_features:
                 if f.passes:
+                    passing_count += 1
                     continue  # Completed successfully
                 if self._failure_counts.get(f.id, 0) >= MAX_FEATURE_RETRIES:
+                    failed_count += 1
                     continue  # Permanently failed, count as "done"
-                return False  # Still workable
-            return True
+                pending_count += 1
+
+            total = len(all_features)
+            is_complete = pending_count == 0
+            print(
+                f"[DEBUG] get_all_complete: {passing_count}/{total} passing, "
+                f"{failed_count} failed, {pending_count} pending -> {is_complete}",
+                flush=True
+            )
+            return is_complete
         finally:
             session.close()
 
@@ -429,7 +463,21 @@ class ParallelOrchestrator:
                     await asyncio.sleep(POLL_INTERVAL)
                     continue
                 else:
-                    # No ready features and nothing running - might be blocked
+                    # No ready features and nothing running
+                    # Force a fresh database check before declaring blocked
+                    # This handles the case where subprocess commits weren't visible yet
+                    session = self.get_session()
+                    try:
+                        session.expire_all()
+                    finally:
+                        session.close()
+
+                    # Recheck if all features are now complete
+                    if self.get_all_complete():
+                        print("\nAll features complete!", flush=True)
+                        break
+
+                    # Still have pending features but all are blocked by dependencies
                     print("No ready features available. All remaining features may be blocked by dependencies.", flush=True)
                    await asyncio.sleep(POLL_INTERVAL * 2)
                     continue
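
For context on the failure mode the commit message describes, below is a minimal, self-contained sketch (not part of the patch) of SQLAlchemy's identity-map caching and how expire_all() forces a reload. The Feature model, column names, and session setup are simplified stand-ins, not the actual definitions in parallel_orchestrator.py.

# Illustrative only: "Feature", "passes", and the two sessions below are
# simplified stand-ins, not the real models/sessions from parallel_orchestrator.py.
import os
import tempfile

from sqlalchemy import Boolean, Column, Integer, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Feature(Base):
    __tablename__ = "features"
    id = Column(Integer, primary_key=True)
    passes = Column(Boolean, default=False)


db_url = "sqlite:///" + os.path.join(tempfile.mkdtemp(), "demo.sqlite")

# Two engines over the same database file stand in for two processes:
# a long-lived orchestrator session and an agent that commits independently.
orchestrator = sessionmaker(bind=create_engine(db_url))()
agent = sessionmaker(bind=create_engine(db_url))()

Base.metadata.create_all(orchestrator.get_bind())
orchestrator.add(Feature(id=1, passes=False))
orchestrator.commit()

# The orchestrator loads the row; the object now sits in its identity map.
loaded = orchestrator.query(Feature).filter_by(id=1).one()
print("before agent commit:", loaded.passes)    # False

# The "agent process" marks the feature complete and commits.
agent.query(Feature).filter_by(id=1).one().passes = True
agent.commit()

# Re-querying re-executes SQL, but rows that map to objects already in the
# identity map keep their loaded (now stale) attribute values.
stale = orchestrator.query(Feature).filter_by(id=1).one()
print("stale re-read:      ", stale.passes)     # still False

# expire_all() marks every cached object's attributes as expired, so the next
# query refreshes them from the database and the agent's commit becomes visible.
# (On backends with snapshot isolation you would also need to end the open
# read transaction, e.g. with rollback(), before re-reading.)
orchestrator.expire_all()
fresh = orchestrator.query(Feature).filter_by(id=1).one()
print("after expire_all(): ", fresh.passes)     # True

The defensive recheck added to run_loop() follows the same reasoning: before concluding that the remaining features are blocked by dependencies, the orchestrator discards cached state and re-evaluates get_all_complete(), so a completion that an agent committed moments earlier is not mistaken for a deadlock.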