""" Parallel Orchestrator ===================== Unified orchestrator that handles all agent lifecycle: - Initialization: Creates features from app_spec if needed - Coding agents: Implement features one at a time - Testing agents: Regression test passing features (optional) Uses dependency-aware scheduling to ensure features are only started when their dependencies are satisfied. Usage: # Entry point (always uses orchestrator) python autonomous_agent_demo.py --project-dir my-app --concurrency 3 # Direct orchestrator usage python parallel_orchestrator.py --project-dir my-app --max-concurrency 3 """ import asyncio import os import subprocess import sys import threading from datetime import datetime from pathlib import Path from typing import Callable, Literal import psutil from api.database import Feature, create_database from api.dependency_resolver import are_dependencies_satisfied, compute_scheduling_scores from progress import has_features # Root directory of autocoder (where this script and autonomous_agent_demo.py live) AUTOCODER_ROOT = Path(__file__).parent.resolve() # Debug log file path DEBUG_LOG_FILE = AUTOCODER_ROOT / "orchestrator_debug.log" class DebugLogger: """Thread-safe debug logger that writes to a file.""" def __init__(self, log_file: Path = DEBUG_LOG_FILE): self.log_file = log_file self._lock = threading.Lock() self._session_started = False # DON'T clear on import - only mark session start when run_loop begins def start_session(self): """Mark the start of a new orchestrator session. Clears previous logs.""" with self._lock: self._session_started = True with open(self.log_file, "w") as f: f.write(f"=== Orchestrator Debug Log Started: {datetime.now().isoformat()} ===\n") f.write(f"=== PID: {os.getpid()} ===\n\n") def log(self, category: str, message: str, **kwargs): """Write a timestamped log entry.""" timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] with self._lock: with open(self.log_file, "a") as f: f.write(f"[{timestamp}] [{category}] {message}\n") for key, value in kwargs.items(): f.write(f" {key}: {value}\n") f.write("\n") def section(self, title: str): """Write a section header.""" with self._lock: with open(self.log_file, "a") as f: f.write(f"\n{'='*60}\n") f.write(f" {title}\n") f.write(f"{'='*60}\n\n") # Global debug logger instance debug_log = DebugLogger() def _dump_database_state(session, label: str = ""): """Helper to dump full database state to debug log.""" from api.database import Feature all_features = session.query(Feature).all() passing = [f for f in all_features if f.passes] in_progress = [f for f in all_features if f.in_progress and not f.passes] pending = [f for f in all_features if not f.passes and not f.in_progress] debug_log.log("DB_DUMP", f"Full database state {label}", total_features=len(all_features), passing_count=len(passing), passing_ids=[f.id for f in passing], in_progress_count=len(in_progress), in_progress_ids=[f.id for f in in_progress], pending_count=len(pending), pending_ids=[f.id for f in pending[:10]]) # First 10 pending only # Performance: Limit parallel agents to prevent memory exhaustion MAX_PARALLEL_AGENTS = 5 MAX_TOTAL_AGENTS = 10 # Hard limit on total agents (coding + testing) DEFAULT_CONCURRENCY = 3 POLL_INTERVAL = 5 # seconds between checking for ready features MAX_FEATURE_RETRIES = 3 # Maximum times to retry a failed feature INITIALIZER_TIMEOUT = 1800 # 30 minutes timeout for initializer def _kill_process_tree(proc: subprocess.Popen, timeout: float = 5.0) -> None: """Kill a process and all its child processes. 
    On Windows, subprocess.terminate() only kills the immediate process, leaving
    orphaned child processes (e.g., spawned browser instances). This function
    uses psutil to kill the entire process tree.

    Args:
        proc: The subprocess.Popen object to kill
        timeout: Seconds to wait for graceful termination before force-killing
    """
    try:
        parent = psutil.Process(proc.pid)
        # Get all children recursively before terminating
        children = parent.children(recursive=True)

        # Terminate children first (graceful)
        for child in children:
            try:
                child.terminate()
            except psutil.NoSuchProcess:
                pass

        # Wait for children to terminate
        _, still_alive = psutil.wait_procs(children, timeout=timeout)

        # Force kill any remaining children
        for child in still_alive:
            try:
                child.kill()
            except psutil.NoSuchProcess:
                pass

        # Now terminate the parent
        proc.terminate()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
    except psutil.NoSuchProcess:
        # Process already dead, just ensure cleanup
        try:
            proc.terminate()
            proc.wait(timeout=1)
        except (subprocess.TimeoutExpired, OSError):
            try:
                proc.kill()
            except OSError:
                pass


class ParallelOrchestrator:
    """Orchestrates parallel execution of independent features."""

    def __init__(
        self,
        project_dir: Path,
        max_concurrency: int = DEFAULT_CONCURRENCY,
        model: str = None,
        yolo_mode: bool = False,
        testing_agent_ratio: int = 1,
        count_testing_in_concurrency: bool = False,
        on_output: Callable[[int, str], None] = None,
        on_status: Callable[[int, str], None] = None,
    ):
        """Initialize the orchestrator.

        Args:
            project_dir: Path to the project directory
            max_concurrency: Maximum number of concurrent coding agents (1-5)
            model: Claude model to use (or None for default)
            yolo_mode: Whether to run in YOLO mode (skip testing agents)
            testing_agent_ratio: Testing agents per coding agent (0-3, default 1)
            count_testing_in_concurrency: If True, testing agents count toward concurrency limit
            on_output: Callback for agent output (feature_id, line)
            on_status: Callback for agent status changes (feature_id, status)
        """
        self.project_dir = project_dir
        self.max_concurrency = min(max(max_concurrency, 1), MAX_PARALLEL_AGENTS)
        self.model = model
        self.yolo_mode = yolo_mode
        self.testing_agent_ratio = min(max(testing_agent_ratio, 0), 3)  # Clamp 0-3
        self.count_testing_in_concurrency = count_testing_in_concurrency
        self.on_output = on_output
        self.on_status = on_status

        # Thread-safe state
        self._lock = threading.Lock()
        # Coding agents: feature_id -> process
        self.running_coding_agents: dict[int, subprocess.Popen] = {}
        # Testing agents: list of processes (not tied to specific features)
        self.running_testing_agents: list[subprocess.Popen] = []
        # Legacy alias for backward compatibility
        self.running_agents = self.running_coding_agents
        self.abort_events: dict[int, threading.Event] = {}
        self.is_running = False
        # Track feature failures to prevent infinite retry loops
        self._failure_counts: dict[int, int] = {}

        # Database session for this orchestrator
        self._engine, self._session_maker = create_database(project_dir)

    def get_session(self):
        """Get a new database session."""
        return self._session_maker()

    def get_resumable_features(self) -> list[dict]:
        """Get features that were left in_progress from a previous session.

        These are features where in_progress=True but passes=False, and they're
        not currently being worked on by this orchestrator. This handles the case
        where a previous session was interrupted before completing the feature.
""" session = self.get_session() try: # Force fresh read from database to avoid stale cached data # This is critical when agent subprocesses have committed changes session.expire_all() # Find features that are in_progress but not complete stale = session.query(Feature).filter( Feature.in_progress == True, Feature.passes == False ).all() resumable = [] for f in stale: # Skip if already running in this orchestrator instance with self._lock: if f.id in self.running_coding_agents: continue # Skip if feature has failed too many times if self._failure_counts.get(f.id, 0) >= MAX_FEATURE_RETRIES: continue resumable.append(f.to_dict()) # Sort by scheduling score (higher = first), then priority, then id all_dicts = [f.to_dict() for f in session.query(Feature).all()] scores = compute_scheduling_scores(all_dicts) resumable.sort(key=lambda f: (-scores.get(f["id"], 0), f["priority"], f["id"])) return resumable finally: session.close() def get_ready_features(self) -> list[dict]: """Get features with satisfied dependencies, not already running.""" session = self.get_session() try: # Force fresh read from database to avoid stale cached data # This is critical when agent subprocesses have committed changes session.expire_all() all_features = session.query(Feature).all() all_dicts = [f.to_dict() for f in all_features] ready = [] skipped_reasons = {"passes": 0, "in_progress": 0, "running": 0, "failed": 0, "deps": 0} for f in all_features: if f.passes: skipped_reasons["passes"] += 1 continue if f.in_progress: skipped_reasons["in_progress"] += 1 continue # Skip if already running in this orchestrator with self._lock: if f.id in self.running_coding_agents: skipped_reasons["running"] += 1 continue # Skip if feature has failed too many times if self._failure_counts.get(f.id, 0) >= MAX_FEATURE_RETRIES: skipped_reasons["failed"] += 1 continue # Check dependencies if are_dependencies_satisfied(f.to_dict(), all_dicts): ready.append(f.to_dict()) else: skipped_reasons["deps"] += 1 # Sort by scheduling score (higher = first), then priority, then id scores = compute_scheduling_scores(all_dicts) ready.sort(key=lambda f: (-scores.get(f["id"], 0), f["priority"], f["id"])) # Debug logging passing = sum(1 for f in all_features if f.passes) in_progress = sum(1 for f in all_features if f.in_progress and not f.passes) print( f"[DEBUG] get_ready_features: {len(ready)} ready, " f"{passing} passing, {in_progress} in_progress, {len(all_features)} total", flush=True ) print( f"[DEBUG] Skipped: {skipped_reasons['passes']} passing, {skipped_reasons['in_progress']} in_progress, " f"{skipped_reasons['running']} running, {skipped_reasons['failed']} failed, {skipped_reasons['deps']} blocked by deps", flush=True ) # Log to debug file (but not every call to avoid spam) debug_log.log("READY", "get_ready_features() called", ready_count=len(ready), ready_ids=[f['id'] for f in ready[:5]], # First 5 only passing=passing, in_progress=in_progress, total=len(all_features), skipped=skipped_reasons) return ready finally: session.close() def get_all_complete(self) -> bool: """Check if all features are complete or permanently failed. Returns False if there are no features (initialization needed). 
""" session = self.get_session() try: # Force fresh read from database to avoid stale cached data # This is critical when agent subprocesses have committed changes session.expire_all() all_features = session.query(Feature).all() # No features = NOT complete, need initialization if len(all_features) == 0: return False passing_count = 0 failed_count = 0 pending_count = 0 for f in all_features: if f.passes: passing_count += 1 continue # Completed successfully if self._failure_counts.get(f.id, 0) >= MAX_FEATURE_RETRIES: failed_count += 1 continue # Permanently failed, count as "done" pending_count += 1 total = len(all_features) is_complete = pending_count == 0 print( f"[DEBUG] get_all_complete: {passing_count}/{total} passing, " f"{failed_count} failed, {pending_count} pending -> {is_complete}", flush=True ) return is_complete finally: session.close() def get_passing_count(self) -> int: """Get the number of passing features.""" session = self.get_session() try: session.expire_all() return session.query(Feature).filter(Feature.passes == True).count() finally: session.close() def start_feature(self, feature_id: int, resume: bool = False) -> tuple[bool, str]: """Start a single coding agent for a feature. Args: feature_id: ID of the feature to start resume: If True, resume a feature that's already in_progress from a previous session Returns: Tuple of (success, message) """ with self._lock: if feature_id in self.running_coding_agents: return False, "Feature already running" if len(self.running_coding_agents) >= self.max_concurrency: return False, "At max concurrency" # Mark as in_progress in database (or verify it's resumable) session = self.get_session() try: feature = session.query(Feature).filter(Feature.id == feature_id).first() if not feature: return False, "Feature not found" if feature.passes: return False, "Feature already complete" if resume: # Resuming: feature should already be in_progress if not feature.in_progress: return False, "Feature not in progress, cannot resume" else: # Starting fresh: feature should not be in_progress if feature.in_progress: return False, "Feature already in progress" feature.in_progress = True session.commit() finally: session.close() # Start coding agent subprocess success, message = self._spawn_coding_agent(feature_id) if not success: return False, message # Spawn ONE testing agent when coding agent STARTS (if not YOLO mode and passing features exist) # Testing agents exit after one test, so we spawn fresh ones with each coding agent start if not self.yolo_mode and self.testing_agent_ratio > 0: passing_count = self.get_passing_count() if passing_count > 0: print(f"[DEBUG] Coding agent started, spawning testing agent (passing_count={passing_count})", flush=True) debug_log.log("TESTING", "Spawning testing agent on coding agent start", feature_id=feature_id, passing_count=passing_count) self._spawn_testing_agent() return True, f"Started feature {feature_id}" def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]: """Spawn a coding agent subprocess for a specific feature.""" # Create abort event abort_event = threading.Event() # Start subprocess for this feature cmd = [ sys.executable, "-u", # Force unbuffered stdout/stderr str(AUTOCODER_ROOT / "autonomous_agent_demo.py"), "--project-dir", str(self.project_dir), "--max-iterations", "1", "--agent-type", "coding", "--feature-id", str(feature_id), ] if self.model: cmd.extend(["--model", self.model]) if self.yolo_mode: cmd.append("--yolo") try: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, 
                stderr=subprocess.STDOUT,
                text=True,
                cwd=str(AUTOCODER_ROOT),
                env={**os.environ, "PYTHONUNBUFFERED": "1"},
            )
        except Exception as e:
            # Reset in_progress on failure
            session = self.get_session()
            try:
                feature = session.query(Feature).filter(Feature.id == feature_id).first()
                if feature:
                    feature.in_progress = False
                    session.commit()
            finally:
                session.close()
            return False, f"Failed to start agent: {e}"

        with self._lock:
            self.running_coding_agents[feature_id] = proc
            self.abort_events[feature_id] = abort_event

        # Start output reader thread
        threading.Thread(
            target=self._read_output,
            args=(feature_id, proc, abort_event, "coding"),
            daemon=True
        ).start()

        if self.on_status:
            self.on_status(feature_id, "running")

        print(f"Started coding agent for feature #{feature_id}", flush=True)
        return True, f"Started feature {feature_id}"

    def _spawn_testing_agents(self) -> None:
        """Spawn testing agents based on testing_agent_ratio."""
        for _ in range(self.testing_agent_ratio):
            # Check resource limits
            with self._lock:
                total_agents = len(self.running_coding_agents) + len(self.running_testing_agents)
                if total_agents >= MAX_TOTAL_AGENTS:
                    print(f"[DEBUG] At max total agents ({MAX_TOTAL_AGENTS}), skipping testing agent", flush=True)
                    break
                if self.count_testing_in_concurrency:
                    if total_agents >= self.max_concurrency:
                        print("[DEBUG] Testing agents count toward concurrency, at limit", flush=True)
                        break

            # Spawn a testing agent
            self._spawn_testing_agent()

    def _spawn_testing_agent(self) -> tuple[bool, str]:
        """Spawn a testing agent subprocess for regression testing."""
        debug_log.log("TESTING", "Attempting to spawn testing agent subprocess")

        cmd = [
            sys.executable,
            "-u",
            str(AUTOCODER_ROOT / "autonomous_agent_demo.py"),
            "--project-dir", str(self.project_dir),
            "--max-iterations", "1",
            "--agent-type", "testing",
        ]
        if self.model:
            cmd.extend(["--model", self.model])
        # Testing agents don't need --yolo flag (they use testing prompt regardless)

        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                cwd=str(AUTOCODER_ROOT),
                env={**os.environ, "PYTHONUNBUFFERED": "1"},
            )
        except Exception as e:
            debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}")
            return False, f"Failed to start testing agent: {e}"

        with self._lock:
            self.running_testing_agents.append(proc)
            testing_count = len(self.running_testing_agents)

        # Start output reader thread (feature_id=None for testing agents)
        threading.Thread(
            target=self._read_output,
            args=(None, proc, threading.Event(), "testing"),
            daemon=True
        ).start()

        print(f"Started testing agent (PID {proc.pid})", flush=True)
        debug_log.log("TESTING", "Successfully spawned testing agent",
                      pid=proc.pid,
                      total_testing_agents=testing_count)
        return True, "Started testing agent"

    async def _run_initializer(self) -> bool:
        """Run initializer agent as blocking subprocess.

        Returns True if initialization succeeded (features were created).
""" debug_log.section("INITIALIZER PHASE") debug_log.log("INIT", "Starting initializer subprocess", project_dir=str(self.project_dir)) cmd = [ sys.executable, "-u", str(AUTOCODER_ROOT / "autonomous_agent_demo.py"), "--project-dir", str(self.project_dir), "--agent-type", "initializer", "--max-iterations", "1", ] if self.model: cmd.extend(["--model", self.model]) print("Running initializer agent...", flush=True) proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, cwd=str(AUTOCODER_ROOT), env={**os.environ, "PYTHONUNBUFFERED": "1"}, ) debug_log.log("INIT", "Initializer subprocess started", pid=proc.pid) # Stream output with timeout loop = asyncio.get_running_loop() try: async def stream_output(): while True: line = await loop.run_in_executor(None, proc.stdout.readline) if not line: break print(line.rstrip(), flush=True) if self.on_output: self.on_output(0, line.rstrip()) # Use 0 as feature_id for initializer proc.wait() await asyncio.wait_for(stream_output(), timeout=INITIALIZER_TIMEOUT) except asyncio.TimeoutError: print(f"ERROR: Initializer timed out after {INITIALIZER_TIMEOUT // 60} minutes", flush=True) debug_log.log("INIT", "TIMEOUT - Initializer exceeded time limit", timeout_minutes=INITIALIZER_TIMEOUT // 60) _kill_process_tree(proc) return False debug_log.log("INIT", "Initializer subprocess completed", return_code=proc.returncode, success=proc.returncode == 0) if proc.returncode != 0: print(f"ERROR: Initializer failed with exit code {proc.returncode}", flush=True) return False return True def _read_output( self, feature_id: int | None, proc: subprocess.Popen, abort: threading.Event, agent_type: Literal["coding", "testing"] = "coding", ): """Read output from subprocess and emit events.""" try: for line in proc.stdout: if abort.is_set(): break line = line.rstrip() if self.on_output: self.on_output(feature_id or 0, line) else: if agent_type == "testing": print(f"[Testing] {line}", flush=True) else: print(f"[Feature #{feature_id}] {line}", flush=True) proc.wait() finally: self._on_agent_complete(feature_id, proc.returncode, agent_type, proc) def _on_agent_complete( self, feature_id: int | None, return_code: int, agent_type: Literal["coding", "testing"], proc: subprocess.Popen, ): """Handle agent completion. For coding agents: - ALWAYS clears in_progress when agent exits, regardless of success/failure. - This prevents features from getting stuck if an agent crashes or is killed. - The agent marks features as passing BEFORE clearing in_progress, so this is safe. For testing agents: - Just remove from the running list. 
""" if agent_type == "testing": with self._lock: if proc in self.running_testing_agents: self.running_testing_agents.remove(proc) status = "completed" if return_code == 0 else "failed" print(f"Testing agent (PID {proc.pid}) {status}", flush=True) debug_log.log("COMPLETE", "Testing agent finished", pid=proc.pid, status=status) return # Coding agent completion debug_log.log("COMPLETE", f"Coding agent for feature #{feature_id} finished", return_code=return_code, status="success" if return_code == 0 else "failed") with self._lock: self.running_coding_agents.pop(feature_id, None) self.abort_events.pop(feature_id, None) # BEFORE dispose: Query database state to see if it's stale session_before = self.get_session() try: session_before.expire_all() feature_before = session_before.query(Feature).filter(Feature.id == feature_id).first() all_before = session_before.query(Feature).all() passing_before = sum(1 for f in all_before if f.passes) debug_log.log("DB", f"BEFORE engine.dispose() - Feature #{feature_id} state", passes=feature_before.passes if feature_before else None, in_progress=feature_before.in_progress if feature_before else None, total_passing_in_db=passing_before) finally: session_before.close() # CRITICAL: Refresh database connection to see subprocess commits # The coding agent runs as a subprocess and commits changes (e.g., passes=True). # SQLAlchemy may have stale connections. Disposing the engine forces new connections # that will see the subprocess's committed changes. debug_log.log("DB", "Disposing database engine now...") self._engine.dispose() # AFTER dispose: Query again to compare session = self.get_session() try: feature = session.query(Feature).filter(Feature.id == feature_id).first() all_after = session.query(Feature).all() passing_after = sum(1 for f in all_after if f.passes) feature_passes = feature.passes if feature else None feature_in_progress = feature.in_progress if feature else None debug_log.log("DB", f"AFTER engine.dispose() - Feature #{feature_id} state", passes=feature_passes, in_progress=feature_in_progress, total_passing_in_db=passing_after, passing_changed=(passing_after != passing_before) if 'passing_before' in dir() else "unknown") if feature and feature.in_progress and not feature.passes: feature.in_progress = False session.commit() debug_log.log("DB", f"Cleared in_progress for feature #{feature_id} (agent failed)") finally: session.close() # Track failures to prevent infinite retry loops if return_code != 0: with self._lock: self._failure_counts[feature_id] = self._failure_counts.get(feature_id, 0) + 1 failure_count = self._failure_counts[feature_id] if failure_count >= MAX_FEATURE_RETRIES: print(f"Feature #{feature_id} has failed {failure_count} times, will not retry", flush=True) debug_log.log("COMPLETE", f"Feature #{feature_id} exceeded max retries", failure_count=failure_count) status = "completed" if return_code == 0 else "failed" if self.on_status: self.on_status(feature_id, status) # CRITICAL: This print triggers the WebSocket to emit agent_update with state='error' or 'success' print(f"Feature #{feature_id} {status}", flush=True) # NOTE: Testing agents are now spawned in start_feature() when coding agents START, # not here when they complete. This ensures 1:1 ratio and proper termination. 
    def stop_feature(self, feature_id: int) -> tuple[bool, str]:
        """Stop a running coding agent and all its child processes."""
        with self._lock:
            if feature_id not in self.running_coding_agents:
                return False, "Feature not running"
            abort = self.abort_events.get(feature_id)
            proc = self.running_coding_agents.get(feature_id)

        if abort:
            abort.set()
        if proc:
            # Kill entire process tree to avoid orphaned children (e.g., browser instances)
            _kill_process_tree(proc, timeout=5.0)

        return True, f"Stopped feature {feature_id}"

    def stop_all(self) -> None:
        """Stop all running agents (coding and testing)."""
        self.is_running = False

        # Stop coding agents
        with self._lock:
            feature_ids = list(self.running_coding_agents.keys())
        for fid in feature_ids:
            self.stop_feature(fid)

        # Stop testing agents
        with self._lock:
            testing_procs = list(self.running_testing_agents)
        for proc in testing_procs:
            _kill_process_tree(proc, timeout=5.0)

    async def run_loop(self):
        """Main orchestration loop."""
        self.is_running = True

        # Start debug logging session (clears previous logs)
        debug_log.start_session()

        # Log startup to debug file
        debug_log.section("ORCHESTRATOR STARTUP")
        debug_log.log("STARTUP", "Orchestrator run_loop starting",
                      project_dir=str(self.project_dir),
                      max_concurrency=self.max_concurrency,
                      yolo_mode=self.yolo_mode,
                      testing_agent_ratio=self.testing_agent_ratio,
                      count_testing_in_concurrency=self.count_testing_in_concurrency)

        print("=" * 70, flush=True)
        print(" UNIFIED ORCHESTRATOR SETTINGS", flush=True)
        print("=" * 70, flush=True)
        print(f"Project: {self.project_dir}", flush=True)
        print(f"Max concurrency: {self.max_concurrency} coding agents", flush=True)
        print(f"YOLO mode: {self.yolo_mode}", flush=True)
        print(f"Testing agent ratio: {self.testing_agent_ratio} per coding agent", flush=True)
        print(f"Count testing in concurrency: {self.count_testing_in_concurrency}", flush=True)
        print("=" * 70, flush=True)
        print(flush=True)

        # Phase 1: Check if initialization needed
        if not has_features(self.project_dir):
            print("=" * 70, flush=True)
            print(" INITIALIZATION PHASE", flush=True)
            print("=" * 70, flush=True)
            print("No features found - running initializer agent first...", flush=True)
            print("NOTE: This may take 10-20+ minutes to generate features.", flush=True)
            print(flush=True)

            success = await self._run_initializer()
            if not success or not has_features(self.project_dir):
                print("ERROR: Initializer did not create features. Exiting.", flush=True)
                return

            print(flush=True)
            print("=" * 70, flush=True)
            print(" INITIALIZATION COMPLETE - Starting feature loop", flush=True)
            print("=" * 70, flush=True)
            print(flush=True)

            # CRITICAL: Recreate database connection after initializer subprocess commits
            # The initializer runs as a subprocess and commits to the database file.
            # SQLAlchemy may have stale connections or cached state. Disposing the old
            # engine and creating a fresh engine/session_maker ensures we see all the
            # newly created features.
debug_log.section("INITIALIZATION COMPLETE") debug_log.log("INIT", "Disposing old database engine and creating fresh connection") print("[DEBUG] Recreating database connection after initialization...", flush=True) if self._engine is not None: self._engine.dispose() self._engine, self._session_maker = create_database(self.project_dir) # Debug: Show state immediately after initialization print("[DEBUG] Post-initialization state check:", flush=True) print(f"[DEBUG] max_concurrency={self.max_concurrency}", flush=True) print(f"[DEBUG] yolo_mode={self.yolo_mode}", flush=True) print(f"[DEBUG] testing_agent_ratio={self.testing_agent_ratio}", flush=True) # Verify features were created and are visible session = self.get_session() try: feature_count = session.query(Feature).count() all_features = session.query(Feature).all() feature_names = [f"{f.id}: {f.name}" for f in all_features[:10]] print(f"[DEBUG] features in database={feature_count}", flush=True) debug_log.log("INIT", "Post-initialization database state", max_concurrency=self.max_concurrency, yolo_mode=self.yolo_mode, testing_agent_ratio=self.testing_agent_ratio, feature_count=feature_count, first_10_features=feature_names) finally: session.close() # Phase 2: Feature loop # Check for features to resume from previous session resumable = self.get_resumable_features() if resumable: print(f"Found {len(resumable)} feature(s) to resume from previous session:", flush=True) for f in resumable: print(f" - Feature #{f['id']}: {f['name']}", flush=True) print(flush=True) debug_log.section("FEATURE LOOP STARTING") loop_iteration = 0 while self.is_running: loop_iteration += 1 if loop_iteration <= 3: print(f"[DEBUG] === Loop iteration {loop_iteration} ===", flush=True) # Log every iteration to debug file (first 10, then every 5th) if loop_iteration <= 10 or loop_iteration % 5 == 0: with self._lock: running_ids = list(self.running_coding_agents.keys()) testing_count = len(self.running_testing_agents) debug_log.log("LOOP", f"Iteration {loop_iteration}", running_coding_agents=running_ids, running_testing_agents=testing_count, max_concurrency=self.max_concurrency) # Full database dump every 5 iterations if loop_iteration == 1 or loop_iteration % 5 == 0: session = self.get_session() try: _dump_database_state(session, f"(iteration {loop_iteration})") finally: session.close() try: # Check if all complete if self.get_all_complete(): print("\nAll features complete!", flush=True) break # Check capacity with self._lock: current = len(self.running_coding_agents) current_testing = len(self.running_testing_agents) running_ids = list(self.running_coding_agents.keys()) debug_log.log("CAPACITY", "Checking capacity", current_coding=current, current_testing=current_testing, running_coding_ids=running_ids, max_concurrency=self.max_concurrency, at_capacity=(current >= self.max_concurrency)) if current >= self.max_concurrency: debug_log.log("CAPACITY", "At max capacity, sleeping...") await asyncio.sleep(POLL_INTERVAL) continue # Priority 1: Resume features from previous session resumable = self.get_resumable_features() if resumable: slots = self.max_concurrency - current for feature in resumable[:slots]: print(f"Resuming feature #{feature['id']}: {feature['name']}", flush=True) self.start_feature(feature["id"], resume=True) await asyncio.sleep(2) continue # Priority 2: Start new ready features ready = self.get_ready_features() if not ready: # Wait for running features to complete if current > 0: await asyncio.sleep(POLL_INTERVAL) continue else: # No ready features and nothing 
                        # Force a fresh database check before declaring blocked
                        # This handles the case where subprocess commits weren't visible yet
                        session = self.get_session()
                        try:
                            session.expire_all()
                        finally:
                            session.close()

                        # Recheck if all features are now complete
                        if self.get_all_complete():
                            print("\nAll features complete!", flush=True)
                            break

                        # Still have pending features but all are blocked by dependencies
                        print("No ready features available. All remaining features may be blocked by dependencies.", flush=True)
                        await asyncio.sleep(POLL_INTERVAL * 2)
                        continue

                # Start features up to capacity
                slots = self.max_concurrency - current
                print(f"[DEBUG] Spawning loop: {len(ready)} ready, {slots} slots available, max_concurrency={self.max_concurrency}", flush=True)
                print(f"[DEBUG] Will attempt to start {min(len(ready), slots)} features", flush=True)

                features_to_start = ready[:slots]
                print(f"[DEBUG] Features to start: {[f['id'] for f in features_to_start]}", flush=True)
                debug_log.log("SPAWN", "Starting features batch",
                              ready_count=len(ready),
                              slots_available=slots,
                              features_to_start=[f['id'] for f in features_to_start])

                for i, feature in enumerate(features_to_start):
                    print(f"[DEBUG] Starting feature {i+1}/{len(features_to_start)}: #{feature['id']} - {feature['name']}", flush=True)
                    success, msg = self.start_feature(feature["id"])
                    if not success:
                        print(f"[DEBUG] Failed to start feature #{feature['id']}: {msg}", flush=True)
                        debug_log.log("SPAWN", f"FAILED to start feature #{feature['id']}",
                                      feature_name=feature['name'],
                                      error=msg)
                    else:
                        print(f"[DEBUG] Successfully started feature #{feature['id']}", flush=True)
                        with self._lock:
                            running_count = len(self.running_coding_agents)
                        print(f"[DEBUG] Running coding agents after start: {running_count}", flush=True)
                        debug_log.log("SPAWN", f"Successfully started feature #{feature['id']}",
                                      feature_name=feature['name'],
                                      running_coding_agents=running_count)
                    await asyncio.sleep(2)  # Brief pause between starts

            except Exception as e:
                print(f"Orchestrator error: {e}", flush=True)
                await asyncio.sleep(POLL_INTERVAL)

        # Wait for remaining agents to complete
        print("Waiting for running agents to complete...", flush=True)
        while True:
            with self._lock:
                coding_done = len(self.running_coding_agents) == 0
                testing_done = len(self.running_testing_agents) == 0
            if coding_done and testing_done:
                break
            await asyncio.sleep(1)

        print("Orchestrator finished.", flush=True)

    def get_status(self) -> dict:
        """Get current orchestrator status."""
        with self._lock:
            return {
                "running_features": list(self.running_coding_agents.keys()),
                "coding_agent_count": len(self.running_coding_agents),
                "testing_agent_count": len(self.running_testing_agents),
                "count": len(self.running_coding_agents),  # Legacy compatibility
                "max_concurrency": self.max_concurrency,
                "testing_agent_ratio": self.testing_agent_ratio,
                "count_testing_in_concurrency": self.count_testing_in_concurrency,
                "is_running": self.is_running,
                "yolo_mode": self.yolo_mode,
            }


async def run_parallel_orchestrator(
    project_dir: Path,
    max_concurrency: int = DEFAULT_CONCURRENCY,
    model: str = None,
    yolo_mode: bool = False,
    testing_agent_ratio: int = 1,
    count_testing_in_concurrency: bool = False,
) -> None:
    """Run the unified orchestrator.

    Args:
        project_dir: Path to the project directory
        max_concurrency: Maximum number of concurrent coding agents
        model: Claude model to use
        yolo_mode: Whether to run in YOLO mode (skip testing agents)
        testing_agent_ratio: Testing agents per coding agent (0-3)
        count_testing_in_concurrency: If True, testing agents count toward concurrency limit
    """
    print(f"[ORCHESTRATOR] run_parallel_orchestrator called with max_concurrency={max_concurrency}", flush=True)
    orchestrator = ParallelOrchestrator(
        project_dir=project_dir,
        max_concurrency=max_concurrency,
        model=model,
        yolo_mode=yolo_mode,
        testing_agent_ratio=testing_agent_ratio,
        count_testing_in_concurrency=count_testing_in_concurrency,
    )

    try:
        await orchestrator.run_loop()
    except KeyboardInterrupt:
        print("\n\nInterrupted by user. Stopping agents...", flush=True)
        orchestrator.stop_all()


def main():
    """Main entry point for parallel orchestration."""
    import argparse

    from dotenv import load_dotenv

    from registry import DEFAULT_MODEL, get_project_path

    load_dotenv()

    parser = argparse.ArgumentParser(
        description="Parallel Feature Orchestrator - Run multiple agent instances",
    )
    parser.add_argument(
        "--project-dir",
        type=str,
        required=True,
        help="Project directory path (absolute) or registered project name",
    )
    parser.add_argument(
        "--max-concurrency", "-p",
        type=int,
        default=DEFAULT_CONCURRENCY,
        help=f"Maximum concurrent agents (1-{MAX_PARALLEL_AGENTS}, default: {DEFAULT_CONCURRENCY})",
    )
    parser.add_argument(
        "--model",
        type=str,
        default=DEFAULT_MODEL,
        help=f"Claude model to use (default: {DEFAULT_MODEL})",
    )
    parser.add_argument(
        "--yolo",
        action="store_true",
        default=False,
        help="Enable YOLO mode: rapid prototyping without browser testing",
    )

    args = parser.parse_args()

    # Resolve project directory
    project_dir_input = args.project_dir
    project_dir = Path(project_dir_input)
    if project_dir.is_absolute():
        if not project_dir.exists():
            print(f"Error: Project directory does not exist: {project_dir}", flush=True)
            sys.exit(1)
    else:
        registered_path = get_project_path(project_dir_input)
        if registered_path:
            project_dir = registered_path
        else:
            print(f"Error: Project '{project_dir_input}' not found in registry", flush=True)
            sys.exit(1)

    try:
        asyncio.run(run_parallel_orchestrator(
            project_dir=project_dir,
            max_concurrency=args.max_concurrency,
            model=args.model,
            yolo_mode=args.yolo,
        ))
    except KeyboardInterrupt:
        print("\n\nInterrupted by user", flush=True)


if __name__ == "__main__":
    main()
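
# --- Illustrative sketch: programmatic usage (comments only, not executed) ---
# The orchestrator can also be driven from other Python code instead of the CLI.
# A minimal sketch using this module's public entry point; the project path below
# is a hypothetical example and must be an existing absolute directory:
#
#     import asyncio
#     from pathlib import Path
#     from parallel_orchestrator import run_parallel_orchestrator
#
#     asyncio.run(run_parallel_orchestrator(
#         project_dir=Path("/abs/path/to/my-app"),  # hypothetical project directory
#         max_concurrency=3,
#         yolo_mode=False,
#     ))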