Mirror of https://github.com/leonvanzyl/autocoder.git (synced 2026-01-30 06:12:06 +00:00)
Merge remote-tracking branch 'origin/master'
Resolved conflicts by combining:
- stdin=DEVNULL and CREATE_NO_WINDOW (blocking fix)
- PYTHONUNBUFFERED env var (output buffering fix)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

@@ -2,11 +2,19 @@
Parallel Orchestrator
=====================

Coordinates parallel execution of independent features using multiple agent processes.
Unified orchestrator that handles all agent lifecycle:
- Initialization: Creates features from app_spec if needed
- Coding agents: Implement features one at a time
- Testing agents: Regression test passing features (optional)

Uses dependency-aware scheduling to ensure features are only started when their
dependencies are satisfied.

Usage:
    # Entry point (always uses orchestrator)
    python autonomous_agent_demo.py --project-dir my-app --concurrency 3

    # Direct orchestrator usage
    python parallel_orchestrator.py --project-dir my-app --max-concurrency 3
"""
@@ -15,22 +23,88 @@ import os
import subprocess
import sys
import threading
from datetime import datetime
from pathlib import Path
from typing import Callable
from typing import Callable, Literal

import psutil

from api.database import Feature, create_database
from api.dependency_resolver import are_dependencies_satisfied, compute_scheduling_scores
from progress import has_features

# Root directory of autocoder (where this script and autonomous_agent_demo.py live)
AUTOCODER_ROOT = Path(__file__).parent.resolve()

# Debug log file path
DEBUG_LOG_FILE = AUTOCODER_ROOT / "orchestrator_debug.log"


class DebugLogger:
    """Thread-safe debug logger that writes to a file."""

    def __init__(self, log_file: Path = DEBUG_LOG_FILE):
        self.log_file = log_file
        self._lock = threading.Lock()
        self._session_started = False
        # DON'T clear on import - only mark session start when run_loop begins

    def start_session(self):
        """Mark the start of a new orchestrator session. Clears previous logs."""
        with self._lock:
            self._session_started = True
            with open(self.log_file, "w") as f:
                f.write(f"=== Orchestrator Debug Log Started: {datetime.now().isoformat()} ===\n")
                f.write(f"=== PID: {os.getpid()} ===\n\n")

    def log(self, category: str, message: str, **kwargs):
        """Write a timestamped log entry."""
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        with self._lock:
            with open(self.log_file, "a") as f:
                f.write(f"[{timestamp}] [{category}] {message}\n")
                for key, value in kwargs.items():
                    f.write(f" {key}: {value}\n")
                f.write("\n")

    def section(self, title: str):
        """Write a section header."""
        with self._lock:
            with open(self.log_file, "a") as f:
                f.write(f"\n{'='*60}\n")
                f.write(f" {title}\n")
                f.write(f"{'='*60}\n\n")


# Global debug logger instance
debug_log = DebugLogger()
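For orientation, here is a small usage sketch of the debug_log helper defined above. The specific calls and values are illustrative, not taken from the repository, but the on-disk format follows start_session(), log(), and section() exactly:

debug_log.start_session()                   # truncates orchestrator_debug.log
debug_log.section("FEATURE LOOP STARTING")  # '=' banner around the title
debug_log.log("READY", "get_ready_features() called",
              ready_count=2, ready_ids=[4, 7])
# The log file then contains entries shaped like:
# [12:30:01.123] [READY] get_ready_features() called
#  ready_count: 2
#  ready_ids: [4, 7]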


def _dump_database_state(session, label: str = ""):
    """Helper to dump full database state to debug log."""
    from api.database import Feature
    all_features = session.query(Feature).all()

    passing = [f for f in all_features if f.passes]
    in_progress = [f for f in all_features if f.in_progress and not f.passes]
    pending = [f for f in all_features if not f.passes and not f.in_progress]

    debug_log.log("DB_DUMP", f"Full database state {label}",
                  total_features=len(all_features),
                  passing_count=len(passing),
                  passing_ids=[f.id for f in passing],
                  in_progress_count=len(in_progress),
                  in_progress_ids=[f.id for f in in_progress],
                  pending_count=len(pending),
                  pending_ids=[f.id for f in pending[:10]])  # First 10 pending only

# Performance: Limit parallel agents to prevent memory exhaustion
MAX_PARALLEL_AGENTS = 5
MAX_TOTAL_AGENTS = 10  # Hard limit on total agents (coding + testing)
DEFAULT_CONCURRENCY = 3
POLL_INTERVAL = 5  # seconds between checking for ready features
MAX_FEATURE_RETRIES = 3  # Maximum times to retry a failed feature
INITIALIZER_TIMEOUT = 1800  # 30 minutes timeout for initializer


def _kill_process_tree(proc: subprocess.Popen, timeout: float = 5.0) -> None:
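The hunk cuts off at the _kill_process_tree signature, so its body is not shown here. As a hedged sketch only — consistent with the signature and the psutil import above, not the repository's actual implementation — a process-tree kill typically looks like this:

def _kill_process_tree_sketch(proc: subprocess.Popen, timeout: float = 5.0) -> None:
    """Illustrative only: terminate a process and all descendants, then force-kill."""
    try:
        parent = psutil.Process(proc.pid)
        procs = parent.children(recursive=True) + [parent]  # snapshot the whole tree
    except psutil.NoSuchProcess:
        return  # already gone
    for p in procs:
        try:
            p.terminate()  # polite first pass (SIGTERM / TerminateProcess)
        except psutil.NoSuchProcess:
            pass
    _, alive = psutil.wait_procs(procs, timeout=timeout)
    for p in alive:
        try:
            p.kill()  # force-kill stragglers
        except psutil.NoSuchProcess:
            pass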
@@ -95,6 +169,8 @@ class ParallelOrchestrator:
        max_concurrency: int = DEFAULT_CONCURRENCY,
        model: str = None,
        yolo_mode: bool = False,
        testing_agent_ratio: int = 1,
        count_testing_in_concurrency: bool = False,
        on_output: Callable[[int, str], None] = None,
        on_status: Callable[[int, str], None] = None,
    ):
@@ -102,9 +178,11 @@ class ParallelOrchestrator:

        Args:
            project_dir: Path to the project directory
            max_concurrency: Maximum number of concurrent agents (1-5)
            max_concurrency: Maximum number of concurrent coding agents (1-5)
            model: Claude model to use (or None for default)
            yolo_mode: Whether to run in YOLO mode (skip browser testing)
            yolo_mode: Whether to run in YOLO mode (skip testing agents)
            testing_agent_ratio: Testing agents per coding agent (0-3, default 1)
            count_testing_in_concurrency: If True, testing agents count toward concurrency limit
            on_output: Callback for agent output (feature_id, line)
            on_status: Callback for agent status changes (feature_id, status)
        """
@@ -112,12 +190,19 @@ class ParallelOrchestrator:
        self.max_concurrency = min(max(max_concurrency, 1), MAX_PARALLEL_AGENTS)
        self.model = model
        self.yolo_mode = yolo_mode
        self.testing_agent_ratio = min(max(testing_agent_ratio, 0), 3)  # Clamp 0-3
        self.count_testing_in_concurrency = count_testing_in_concurrency
        self.on_output = on_output
        self.on_status = on_status

        # Thread-safe state
        self._lock = threading.Lock()
        self.running_agents: dict[int, subprocess.Popen] = {}
        # Coding agents: feature_id -> process
        self.running_coding_agents: dict[int, subprocess.Popen] = {}
        # Testing agents: list of processes (not tied to specific features)
        self.running_testing_agents: list[subprocess.Popen] = []
        # Legacy alias for backward compatibility
        self.running_agents = self.running_coding_agents
        self.abort_events: dict[int, threading.Event] = {}
        self.is_running = False
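Note that the legacy alias above binds both attribute names to the same dict object, so older call sites that read or mutate running_agents stay in sync with running_coding_agents. A minimal sketch of that aliasing semantics (standalone variables here, purely illustrative):

# One dict, two names: writes through either name are visible through both.
running_coding_agents: dict[int, str] = {}
running_agents = running_coding_agents        # alias, not a copy
running_agents[42] = "proc"
assert running_coding_agents[42] == "proc"
assert running_agents is running_coding_agents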
@@ -154,7 +239,7 @@ class ParallelOrchestrator:
        for f in stale:
            # Skip if already running in this orchestrator instance
            with self._lock:
                if f.id in self.running_agents:
                if f.id in self.running_coding_agents:
                    continue
            # Skip if feature has failed too many times
            if self._failure_counts.get(f.id, 0) >= MAX_FEATURE_RETRIES:
@@ -181,19 +266,28 @@ class ParallelOrchestrator:
            all_dicts = [f.to_dict() for f in all_features]

            ready = []
            skipped_reasons = {"passes": 0, "in_progress": 0, "running": 0, "failed": 0, "deps": 0}
            for f in all_features:
                if f.passes or f.in_progress:
                if f.passes:
                    skipped_reasons["passes"] += 1
                    continue
                if f.in_progress:
                    skipped_reasons["in_progress"] += 1
                    continue
                # Skip if already running in this orchestrator
                with self._lock:
                    if f.id in self.running_agents:
                    if f.id in self.running_coding_agents:
                        skipped_reasons["running"] += 1
                        continue
                # Skip if feature has failed too many times
                if self._failure_counts.get(f.id, 0) >= MAX_FEATURE_RETRIES:
                    skipped_reasons["failed"] += 1
                    continue
                # Check dependencies
                if are_dependencies_satisfied(f.to_dict(), all_dicts):
                    ready.append(f.to_dict())
                else:
                    skipped_reasons["deps"] += 1

            # Sort by scheduling score (higher = first), then priority, then id
            scores = compute_scheduling_scores(all_dicts)
@@ -207,12 +301,30 @@ class ParallelOrchestrator:
                f"{passing} passing, {in_progress} in_progress, {len(all_features)} total",
                flush=True
            )
            print(
                f"[DEBUG] Skipped: {skipped_reasons['passes']} passing, {skipped_reasons['in_progress']} in_progress, "
                f"{skipped_reasons['running']} running, {skipped_reasons['failed']} failed, {skipped_reasons['deps']} blocked by deps",
                flush=True
            )

            # Log to debug file (but not every call to avoid spam)
            debug_log.log("READY", "get_ready_features() called",
                          ready_count=len(ready),
                          ready_ids=[f['id'] for f in ready[:5]],  # First 5 only
                          passing=passing,
                          in_progress=in_progress,
                          total=len(all_features),
                          skipped=skipped_reasons)

            return ready
        finally:
            session.close()
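To make the scheduling flow concrete, here is a hedged sketch of how the two dependency helpers compose. The feature dicts, the "dependencies" key, and the shape of the scores mapping are assumptions for illustration; only the function names and the dict-based calling convention are taken from the code above.

# Hypothetical feature dicts; real ones come from Feature.to_dict().
all_dicts = [
    {"id": 1, "passes": True,  "in_progress": False, "dependencies": []},
    {"id": 2, "passes": False, "in_progress": False, "dependencies": [1]},
    {"id": 3, "passes": False, "in_progress": False, "dependencies": [2]},
]
ready = [f for f in all_dicts
         if not f["passes"] and not f["in_progress"]
         and are_dependencies_satisfied(f, all_dicts)]
# Feature 2 would be ready (its dependency passes); feature 3 stays blocked.
scores = compute_scheduling_scores(all_dicts)  # assumed shape: {feature_id: score}
ready.sort(key=lambda f: scores.get(f["id"], 0), reverse=True)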

    def get_all_complete(self) -> bool:
        """Check if all features are complete or permanently failed."""
        """Check if all features are complete or permanently failed.

        Returns False if there are no features (initialization needed).
        """
        session = self.get_session()
        try:
            # Force fresh read from database to avoid stale cached data
@@ -220,6 +332,11 @@ class ParallelOrchestrator:
            session.expire_all()

            all_features = session.query(Feature).all()

            # No features = NOT complete, need initialization
            if len(all_features) == 0:
                return False

            passing_count = 0
            failed_count = 0
            pending_count = 0
@@ -243,8 +360,17 @@ class ParallelOrchestrator:
        finally:
            session.close()

    def get_passing_count(self) -> int:
        """Get the number of passing features."""
        session = self.get_session()
        try:
            session.expire_all()
            return session.query(Feature).filter(Feature.passes == True).count()
        finally:
            session.close()

    def start_feature(self, feature_id: int, resume: bool = False) -> tuple[bool, str]:
        """Start a single feature agent.
        """Start a single coding agent for a feature.

        Args:
            feature_id: ID of the feature to start
@@ -254,9 +380,9 @@ class ParallelOrchestrator:
            Tuple of (success, message)
        """
        with self._lock:
            if feature_id in self.running_agents:
            if feature_id in self.running_coding_agents:
                return False, "Feature already running"
            if len(self.running_agents) >= self.max_concurrency:
            if len(self.running_coding_agents) >= self.max_concurrency:
                return False, "At max concurrency"

        # Mark as in_progress in database (or verify it's resumable)
@@ -281,6 +407,19 @@ class ParallelOrchestrator:
        finally:
            session.close()

        # Start coding agent subprocess
        success, message = self._spawn_coding_agent(feature_id)
        if not success:
            return False, message

        # NOTE: Testing agents are spawned in _on_agent_complete() after a coding agent
        # succeeds, not here. This ensures we only spawn testing agents when there are
        # actually passing features to test.

        return True, f"Started feature {feature_id}"

    def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]:
        """Spawn a coding agent subprocess for a specific feature."""
        # Create abort event
        abort_event = threading.Event()

@@ -290,8 +429,9 @@ class ParallelOrchestrator:
            "-u",  # Force unbuffered stdout/stderr
            str(AUTOCODER_ROOT / "autonomous_agent_demo.py"),
            "--project-dir", str(self.project_dir),
            "--max-iterations", "1",  # Single feature mode
            "--feature-id", str(feature_id),  # Work on this specific feature only
            "--max-iterations", "1",
            "--agent-type", "coding",
            "--feature-id", str(feature_id),
        ]
        if self.model:
            cmd.extend(["--model", self.model])
@@ -300,6 +440,7 @@ class ParallelOrchestrator:

        try:
            # CREATE_NO_WINDOW on Windows prevents console window pop-ups
            # stdin=DEVNULL prevents blocking on stdin reads
            popen_kwargs = {
                "stdin": subprocess.DEVNULL,
                "stdout": subprocess.PIPE,
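The hunk ends mid-dict, so the remaining Popen arguments are not visible here. Going by the comments above and the merge-commit message (stdin=DEVNULL plus CREATE_NO_WINDOW for the blocking fix, PYTHONUNBUFFERED for the buffering fix), the full launch setup plausibly looks like this sketch — a reconstruction, not the verbatim file:

popen_kwargs = {
    "stdin": subprocess.DEVNULL,   # never block on a stdin read
    "stdout": subprocess.PIPE,
    "stderr": subprocess.STDOUT,
    "text": True,
    "cwd": str(AUTOCODER_ROOT),
    "env": {**os.environ, "PYTHONUNBUFFERED": "1"},  # unbuffered child output
}
if sys.platform == "win32":
    # CREATE_NO_WINDOW is Windows-only; it suppresses per-agent console windows.
    popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
proc = subprocess.Popen(cmd, **popen_kwargs)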
@@ -325,23 +466,157 @@ class ParallelOrchestrator:
            return False, f"Failed to start agent: {e}"

        with self._lock:
            self.running_agents[feature_id] = proc
            self.running_coding_agents[feature_id] = proc
            self.abort_events[feature_id] = abort_event

        # Start output reader thread
        threading.Thread(
            target=self._read_output,
            args=(feature_id, proc, abort_event),
            args=(feature_id, proc, abort_event, "coding"),
            daemon=True
        ).start()

        if self.on_status:
            self.on_status(feature_id, "running")

        print(f"Started agent for feature #{feature_id}", flush=True)
        print(f"Started coding agent for feature #{feature_id}", flush=True)
        return True, f"Started feature {feature_id}"

    def _read_output(self, feature_id: int, proc: subprocess.Popen, abort: threading.Event):
    def _spawn_testing_agents(self) -> None:
        """Spawn testing agents based on testing_agent_ratio."""
        for _ in range(self.testing_agent_ratio):
            # Check resource limits
            with self._lock:
                total_agents = len(self.running_coding_agents) + len(self.running_testing_agents)
                if total_agents >= MAX_TOTAL_AGENTS:
                    print(f"[DEBUG] At max total agents ({MAX_TOTAL_AGENTS}), skipping testing agent", flush=True)
                    break

                if self.count_testing_in_concurrency:
                    if total_agents >= self.max_concurrency:
                        print("[DEBUG] Testing agents count toward concurrency, at limit", flush=True)
                        break

            # Spawn a testing agent
            self._spawn_testing_agent()

    def _spawn_testing_agent(self) -> tuple[bool, str]:
        """Spawn a testing agent subprocess for regression testing."""
        debug_log.log("TESTING", "Attempting to spawn testing agent subprocess")

        cmd = [
            sys.executable,
            "-u",
            str(AUTOCODER_ROOT / "autonomous_agent_demo.py"),
            "--project-dir", str(self.project_dir),
            "--max-iterations", "1",
            "--agent-type", "testing",
        ]
        if self.model:
            cmd.extend(["--model", self.model])
        # Testing agents don't need --yolo flag (they use testing prompt regardless)

        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                cwd=str(AUTOCODER_ROOT),
                env={**os.environ, "PYTHONUNBUFFERED": "1"},
            )
        except Exception as e:
            debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}")
            return False, f"Failed to start testing agent: {e}"

        with self._lock:
            self.running_testing_agents.append(proc)
            testing_count = len(self.running_testing_agents)

        # Start output reader thread (feature_id=None for testing agents)
        threading.Thread(
            target=self._read_output,
            args=(None, proc, threading.Event(), "testing"),
            daemon=True
        ).start()

        print(f"Started testing agent (PID {proc.pid})", flush=True)
        debug_log.log("TESTING", "Successfully spawned testing agent",
                      pid=proc.pid,
                      total_testing_agents=testing_count)
        return True, "Started testing agent"

    async def _run_initializer(self) -> bool:
        """Run initializer agent as blocking subprocess.

        Returns True if initialization succeeded (features were created).
        """
        debug_log.section("INITIALIZER PHASE")
        debug_log.log("INIT", "Starting initializer subprocess",
                      project_dir=str(self.project_dir))

        cmd = [
            sys.executable, "-u",
            str(AUTOCODER_ROOT / "autonomous_agent_demo.py"),
            "--project-dir", str(self.project_dir),
            "--agent-type", "initializer",
            "--max-iterations", "1",
        ]
        if self.model:
            cmd.extend(["--model", self.model])

        print("Running initializer agent...", flush=True)

        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            cwd=str(AUTOCODER_ROOT),
            env={**os.environ, "PYTHONUNBUFFERED": "1"},
        )

        debug_log.log("INIT", "Initializer subprocess started", pid=proc.pid)

        # Stream output with timeout
        loop = asyncio.get_running_loop()
        try:
            async def stream_output():
                while True:
                    line = await loop.run_in_executor(None, proc.stdout.readline)
                    if not line:
                        break
                    print(line.rstrip(), flush=True)
                    if self.on_output:
                        self.on_output(0, line.rstrip())  # Use 0 as feature_id for initializer
                proc.wait()

            await asyncio.wait_for(stream_output(), timeout=INITIALIZER_TIMEOUT)

        except asyncio.TimeoutError:
            print(f"ERROR: Initializer timed out after {INITIALIZER_TIMEOUT // 60} minutes", flush=True)
            debug_log.log("INIT", "TIMEOUT - Initializer exceeded time limit",
                          timeout_minutes=INITIALIZER_TIMEOUT // 60)
            _kill_process_tree(proc)
            return False

        debug_log.log("INIT", "Initializer subprocess completed",
                      return_code=proc.returncode,
                      success=proc.returncode == 0)

        if proc.returncode != 0:
            print(f"ERROR: Initializer failed with exit code {proc.returncode}", flush=True)
            return False

        return True
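The streaming-with-timeout pattern in _run_initializer generalizes to any blocking pipe read under asyncio: push the blocking readline onto a thread, then bound the whole pump with wait_for. A self-contained sketch (the command and timeout are illustrative, not from the repository):

import asyncio
import subprocess

async def stream_with_timeout(cmd: list[str], timeout: float) -> int:
    """Echo a subprocess's stdout, killing it if it outlives the timeout."""
    proc = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    loop = asyncio.get_running_loop()

    async def pump():
        while True:
            # readline() blocks, so run it on the default thread pool
            line = await loop.run_in_executor(None, proc.stdout.readline)
            if not line:
                break
            print(line.rstrip(), flush=True)
        proc.wait()

    try:
        await asyncio.wait_for(pump(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()  # the orchestrator escalates via _kill_process_tree instead
        raise
    return proc.returncode

# e.g. asyncio.run(stream_with_timeout(["python", "-u", "worker.py"], 1800))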

    def _read_output(
        self,
        feature_id: int | None,
        proc: subprocess.Popen,
        abort: threading.Event,
        agent_type: Literal["coding", "testing"] = "coding",
    ):
        """Read output from subprocess and emit events."""
        try:
            for line in proc.stdout:
@@ -349,34 +624,93 @@ class ParallelOrchestrator:
                    break
                line = line.rstrip()
                if self.on_output:
                    self.on_output(feature_id, line)
                    self.on_output(feature_id or 0, line)
                else:
                    print(f"[Feature #{feature_id}] {line}", flush=True)
                    if agent_type == "testing":
                        print(f"[Testing] {line}", flush=True)
                    else:
                        print(f"[Feature #{feature_id}] {line}", flush=True)
            proc.wait()
        finally:
            self._on_feature_complete(feature_id, proc.returncode)
            self._on_agent_complete(feature_id, proc.returncode, agent_type, proc)

    def _on_feature_complete(self, feature_id: int, return_code: int):
        """Handle feature completion.
    def _on_agent_complete(
        self,
        feature_id: int | None,
        return_code: int,
        agent_type: Literal["coding", "testing"],
        proc: subprocess.Popen,
    ):
        """Handle agent completion.

        ALWAYS clears in_progress when agent exits, regardless of success/failure.
        This prevents features from getting stuck if an agent crashes or is killed.
        The agent marks features as passing BEFORE clearing in_progress, so this
        is safe - we won't accidentally clear a feature that's being worked on.
        For coding agents:
        - ALWAYS clears in_progress when agent exits, regardless of success/failure.
        - This prevents features from getting stuck if an agent crashes or is killed.
        - The agent marks features as passing BEFORE clearing in_progress, so this
          is safe.

        For testing agents:
        - Just remove from the running list.
        """
        if agent_type == "testing":
            with self._lock:
                if proc in self.running_testing_agents:
                    self.running_testing_agents.remove(proc)

            status = "completed" if return_code == 0 else "failed"
            print(f"Testing agent (PID {proc.pid}) {status}", flush=True)
            debug_log.log("COMPLETE", "Testing agent finished",
                          pid=proc.pid,
                          status=status)
            return

        # Coding agent completion
        debug_log.log("COMPLETE", f"Coding agent for feature #{feature_id} finished",
                      return_code=return_code,
                      status="success" if return_code == 0 else "failed")

        with self._lock:
            self.running_agents.pop(feature_id, None)
            self.running_coding_agents.pop(feature_id, None)
            self.abort_events.pop(feature_id, None)

        # ALWAYS clear in_progress when agent exits to prevent stuck features
        # The agent marks features as passing before clearing in_progress,
        # so if in_progress is still True here, the feature didn't complete successfully
        # BEFORE dispose: Query database state to see if it's stale
        session_before = self.get_session()
        try:
            session_before.expire_all()
            feature_before = session_before.query(Feature).filter(Feature.id == feature_id).first()
            all_before = session_before.query(Feature).all()
            passing_before = sum(1 for f in all_before if f.passes)
            debug_log.log("DB", f"BEFORE engine.dispose() - Feature #{feature_id} state",
                          passes=feature_before.passes if feature_before else None,
                          in_progress=feature_before.in_progress if feature_before else None,
                          total_passing_in_db=passing_before)
        finally:
            session_before.close()

        # CRITICAL: Refresh database connection to see subprocess commits
        # The coding agent runs as a subprocess and commits changes (e.g., passes=True).
        # SQLAlchemy may have stale connections. Disposing the engine forces new connections
        # that will see the subprocess's committed changes.
        debug_log.log("DB", "Disposing database engine now...")
        self._engine.dispose()

        # AFTER dispose: Query again to compare
        session = self.get_session()
        try:
            feature = session.query(Feature).filter(Feature.id == feature_id).first()
            all_after = session.query(Feature).all()
            passing_after = sum(1 for f in all_after if f.passes)
            feature_passes = feature.passes if feature else None
            feature_in_progress = feature.in_progress if feature else None
            debug_log.log("DB", f"AFTER engine.dispose() - Feature #{feature_id} state",
                          passes=feature_passes,
                          in_progress=feature_in_progress,
                          total_passing_in_db=passing_after,
                          passing_changed=(passing_after != passing_before) if 'passing_before' in dir() else "unknown")
            if feature and feature.in_progress and not feature.passes:
                feature.in_progress = False
                session.commit()
                debug_log.log("DB", f"Cleared in_progress for feature #{feature_id} (agent failed)")
        finally:
            session.close()
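The dispose-then-requery sequence above is the crux of this change: the agent subprocess commits to the same database file through its own connection, and pooled connections in the orchestrator process can serve stale reads. Reduced to its skeleton (create_database and Feature are the project's own; the surrounding flow is condensed for illustration):

engine, session_maker = create_database(project_dir)
# ... a child process commits passes=True for some feature ...
engine.dispose()                  # drop pooled connections that may be stale
session = session_maker()         # fresh connection sees the child's commit
try:
    feature = session.query(Feature).filter(Feature.id == feature_id).first()
    if feature and feature.in_progress and not feature.passes:
        feature.in_progress = False   # agent exited without finishing
        session.commit()
finally:
    session.close()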

@@ -387,6 +721,8 @@ class ParallelOrchestrator:
            failure_count = self._failure_counts[feature_id]
            if failure_count >= MAX_FEATURE_RETRIES:
                print(f"Feature #{feature_id} has failed {failure_count} times, will not retry", flush=True)
                debug_log.log("COMPLETE", f"Feature #{feature_id} exceeded max retries",
                              failure_count=failure_count)

        status = "completed" if return_code == 0 else "failed"
        if self.on_status:
@@ -394,14 +730,32 @@ class ParallelOrchestrator:
        # CRITICAL: This print triggers the WebSocket to emit agent_update with state='error' or 'success'
        print(f"Feature #{feature_id} {status}", flush=True)

        # Spawn testing agents after successful coding agent completion
        # This is the correct place to spawn testing agents - after we know there are
        # passing features (the one this agent just completed, plus any previous ones)
        if return_code == 0 and not self.yolo_mode and self.testing_agent_ratio > 0:
            passing_count = self.get_passing_count()
            print(f"[DEBUG] Coding agent completed successfully, passing_count={passing_count}", flush=True)
            debug_log.log("TESTING", "Checking if testing agents should spawn",
                          yolo_mode=self.yolo_mode,
                          testing_agent_ratio=self.testing_agent_ratio,
                          passing_count=passing_count)
            if passing_count > 0:
                print(f"[DEBUG] Spawning testing agents (ratio={self.testing_agent_ratio})", flush=True)
                debug_log.log("TESTING", f"Spawning {self.testing_agent_ratio} testing agent(s)")
                self._spawn_testing_agents()
        elif return_code == 0:
            debug_log.log("TESTING", "Skipping testing agents",
                          reason="yolo_mode" if self.yolo_mode else f"ratio={self.testing_agent_ratio}")

    def stop_feature(self, feature_id: int) -> tuple[bool, str]:
        """Stop a running feature agent and all its child processes."""
        """Stop a running coding agent and all its child processes."""
        with self._lock:
            if feature_id not in self.running_agents:
            if feature_id not in self.running_coding_agents:
                return False, "Feature not running"

            abort = self.abort_events.get(feature_id)
            proc = self.running_agents.get(feature_id)
            proc = self.running_coding_agents.get(feature_id)

        if abort:
            abort.set()
@@ -412,22 +766,106 @@ class ParallelOrchestrator:
        return True, f"Stopped feature {feature_id}"

    def stop_all(self) -> None:
        """Stop all running feature agents."""
        """Stop all running agents (coding and testing)."""
        self.is_running = False

        # Stop coding agents
        with self._lock:
            feature_ids = list(self.running_agents.keys())
            feature_ids = list(self.running_coding_agents.keys())

        for fid in feature_ids:
            self.stop_feature(fid)

        # Stop testing agents
        with self._lock:
            testing_procs = list(self.running_testing_agents)

        for proc in testing_procs:
            _kill_process_tree(proc, timeout=5.0)

    async def run_loop(self):
        """Main orchestration loop."""
        self.is_running = True

        print(f"Starting parallel orchestrator with max_concurrency={self.max_concurrency}", flush=True)
        # Start debug logging session (clears previous logs)
        debug_log.start_session()

        # Log startup to debug file
        debug_log.section("ORCHESTRATOR STARTUP")
        debug_log.log("STARTUP", "Orchestrator run_loop starting",
                      project_dir=str(self.project_dir),
                      max_concurrency=self.max_concurrency,
                      yolo_mode=self.yolo_mode,
                      testing_agent_ratio=self.testing_agent_ratio,
                      count_testing_in_concurrency=self.count_testing_in_concurrency)

        print("=" * 70, flush=True)
        print(" UNIFIED ORCHESTRATOR SETTINGS", flush=True)
        print("=" * 70, flush=True)
        print(f"Project: {self.project_dir}", flush=True)
        print(f"Max concurrency: {self.max_concurrency} coding agents", flush=True)
        print(f"YOLO mode: {self.yolo_mode}", flush=True)
        print(f"Testing agent ratio: {self.testing_agent_ratio} per coding agent", flush=True)
        print(f"Count testing in concurrency: {self.count_testing_in_concurrency}", flush=True)
        print("=" * 70, flush=True)
        print(flush=True)

        # Phase 1: Check if initialization needed
        if not has_features(self.project_dir):
            print("=" * 70, flush=True)
            print(" INITIALIZATION PHASE", flush=True)
            print("=" * 70, flush=True)
            print("No features found - running initializer agent first...", flush=True)
            print("NOTE: This may take 10-20+ minutes to generate features.", flush=True)
            print(flush=True)

            success = await self._run_initializer()

            if not success or not has_features(self.project_dir):
                print("ERROR: Initializer did not create features. Exiting.", flush=True)
                return

            print(flush=True)
            print("=" * 70, flush=True)
            print(" INITIALIZATION COMPLETE - Starting feature loop", flush=True)
            print("=" * 70, flush=True)
            print(flush=True)

            # CRITICAL: Recreate database connection after initializer subprocess commits
            # The initializer runs as a subprocess and commits to the database file.
            # SQLAlchemy may have stale connections or cached state. Disposing the old
            # engine and creating a fresh engine/session_maker ensures we see all the
            # newly created features.
            debug_log.section("INITIALIZATION COMPLETE")
            debug_log.log("INIT", "Disposing old database engine and creating fresh connection")
            print("[DEBUG] Recreating database connection after initialization...", flush=True)
            if self._engine is not None:
                self._engine.dispose()
            self._engine, self._session_maker = create_database(self.project_dir)

            # Debug: Show state immediately after initialization
            print("[DEBUG] Post-initialization state check:", flush=True)
            print(f"[DEBUG] max_concurrency={self.max_concurrency}", flush=True)
            print(f"[DEBUG] yolo_mode={self.yolo_mode}", flush=True)
            print(f"[DEBUG] testing_agent_ratio={self.testing_agent_ratio}", flush=True)

            # Verify features were created and are visible
            session = self.get_session()
            try:
                feature_count = session.query(Feature).count()
                all_features = session.query(Feature).all()
                feature_names = [f"{f.id}: {f.name}" for f in all_features[:10]]
                print(f"[DEBUG] features in database={feature_count}", flush=True)
                debug_log.log("INIT", "Post-initialization database state",
                              max_concurrency=self.max_concurrency,
                              yolo_mode=self.yolo_mode,
                              testing_agent_ratio=self.testing_agent_ratio,
                              feature_count=feature_count,
                              first_10_features=feature_names)
            finally:
                session.close()

        # Phase 2: Feature loop
        # Check for features to resume from previous session
        resumable = self.get_resumable_features()
        if resumable:
@@ -436,7 +874,31 @@ class ParallelOrchestrator:
                print(f" - Feature #{f['id']}: {f['name']}", flush=True)
            print(flush=True)

        debug_log.section("FEATURE LOOP STARTING")
        loop_iteration = 0
        while self.is_running:
            loop_iteration += 1
            if loop_iteration <= 3:
                print(f"[DEBUG] === Loop iteration {loop_iteration} ===", flush=True)

            # Log every iteration to debug file (first 10, then every 5th)
            if loop_iteration <= 10 or loop_iteration % 5 == 0:
                with self._lock:
                    running_ids = list(self.running_coding_agents.keys())
                    testing_count = len(self.running_testing_agents)
                debug_log.log("LOOP", f"Iteration {loop_iteration}",
                              running_coding_agents=running_ids,
                              running_testing_agents=testing_count,
                              max_concurrency=self.max_concurrency)

            # Full database dump every 5 iterations
            if loop_iteration == 1 or loop_iteration % 5 == 0:
                session = self.get_session()
                try:
                    _dump_database_state(session, f"(iteration {loop_iteration})")
                finally:
                    session.close()

            try:
                # Check if all complete
                if self.get_all_complete():
@@ -445,8 +907,19 @@ class ParallelOrchestrator:

                # Check capacity
                with self._lock:
                    current = len(self.running_agents)
                    current = len(self.running_coding_agents)
                    current_testing = len(self.running_testing_agents)
                    running_ids = list(self.running_coding_agents.keys())

                debug_log.log("CAPACITY", "Checking capacity",
                              current_coding=current,
                              current_testing=current_testing,
                              running_coding_ids=running_ids,
                              max_concurrency=self.max_concurrency,
                              at_capacity=(current >= self.max_concurrency))

                if current >= self.max_concurrency:
                    debug_log.log("CAPACITY", "At max capacity, sleeping...")
                    await asyncio.sleep(POLL_INTERVAL)
                    continue

@@ -489,9 +962,32 @@ class ParallelOrchestrator:

                # Start features up to capacity
                slots = self.max_concurrency - current
                for feature in ready[:slots]:
                    print(f"Starting feature #{feature['id']}: {feature['name']}", flush=True)
                    self.start_feature(feature["id"])
                print(f"[DEBUG] Spawning loop: {len(ready)} ready, {slots} slots available, max_concurrency={self.max_concurrency}", flush=True)
                print(f"[DEBUG] Will attempt to start {min(len(ready), slots)} features", flush=True)
                features_to_start = ready[:slots]
                print(f"[DEBUG] Features to start: {[f['id'] for f in features_to_start]}", flush=True)

                debug_log.log("SPAWN", "Starting features batch",
                              ready_count=len(ready),
                              slots_available=slots,
                              features_to_start=[f['id'] for f in features_to_start])

                for i, feature in enumerate(features_to_start):
                    print(f"[DEBUG] Starting feature {i+1}/{len(features_to_start)}: #{feature['id']} - {feature['name']}", flush=True)
                    success, msg = self.start_feature(feature["id"])
                    if not success:
                        print(f"[DEBUG] Failed to start feature #{feature['id']}: {msg}", flush=True)
                        debug_log.log("SPAWN", f"FAILED to start feature #{feature['id']}",
                                      feature_name=feature['name'],
                                      error=msg)
                    else:
                        print(f"[DEBUG] Successfully started feature #{feature['id']}", flush=True)
                        with self._lock:
                            running_count = len(self.running_coding_agents)
                        print(f"[DEBUG] Running coding agents after start: {running_count}", flush=True)
                        debug_log.log("SPAWN", f"Successfully started feature #{feature['id']}",
                                      feature_name=feature['name'],
                                      running_coding_agents=running_count)

                    await asyncio.sleep(2)  # Brief pause between starts

@@ -503,7 +999,9 @@ class ParallelOrchestrator:
        print("Waiting for running agents to complete...", flush=True)
        while True:
            with self._lock:
                if not self.running_agents:
                coding_done = len(self.running_coding_agents) == 0
                testing_done = len(self.running_testing_agents) == 0
                if coding_done and testing_done:
                    break
            await asyncio.sleep(1)

@@ -513,10 +1011,15 @@ class ParallelOrchestrator:
        """Get current orchestrator status."""
        with self._lock:
            return {
                "running_features": list(self.running_agents.keys()),
                "count": len(self.running_agents),
                "running_features": list(self.running_coding_agents.keys()),
                "coding_agent_count": len(self.running_coding_agents),
                "testing_agent_count": len(self.running_testing_agents),
                "count": len(self.running_coding_agents),  # Legacy compatibility
                "max_concurrency": self.max_concurrency,
                "testing_agent_ratio": self.testing_agent_ratio,
                "count_testing_in_concurrency": self.count_testing_in_concurrency,
                "is_running": self.is_running,
                "yolo_mode": self.yolo_mode,
            }


@@ -525,20 +1028,27 @@ async def run_parallel_orchestrator(
    max_concurrency: int = DEFAULT_CONCURRENCY,
    model: str = None,
    yolo_mode: bool = False,
    testing_agent_ratio: int = 1,
    count_testing_in_concurrency: bool = False,
) -> None:
    """Run the parallel orchestrator.
    """Run the unified orchestrator.

    Args:
        project_dir: Path to the project directory
        max_concurrency: Maximum number of concurrent agents
        max_concurrency: Maximum number of concurrent coding agents
        model: Claude model to use
        yolo_mode: Whether to run in YOLO mode
        yolo_mode: Whether to run in YOLO mode (skip testing agents)
        testing_agent_ratio: Testing agents per coding agent (0-3)
        count_testing_in_concurrency: If True, testing agents count toward concurrency limit
    """
    print(f"[ORCHESTRATOR] run_parallel_orchestrator called with max_concurrency={max_concurrency}", flush=True)
    orchestrator = ParallelOrchestrator(
        project_dir=project_dir,
        max_concurrency=max_concurrency,
        model=model,
        yolo_mode=yolo_mode,
        testing_agent_ratio=testing_agent_ratio,
        count_testing_in_concurrency=count_testing_in_concurrency,
    )

    try:
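The diff is truncated inside the try: block, so the cleanup path is not shown here. For completeness, a hedged sketch of how a caller would drive this coroutine; the keyword arguments mirror the signature above, while the driver script itself is illustrative:

import asyncio
from pathlib import Path

asyncio.run(run_parallel_orchestrator(
    project_dir=Path("my-app"),   # assumed Path-like, per the docstring
    max_concurrency=3,
    yolo_mode=False,
    testing_agent_ratio=1,
))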