diff --git a/.claude/templates/testing_prompt.template.md b/.claude/templates/testing_prompt.template.md index c6c8447..aa8f54f 100644 --- a/.claude/templates/testing_prompt.template.md +++ b/.claude/templates/testing_prompt.template.md @@ -40,15 +40,19 @@ chmod +x init.sh Otherwise, start servers manually. -### STEP 3: GET A FEATURE TO TEST +### STEP 3: CLAIM A FEATURE TO TEST -Request ONE passing feature for regression testing: +Atomically claim ONE passing feature for regression testing: ``` -Use the feature_get_for_regression tool with limit=1 +Use the feature_claim_for_testing tool ``` -This returns a random feature that is currently marked as passing. Your job is to verify it still works. +This atomically claims a random passing feature that: +- Is not being worked on by coding agents +- Is not already being tested by another testing agent + +**CRITICAL:** You MUST call `feature_release_testing` when done, regardless of pass/fail. ### STEP 4: VERIFY THE FEATURE @@ -83,9 +87,12 @@ Use browser automation tools: #### If the feature PASSES: -The feature still works correctly. Simply confirm this and end your session: +The feature still works correctly. Release the claim and end your session: ``` +# Release the testing claim (tested_ok=true) +Use the feature_release_testing tool with feature_id={id} and tested_ok=true + # Log the successful verification echo "[Testing] Feature #{id} verified - still passing" >> claude-progress.txt ``` @@ -120,7 +127,13 @@ A regression has been introduced. You MUST fix it: Use the feature_mark_passing tool with feature_id={id} ``` -6. **Commit the fix:** +6. **Release the testing claim:** + ``` + Use the feature_release_testing tool with feature_id={id} and tested_ok=false + ``` + Note: tested_ok=false because we found a regression (even though we fixed it). + +7. **Commit the fix:** ```bash git add . git commit -m "Fix regression in [feature name] @@ -144,7 +157,9 @@ echo "[Testing] Session complete - verified/fixed feature #{id}" >> claude-progr ### Feature Management - `feature_get_stats` - Get progress overview (passing/in_progress/total counts) -- `feature_get_for_regression` - Get a random passing feature to test +- `feature_claim_for_testing` - **USE THIS** - Atomically claim a feature for testing +- `feature_release_testing` - **REQUIRED** - Release claim after testing (pass tested_ok=true/false) +- `feature_get_for_regression` - (Legacy) Get random passing features without claiming - `feature_mark_failing` - Mark a feature as failing (when you find a regression) - `feature_mark_passing` - Mark a feature as passing (after fixing a regression) @@ -176,12 +191,18 @@ All interaction tools have **built-in auto-wait** - no manual timeouts needed. - Visual appearance correct - API calls succeed +**CRITICAL - Always release your claim:** +- Call `feature_release_testing` when done, whether pass or fail +- Pass `tested_ok=true` if the feature passed +- Pass `tested_ok=false` if you found a regression + **If you find a regression:** 1. Mark the feature as failing immediately 2. Fix the issue 3. Verify the fix with browser automation 4. Mark as passing only after thorough verification -5. Commit the fix +5. Release the testing claim with `tested_ok=false` +6. Commit the fix **You have one iteration.** Focus on testing ONE feature thoroughly. diff --git a/CLAUDE.md b/CLAUDE.md index a6857db..eba3bd7 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -215,6 +215,62 @@ When running with `--parallel`, the orchestrator: 4. 
Browser contexts are isolated per agent using `--isolated` flag 5. AgentTracker parses output and emits `agent_update` messages for UI +### Process Limits (Parallel Mode) + +The orchestrator enforces strict bounds on concurrent processes: +- `MAX_PARALLEL_AGENTS = 5` - Maximum concurrent coding agents +- `MAX_TOTAL_AGENTS = 10` - Hard limit on total agents (coding + testing) +- Testing agents are capped at `max_concurrency` (same as coding agents) + +**Expected process count during normal operation:** +- 1 orchestrator process +- Up to 5 coding agents +- Up to 5 testing agents +- Total: never exceeds 11 Python processes + +**Stress Test Verification:** + +```bash +# Windows - verify process bounds +# 1. Note baseline count +tasklist | findstr python | find /c /v "" + +# 2. Start parallel agent (max concurrency) +python autonomous_agent_demo.py --project-dir test --parallel --max-concurrency 5 + +# 3. During run - should NEVER exceed baseline + 11 +tasklist | findstr python | find /c /v "" + +# 4. After stop via UI - should return to baseline +tasklist | findstr python | find /c /v "" +``` + +```bash +# macOS/Linux - verify process bounds +# 1. Note baseline count +pgrep -c python + +# 2. Start parallel agent +python autonomous_agent_demo.py --project-dir test --parallel --max-concurrency 5 + +# 3. During run - should NEVER exceed baseline + 11 +pgrep -c python + +# 4. After stop - should return to baseline +pgrep -c python +``` + +**Log Verification:** + +```bash +# Check spawn vs completion balance +grep "Started testing agent" orchestrator_debug.log | wc -l +grep "Testing agent.*completed\|failed" orchestrator_debug.log | wc -l + +# Watch for cap enforcement messages +grep "at max testing agents\|At max total agents" orchestrator_debug.log +``` + ### Design System The UI uses a **neobrutalism** design with Tailwind CSS v4: diff --git a/api/database.py b/api/database.py index 271b7cc..fd82847 100644 --- a/api/database.py +++ b/api/database.py @@ -50,6 +50,10 @@ class Feature(Base): # Dependencies: list of feature IDs that must be completed before this feature # NULL/empty = no dependencies (backwards compatible) dependencies = Column(JSON, nullable=True, default=None) + # Regression testing: prevent concurrent testing of the same feature + testing_in_progress = Column(Boolean, nullable=False, default=False, index=True) + # Last time this feature was tested (for session-based regression tracking) + last_tested_at = Column(DateTime, nullable=True, default=None) def to_dict(self) -> dict: """Convert feature to dictionary for JSON serialization.""" @@ -65,6 +69,9 @@ class Feature(Base): "in_progress": self.in_progress if self.in_progress is not None else False, # Dependencies: NULL/empty treated as empty list for backwards compat "dependencies": self.dependencies if self.dependencies else [], + # Regression testing fields + "testing_in_progress": self.testing_in_progress if self.testing_in_progress is not None else False, + "last_tested_at": self.last_tested_at.isoformat() if self.last_tested_at else None, } def get_dependencies_safe(self) -> list[int]: @@ -225,6 +232,26 @@ def _migrate_add_dependencies_column(engine) -> None: conn.commit() +def _migrate_add_testing_columns(engine) -> None: + """Add testing_in_progress and last_tested_at columns for regression testing. + + These columns support atomic claiming of features for regression testing + and tracking when features were last tested in a session. 
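+
+    The checks below make the migration idempotent: each column is added only if
+    PRAGMA table_info(features) does not already list it, so create_database()
+    can run this migration unconditionally on an already-migrated database.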
+ """ + with engine.connect() as conn: + # Check existing columns + result = conn.execute(text("PRAGMA table_info(features)")) + columns = [row[1] for row in result.fetchall()] + + if "testing_in_progress" not in columns: + conn.execute(text("ALTER TABLE features ADD COLUMN testing_in_progress BOOLEAN DEFAULT 0")) + conn.commit() + + if "last_tested_at" not in columns: + conn.execute(text("ALTER TABLE features ADD COLUMN last_tested_at DATETIME DEFAULT NULL")) + conn.commit() + + def _is_network_path(path: Path) -> bool: """Detect if path is on a network filesystem. @@ -341,6 +368,7 @@ def create_database(project_dir: Path) -> tuple: _migrate_add_in_progress_column(engine) _migrate_fix_null_boolean_fields(engine) _migrate_add_dependencies_column(engine) + _migrate_add_testing_columns(engine) # Migrate to add schedules tables _migrate_add_schedules_tables(engine) diff --git a/autonomous_agent_demo.py b/autonomous_agent_demo.py index abe8992..f841775 100644 --- a/autonomous_agent_demo.py +++ b/autonomous_agent_demo.py @@ -149,13 +149,6 @@ Authentication: help="Testing agents per coding agent (0-3, default: 1). Set to 0 to disable testing agents.", ) - parser.add_argument( - "--count-testing", - action="store_true", - default=False, - help="Count testing agents toward concurrency limit (default: false)", - ) - return parser.parse_args() @@ -222,7 +215,6 @@ def main() -> None: model=args.model, yolo_mode=args.yolo, testing_agent_ratio=args.testing_ratio, - count_testing_in_concurrency=args.count_testing, ) ) except KeyboardInterrupt: diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py index e46403b..fbcfc30 100755 --- a/mcp_server/feature_mcp.py +++ b/mcp_server/feature_mcp.py @@ -26,12 +26,14 @@ import sys import threading import time as _time from contextlib import asynccontextmanager +from datetime import datetime, timezone from pathlib import Path from typing import Annotated from mcp.server.fastmcp import FastMCP from pydantic import BaseModel, Field from sqlalchemy import text +from sqlalchemy.exc import OperationalError from sqlalchemy.sql.expression import func # Add parent directory to path so we can import from api module @@ -224,112 +226,119 @@ def feature_get_next() -> str: # Maximum retry attempts for feature claiming under contention -MAX_CLAIM_RETRIES = 10 +MAX_CLAIM_RETRIES = 5 -def _feature_claim_next_internal(attempt: int = 0) -> str: - """Internal implementation of feature claiming with retry tracking. +def _feature_claim_next_internal() -> str: + """Internal implementation of feature claiming with iterative retry. - Args: - attempt: Current retry attempt (0-indexed) + Uses an iterative loop instead of recursion to avoid double session.close() + issues and deep call stacks under contention. Returns: JSON with claimed feature details, or error message if no feature available. 
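+
+    Example error payload after exhausting all retries:
+
+        {"error": "Failed to claim feature after maximum retries",
+         "hint": "High contention detected - try again or reduce parallel agents"}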
""" - if attempt >= MAX_CLAIM_RETRIES: - return json.dumps({ - "error": "Failed to claim feature after maximum retries", - "hint": "High contention detected - try again or reduce parallel agents" - }) + for attempt in range(MAX_CLAIM_RETRIES): + session = get_session() + try: + # Use a lock to prevent concurrent claims within this process + with _priority_lock: + all_features = session.query(Feature).all() + all_feature_ids = {f.id for f in all_features} + passing_ids = {f.id for f in all_features if f.passes} - session = get_session() - try: - # Use a lock to prevent concurrent claims within this process - with _priority_lock: - all_features = session.query(Feature).all() - all_feature_ids = {f.id for f in all_features} - passing_ids = {f.id for f in all_features if f.passes} + # Get pending, non-in-progress features + pending = [f for f in all_features if not f.passes and not f.in_progress] - # Get pending, non-in-progress features - pending = [f for f in all_features if not f.passes and not f.in_progress] + # Sort by scheduling score (higher = first), then priority, then id + all_dicts = [f.to_dict() for f in all_features] + scores = compute_scheduling_scores(all_dicts) + pending.sort(key=lambda f: (-scores.get(f.id, 0), f.priority, f.id)) - # Sort by scheduling score (higher = first), then priority, then id - all_dicts = [f.to_dict() for f in all_features] - scores = compute_scheduling_scores(all_dicts) - pending.sort(key=lambda f: (-scores.get(f.id, 0), f.priority, f.id)) + if not pending: + if any(f.in_progress for f in all_features if not f.passes): + return json.dumps({"error": "All pending features are in progress by other agents"}) + return json.dumps({"error": "All features are passing! No more work to do."}) - if not pending: - if any(f.in_progress for f in all_features if not f.passes): - return json.dumps({"error": "All pending features are in progress by other agents"}) - return json.dumps({"error": "All features are passing! 
No more work to do."}) - - # Find first feature with satisfied dependencies - candidate_id = None - for feature in pending: - deps = feature.dependencies or [] - # Filter out orphaned dependencies (IDs that no longer exist) - valid_deps = [d for d in deps if d in all_feature_ids] - if all(dep_id in passing_ids for dep_id in valid_deps): - candidate_id = feature.id - break - - if candidate_id is None: - # All pending features are blocked by unmet dependencies - blocking_info = [] - for feature in pending[:3]: + # Find first feature with satisfied dependencies + candidate_id = None + for feature in pending: deps = feature.dependencies or [] + # Filter out orphaned dependencies (IDs that no longer exist) valid_deps = [d for d in deps if d in all_feature_ids] - orphaned = [d for d in deps if d not in all_feature_ids] - unmet = [d for d in valid_deps if d not in passing_ids] - info = f"#{feature.id} '{feature.name}'" - if unmet: - info += f" blocked by: {unmet}" - if orphaned: - info += f" (orphaned deps ignored: {orphaned})" - blocking_info.append(info) + if all(dep_id in passing_ids for dep_id in valid_deps): + candidate_id = feature.id + break - return json.dumps({ - "error": "All pending features are blocked by unmet dependencies", - "blocked_features": len(pending), - "examples": blocking_info, - "hint": "Complete the blocking dependencies first, or remove invalid dependencies" - }, indent=2) + if candidate_id is None: + # All pending features are blocked by unmet dependencies + blocking_info = [] + for feature in pending[:3]: + deps = feature.dependencies or [] + valid_deps = [d for d in deps if d in all_feature_ids] + orphaned = [d for d in deps if d not in all_feature_ids] + unmet = [d for d in valid_deps if d not in passing_ids] + info = f"#{feature.id} '{feature.name}'" + if unmet: + info += f" blocked by: {unmet}" + if orphaned: + info += f" (orphaned deps ignored: {orphaned})" + blocking_info.append(info) - # Atomic claim: UPDATE only if still claimable - # This prevents race conditions even across processes - result = session.execute( - text(""" - UPDATE features - SET in_progress = 1 - WHERE id = :feature_id - AND in_progress = 0 - AND passes = 0 - """), - {"feature_id": candidate_id} - ) - session.commit() + return json.dumps({ + "error": "All pending features are blocked by unmet dependencies", + "blocked_features": len(pending), + "examples": blocking_info, + "hint": "Complete the blocking dependencies first, or remove invalid dependencies" + }, indent=2) - # Check if we actually claimed it - if result.rowcount == 0: - # Another process claimed it first - retry with backoff - session.close() - # Exponential backoff with jitter: base 0.1s, 0.2s, 0.4s, ... 
up to 1.0s - # Jitter of up to 30% prevents synchronized retries under high contention - backoff = min(0.1 * (2 ** attempt), 1.0) - jitter = random.uniform(0, backoff * 0.3) - _time.sleep(backoff + jitter) - return _feature_claim_next_internal(attempt + 1) + # Atomic claim: UPDATE only if still claimable + # This prevents race conditions even across processes + result = session.execute( + text(""" + UPDATE features + SET in_progress = 1 + WHERE id = :feature_id + AND in_progress = 0 + AND passes = 0 + """), + {"feature_id": candidate_id} + ) + session.commit() - # Fetch the claimed feature - session.expire_all() # Clear cache to get fresh data - claimed_feature = session.query(Feature).filter(Feature.id == candidate_id).first() - return json.dumps(claimed_feature.to_dict(), indent=2) + # Check if we actually claimed it + if result.rowcount == 0: + # Another process claimed it first - will retry after backoff + pass # Fall through to finally block, then retry loop + else: + # Successfully claimed - fetch and return + session.expire_all() # Clear cache to get fresh data + claimed_feature = session.query(Feature).filter(Feature.id == candidate_id).first() + return json.dumps(claimed_feature.to_dict(), indent=2) - except Exception as e: - session.rollback() - return json.dumps({"error": f"Failed to claim feature: {str(e)}"}) - finally: - session.close() + except OperationalError: + # Transient database error (e.g., SQLITE_BUSY) - rollback and retry + session.rollback() + # Fall through to backoff and retry + except Exception as e: + # Non-transient error - fail immediately + session.rollback() + return json.dumps({"error": f"Failed to claim feature: {str(e)}"}) + finally: + session.close() + + # Exponential backoff with jitter before next attempt + # Base 0.1s, 0.2s, 0.4s, 0.8s, 1.0s (capped) + # Jitter of up to 30% prevents synchronized retries under high contention + backoff = min(0.1 * (2 ** attempt), 1.0) + jitter = random.uniform(0, backoff * 0.3) + _time.sleep(backoff + jitter) + + # Exhausted all retries + return json.dumps({ + "error": "Failed to claim feature after maximum retries", + "hint": "High contention detected - try again or reduce parallel agents" + }) @mcp.tool() @@ -346,11 +355,12 @@ def feature_claim_next() -> str: 3. All dependency IDs actually exist (orphaned dependencies are ignored) On success, the feature's in_progress flag is set to True. + Uses exponential backoff retry (up to 5 attempts) under contention. Returns: JSON with claimed feature details, or error message if no feature available. """ - return _feature_claim_next_internal(attempt=0) + return _feature_claim_next_internal() @mcp.tool() @@ -389,6 +399,156 @@ def feature_get_for_regression( session.close() +def _feature_claim_for_testing_internal() -> str: + """Internal implementation of testing feature claim with iterative retry. + + Uses an iterative loop instead of recursion to avoid double session.close() + issues and deep call stacks under contention. + + Returns: + JSON with claimed feature details, or message if no features available. 
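+
+    The claim is race-free across processes: the UPDATE only succeeds while
+    passes=1, in_progress=0 and testing_in_progress=0 still hold, and a
+    rowcount of 0 means another agent won the race, so the loop backs off
+    and tries a different candidate.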
+ """ + for attempt in range(MAX_CLAIM_RETRIES): + session = get_session() + try: + # Use lock to prevent concurrent claims within this process + with _priority_lock: + # Find a candidate feature + candidate = ( + session.query(Feature) + .filter(Feature.passes == True) + .filter(Feature.in_progress == False) + .filter(Feature.testing_in_progress == False) + .order_by(func.random()) + .first() + ) + + if not candidate: + return json.dumps({ + "message": "No features available for testing", + "hint": "All passing features are either being coded or tested" + }) + + # Atomic claim using UPDATE with WHERE clause + # This prevents race conditions even across processes + result = session.execute( + text(""" + UPDATE features + SET testing_in_progress = 1 + WHERE id = :feature_id + AND passes = 1 + AND in_progress = 0 + AND testing_in_progress = 0 + """), + {"feature_id": candidate.id} + ) + session.commit() + + # Check if we actually claimed it + if result.rowcount == 0: + # Another process claimed it first - will retry after backoff + pass # Fall through to finally block, then retry loop + else: + # Successfully claimed - fetch and return + session.expire_all() + claimed = session.query(Feature).filter(Feature.id == candidate.id).first() + return json.dumps(claimed.to_dict(), indent=2) + + except OperationalError: + # Transient database error (e.g., SQLITE_BUSY) - rollback and retry + session.rollback() + # Fall through to backoff and retry + except Exception as e: + # Non-transient error - fail immediately + session.rollback() + return json.dumps({"error": f"Failed to claim feature: {str(e)}"}) + finally: + session.close() + + # Exponential backoff with jitter before next attempt + backoff = min(0.1 * (2 ** attempt), 1.0) + jitter = random.uniform(0, backoff * 0.3) + _time.sleep(backoff + jitter) + + # Exhausted all retries + return json.dumps({ + "error": "Failed to claim feature after maximum retries", + "hint": "High contention detected - try again or reduce testing agents" + }) + + +@mcp.tool() +def feature_claim_for_testing() -> str: + """Atomically claim a passing feature for regression testing. + + Returns a random passing feature that is: + - Currently passing (passes=True) + - Not being worked on by coding agents (in_progress=False) + - Not already being tested (testing_in_progress=False) + + The feature's testing_in_progress flag is set to True atomically to prevent + other testing agents from claiming the same feature. Uses exponential backoff + retry (up to 5 attempts) under contention. + + After testing, you MUST call feature_release_testing() to release the claim. + + Returns: + JSON with feature details if available, or message if no features available. + """ + return _feature_claim_for_testing_internal() + + +@mcp.tool() +def feature_release_testing( + feature_id: Annotated[int, Field(description="The ID of the feature to release", ge=1)], + tested_ok: Annotated[bool, Field(description="True if the feature passed testing, False if regression found")] = True +) -> str: + """Release a feature after regression testing completes. + + Clears the testing_in_progress flag and updates last_tested_at timestamp. + + This should be called after testing is complete, whether the feature + passed or failed. If tested_ok=False, the feature was marked as failing + by a previous call to feature_mark_failing. 
+ + Args: + feature_id: The ID of the feature that was being tested + tested_ok: True if testing passed, False if a regression was found + + Returns: + JSON with release confirmation or error message. + """ + session = get_session() + try: + feature = session.query(Feature).filter(Feature.id == feature_id).first() + + if feature is None: + return json.dumps({"error": f"Feature with ID {feature_id} not found"}) + + if not feature.testing_in_progress: + return json.dumps({ + "warning": f"Feature {feature_id} was not being tested", + "feature": feature.to_dict() + }) + + # Clear testing flag and update timestamp + feature.testing_in_progress = False + feature.last_tested_at = datetime.now(timezone.utc) + session.commit() + session.refresh(feature) + + status = "passed" if tested_ok else "failed (regression detected)" + return json.dumps({ + "message": f"Feature #{feature_id} testing {status}", + "feature": feature.to_dict() + }, indent=2) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to release testing claim: {str(e)}"}) + finally: + session.close() + + @mcp.tool() def feature_mark_passing( feature_id: Annotated[int, Field(description="The ID of the feature to mark as passing", ge=1)] @@ -417,6 +577,9 @@ def feature_mark_passing( session.refresh(feature) return json.dumps(feature.to_dict(), indent=2) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to mark feature passing: {str(e)}"}) finally: session.close() @@ -459,6 +622,9 @@ def feature_mark_failing( "message": f"Feature #{feature_id} marked as failing - regression detected", "feature": feature.to_dict() }, indent=2) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to mark feature failing: {str(e)}"}) finally: session.close() @@ -515,6 +681,9 @@ def feature_skip( "new_priority": new_priority, "message": f"Feature '{feature.name}' moved to end of queue" }, indent=2) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to skip feature: {str(e)}"}) finally: session.close() @@ -552,6 +721,9 @@ def feature_mark_in_progress( session.refresh(feature) return json.dumps(feature.to_dict(), indent=2) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to mark feature in-progress: {str(e)}"}) finally: session.close() @@ -583,6 +755,9 @@ def feature_clear_in_progress( session.refresh(feature) return json.dumps(feature.to_dict(), indent=2) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to clear in-progress status: {str(e)}"}) finally: session.close() @@ -807,6 +982,9 @@ def feature_add_dependency( "feature_id": feature_id, "dependencies": feature.dependencies }) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to add dependency: {str(e)}"}) finally: session.close() @@ -844,6 +1022,9 @@ def feature_remove_dependency( "feature_id": feature_id, "dependencies": feature.dependencies or [] }) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to remove dependency: {str(e)}"}) finally: session.close() @@ -1040,6 +1221,9 @@ def feature_set_dependencies( "feature_id": feature_id, "dependencies": feature.dependencies or [] }) + except Exception as e: + session.rollback() + return json.dumps({"error": f"Failed to set dependencies: {str(e)}"}) finally: session.close() diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index 5bbcba9..2414ac9 100644 --- 
a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -23,15 +23,16 @@ import os import subprocess import sys import threading -from datetime import datetime +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Callable, Literal -import psutil +from sqlalchemy import text from api.database import Feature, create_database from api.dependency_resolver import are_dependencies_satisfied, compute_scheduling_scores from progress import has_features +from server.utils.process_utils import kill_process_tree # Root directory of autocoder (where this script and autonomous_agent_demo.py live) AUTOCODER_ROOT = Path(__file__).parent.resolve() @@ -98,70 +99,44 @@ def _dump_database_state(session, label: str = ""): pending_count=len(pending), pending_ids=[f.id for f in pending[:10]]) # First 10 pending only -# Performance: Limit parallel agents to prevent memory exhaustion +# ============================================================================= +# Process Limits +# ============================================================================= +# These constants bound the number of concurrent agent processes to prevent +# resource exhaustion (memory, CPU, API rate limits). +# +# MAX_PARALLEL_AGENTS: Max concurrent coding agents (each is a Claude session) +# MAX_TOTAL_AGENTS: Hard limit on total child processes (coding + testing) +# +# Expected process count during normal operation: +# - 1 orchestrator process (this script) +# - Up to MAX_PARALLEL_AGENTS coding agents +# - Up to max_concurrency testing agents +# - Total never exceeds MAX_TOTAL_AGENTS + 1 (including orchestrator) +# +# Stress test verification: +# 1. Note baseline: tasklist | findstr python | find /c /v "" +# 2. Run: python autonomous_agent_demo.py --project-dir test --parallel --max-concurrency 5 +# 3. During run: count should never exceed baseline + 11 (1 orchestrator + 10 agents) +# 4. After stop: should return to baseline +# ============================================================================= MAX_PARALLEL_AGENTS = 5 -MAX_TOTAL_AGENTS = 10 # Hard limit on total agents (coding + testing) +MAX_TOTAL_AGENTS = 10 DEFAULT_CONCURRENCY = 3 POLL_INTERVAL = 5 # seconds between checking for ready features MAX_FEATURE_RETRIES = 3 # Maximum times to retry a failed feature INITIALIZER_TIMEOUT = 1800 # 30 minutes timeout for initializer - - -def _kill_process_tree(proc: subprocess.Popen, timeout: float = 5.0) -> None: - """Kill a process and all its child processes. - - On Windows, subprocess.terminate() only kills the immediate process, leaving - orphaned child processes (e.g., spawned browser instances). This function - uses psutil to kill the entire process tree. 
- - Args: - proc: The subprocess.Popen object to kill - timeout: Seconds to wait for graceful termination before force-killing - """ - try: - parent = psutil.Process(proc.pid) - # Get all children recursively before terminating - children = parent.children(recursive=True) - - # Terminate children first (graceful) - for child in children: - try: - child.terminate() - except psutil.NoSuchProcess: - pass - - # Wait for children to terminate - _, still_alive = psutil.wait_procs(children, timeout=timeout) - - # Force kill any remaining children - for child in still_alive: - try: - child.kill() - except psutil.NoSuchProcess: - pass - - # Now terminate the parent - proc.terminate() - try: - proc.wait(timeout=timeout) - except subprocess.TimeoutExpired: - proc.kill() - proc.wait() - - except psutil.NoSuchProcess: - # Process already dead, just ensure cleanup - try: - proc.terminate() - proc.wait(timeout=1) - except (subprocess.TimeoutExpired, OSError): - try: - proc.kill() - except OSError: - pass +STALE_TESTING_LOCK_MINUTES = 30 # Auto-release testing locks older than this class ParallelOrchestrator: - """Orchestrates parallel execution of independent features.""" + """Orchestrates parallel execution of independent features. + + Process bounds: + - Up to MAX_PARALLEL_AGENTS (5) coding agents concurrently + - Up to max_concurrency testing agents concurrently + - Hard limit of MAX_TOTAL_AGENTS (10) total child processes + """ def __init__( self, @@ -170,7 +145,6 @@ class ParallelOrchestrator: model: str = None, yolo_mode: bool = False, testing_agent_ratio: int = 1, - count_testing_in_concurrency: bool = False, on_output: Callable[[int, str], None] = None, on_status: Callable[[int, str], None] = None, ): @@ -178,11 +152,12 @@ class ParallelOrchestrator: Args: project_dir: Path to the project directory - max_concurrency: Maximum number of concurrent coding agents (1-5) + max_concurrency: Maximum number of concurrent coding agents (1-5). + Also caps testing agents at the same limit. model: Claude model to use (or None for default) - yolo_mode: Whether to run in YOLO mode (skip testing agents) - testing_agent_ratio: Testing agents per coding agent (0-3, default 1) - count_testing_in_concurrency: If True, testing agents count toward concurrency limit + yolo_mode: Whether to run in YOLO mode (skip testing agents entirely) + testing_agent_ratio: Number of regression testing agents to maintain (0-3). + 0 = disabled, 1-3 = maintain that many testing agents running independently. on_output: Callback for agent output (feature_id, line) on_status: Callback for agent status changes (feature_id, status) """ @@ -191,7 +166,6 @@ class ParallelOrchestrator: self.model = model self.yolo_mode = yolo_mode self.testing_agent_ratio = min(max(testing_agent_ratio, 0), 3) # Clamp 0-3 - self.count_testing_in_concurrency = count_testing_in_concurrency self.on_output = on_output self.on_status = on_status @@ -209,6 +183,9 @@ class ParallelOrchestrator: # Track feature failures to prevent infinite retry loops self._failure_counts: dict[int, int] = {} + # Session tracking for logging/debugging + self.session_start_time: datetime = None + # Database session for this orchestrator self._engine, self._session_maker = create_database(project_dir) @@ -369,6 +346,110 @@ class ParallelOrchestrator: finally: session.close() + def _cleanup_stale_testing_locks(self) -> None: + """Release stale testing locks from crashed testing agents. 
+ + A feature is considered stale if: + - testing_in_progress=True AND + - last_tested_at is NOT NULL AND older than STALE_TESTING_LOCK_MINUTES + + Note: We do NOT release features with last_tested_at=NULL because that would + incorrectly release features that are legitimately in the middle of their + first test. The last_tested_at is only set when testing completes. + + This handles the case where a testing agent crashes mid-test, leaving + the feature locked until orchestrator restart. By checking periodically, + we can release these locks without requiring a restart. + """ + session = self.get_session() + try: + # Use timezone-aware UTC, then strip timezone for SQLite compatibility + # (SQLite stores datetimes as naive strings, but we want consistency with + # datetime.now(timezone.utc) used elsewhere in the codebase) + cutoff_time = (datetime.now(timezone.utc) - timedelta(minutes=STALE_TESTING_LOCK_MINUTES)).replace(tzinfo=None) + + # Find stale locks: testing_in_progress=True AND last_tested_at < cutoff + # Excludes NULL last_tested_at to avoid false positives on first-time tests + stale_features = ( + session.query(Feature) + .filter(Feature.testing_in_progress == True) + .filter(Feature.last_tested_at.isnot(None)) + .filter(Feature.last_tested_at < cutoff_time) + .all() + ) + + if stale_features: + stale_ids = [f.id for f in stale_features] + # Use ORM update instead of raw SQL for SQLite IN clause compatibility + session.query(Feature).filter(Feature.id.in_(stale_ids)).update( + {"testing_in_progress": False}, + synchronize_session=False + ) + session.commit() + print(f"[CLEANUP] Released {len(stale_ids)} stale testing locks: {stale_ids}", flush=True) + debug_log.log("CLEANUP", "Released stale testing locks", feature_ids=stale_ids) + except Exception as e: + session.rollback() + print(f"[CLEANUP] Error cleaning stale locks: {e}", flush=True) + debug_log.log("CLEANUP", f"Error cleaning stale locks: {e}") + finally: + session.close() + + def _maintain_testing_agents(self) -> None: + """Maintain the desired count of testing agents independently. + + This runs every loop iteration and spawns testing agents as needed to maintain + the configured testing_agent_ratio. Testing agents run independently from + coding agents and continuously re-test passing features to catch regressions. + + Also periodically releases stale testing locks (features stuck in + testing_in_progress=True for more than STALE_TESTING_LOCK_MINUTES). 
+ + Stops spawning when: + - YOLO mode is enabled + - testing_agent_ratio is 0 + - No passing features exist yet + """ + # Skip if testing is disabled + if self.yolo_mode or self.testing_agent_ratio == 0: + return + + # Periodically clean up stale testing locks (features stuck mid-test due to crash) + # A feature is considered stale if testing_in_progress=True and last_tested_at + # is either NULL or older than STALE_TESTING_LOCK_MINUTES + self._cleanup_stale_testing_locks() + + # No testing until there are passing features + passing_count = self.get_passing_count() + if passing_count == 0: + return + + # Spawn testing agents one at a time, re-checking limits each time + # This avoids TOCTOU race by holding lock during the decision + while True: + # Check limits and decide whether to spawn (atomically) + with self._lock: + current_testing = len(self.running_testing_agents) + desired = self.testing_agent_ratio + total_agents = len(self.running_coding_agents) + current_testing + + # Check if we need more testing agents + if current_testing >= desired: + return # Already at desired count + + # Check hard limit on total agents + if total_agents >= MAX_TOTAL_AGENTS: + return # At max total agents + + # We're going to spawn - log while still holding lock + spawn_index = current_testing + 1 + debug_log.log("TESTING", f"Spawning testing agent ({spawn_index}/{desired})", + passing_count=passing_count) + + # Spawn outside lock (I/O bound operation) + print(f"[DEBUG] Spawning testing agent ({spawn_index}/{desired})", flush=True) + self._spawn_testing_agent() + def start_feature(self, feature_id: int, resume: bool = False) -> tuple[bool, str]: """Start a single coding agent for a feature. @@ -384,6 +465,10 @@ class ParallelOrchestrator: return False, "Feature already running" if len(self.running_coding_agents) >= self.max_concurrency: return False, "At max concurrency" + # Enforce hard limit on total agents (coding + testing) + total_agents = len(self.running_coding_agents) + len(self.running_testing_agents) + if total_agents >= MAX_TOTAL_AGENTS: + return False, f"At max total agents ({total_agents}/{MAX_TOTAL_AGENTS})" # Mark as in_progress in database (or verify it's resumable) session = self.get_session() @@ -412,16 +497,8 @@ class ParallelOrchestrator: if not success: return False, message - # Spawn ONE testing agent when coding agent STARTS (if not YOLO mode and passing features exist) - # Testing agents exit after one test, so we spawn fresh ones with each coding agent start - if not self.yolo_mode and self.testing_agent_ratio > 0: - passing_count = self.get_passing_count() - if passing_count > 0: - print(f"[DEBUG] Coding agent started, spawning testing agent (passing_count={passing_count})", flush=True) - debug_log.log("TESTING", "Spawning testing agent on coding agent start", - feature_id=feature_id, - passing_count=passing_count) - self._spawn_testing_agent() + # NOTE: Testing agents are now maintained independently via _maintain_testing_agents() + # called in the main loop, rather than being spawned when coding agents start. 
return True, f"Started feature {feature_id}" @@ -483,58 +560,58 @@ class ParallelOrchestrator: print(f"Started coding agent for feature #{feature_id}", flush=True) return True, f"Started feature {feature_id}" - def _spawn_testing_agents(self) -> None: - """Spawn testing agents based on testing_agent_ratio.""" - for _ in range(self.testing_agent_ratio): - # Check resource limits - with self._lock: - total_agents = len(self.running_coding_agents) + len(self.running_testing_agents) - if total_agents >= MAX_TOTAL_AGENTS: - print(f"[DEBUG] At max total agents ({MAX_TOTAL_AGENTS}), skipping testing agent", flush=True) - break - - if self.count_testing_in_concurrency: - if total_agents >= self.max_concurrency: - print("[DEBUG] Testing agents count toward concurrency, at limit", flush=True) - break - - # Spawn a testing agent - self._spawn_testing_agent() - def _spawn_testing_agent(self) -> tuple[bool, str]: - """Spawn a testing agent subprocess for regression testing.""" - debug_log.log("TESTING", "Attempting to spawn testing agent subprocess") - - cmd = [ - sys.executable, - "-u", - str(AUTOCODER_ROOT / "autonomous_agent_demo.py"), - "--project-dir", str(self.project_dir), - "--max-iterations", "1", - "--agent-type", "testing", - ] - if self.model: - cmd.extend(["--model", self.model]) - # Testing agents don't need --yolo flag (they use testing prompt regardless) - - try: - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - cwd=str(AUTOCODER_ROOT), - env={**os.environ, "PYTHONUNBUFFERED": "1"}, - ) - except Exception as e: - debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}") - return False, f"Failed to start testing agent: {e}" + """Spawn a testing agent subprocess for regression testing. + CRITICAL: Lock is held during the entire spawn operation to prevent + TOCTOU race conditions where multiple threads could pass limit checks + and spawn excess agents. 
+ """ + # Hold lock for entire operation to prevent TOCTOU race with self._lock: + # Check limits + current_testing_count = len(self.running_testing_agents) + if current_testing_count >= self.max_concurrency: + debug_log.log("TESTING", f"Skipped spawn - at max testing agents ({current_testing_count}/{self.max_concurrency})") + return False, f"At max testing agents ({current_testing_count})" + total_agents = len(self.running_coding_agents) + len(self.running_testing_agents) + if total_agents >= MAX_TOTAL_AGENTS: + debug_log.log("TESTING", f"Skipped spawn - at max total agents ({total_agents}/{MAX_TOTAL_AGENTS})") + return False, f"At max total agents ({total_agents})" + + debug_log.log("TESTING", "Attempting to spawn testing agent subprocess") + + cmd = [ + sys.executable, + "-u", + str(AUTOCODER_ROOT / "autonomous_agent_demo.py"), + "--project-dir", str(self.project_dir), + "--max-iterations", "1", + "--agent-type", "testing", + ] + if self.model: + cmd.extend(["--model", self.model]) + # Testing agents don't need --yolo flag (they use testing prompt regardless) + + try: + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + cwd=str(AUTOCODER_ROOT), + env={**os.environ, "PYTHONUNBUFFERED": "1"}, + ) + except Exception as e: + debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}") + return False, f"Failed to start testing agent: {e}" + + # Register process immediately while still holding lock self.running_testing_agents.append(proc) testing_count = len(self.running_testing_agents) # Start output reader thread (feature_id=None for testing agents) + # This can be outside lock since process is already registered threading.Thread( target=self._read_output, args=(None, proc, threading.Event(), "testing"), @@ -598,7 +675,9 @@ class ParallelOrchestrator: print(f"ERROR: Initializer timed out after {INITIALIZER_TIMEOUT // 60} minutes", flush=True) debug_log.log("INIT", "TIMEOUT - Initializer exceeded time limit", timeout_minutes=INITIALIZER_TIMEOUT // 60) - _kill_process_tree(proc) + result = kill_process_tree(proc) + debug_log.log("INIT", "Killed timed-out initializer process tree", + status=result.status, children_found=result.children_found) return False debug_log.log("INIT", "Initializer subprocess completed", @@ -747,7 +826,10 @@ class ParallelOrchestrator: abort.set() if proc: # Kill entire process tree to avoid orphaned children (e.g., browser instances) - _kill_process_tree(proc, timeout=5.0) + result = kill_process_tree(proc, timeout=5.0) + debug_log.log("STOP", f"Killed feature {feature_id} process tree", + status=result.status, children_found=result.children_found, + children_terminated=result.children_terminated, children_killed=result.children_killed) return True, f"Stopped feature {feature_id}" @@ -767,15 +849,35 @@ class ParallelOrchestrator: testing_procs = list(self.running_testing_agents) for proc in testing_procs: - _kill_process_tree(proc, timeout=5.0) + result = kill_process_tree(proc, timeout=5.0) + debug_log.log("STOP", f"Killed testing agent PID {proc.pid} process tree", + status=result.status, children_found=result.children_found, + children_terminated=result.children_terminated, children_killed=result.children_killed) async def run_loop(self): """Main orchestration loop.""" self.is_running = True - # Start debug logging session (clears previous logs) + # Track session start for regression testing (UTC for consistency with last_tested_at) + self.session_start_time = datetime.now(timezone.utc) + + # Start debug logging 
session FIRST (clears previous logs) + # Must happen before any debug_log.log() calls debug_log.start_session() + # Clear stale testing_in_progress flags from crashed testing agents + # This ensures features aren't permanently locked if a previous session crashed + session = self.get_session() + try: + stale_count = session.query(Feature).filter(Feature.testing_in_progress == True).count() + if stale_count > 0: + session.execute(text("UPDATE features SET testing_in_progress = 0 WHERE testing_in_progress = 1")) + session.commit() + print(f"[STARTUP] Cleared {stale_count} stale testing_in_progress flags", flush=True) + debug_log.log("STARTUP", f"Cleared {stale_count} stale testing_in_progress flags") + finally: + session.close() + # Log startup to debug file debug_log.section("ORCHESTRATOR STARTUP") debug_log.log("STARTUP", "Orchestrator run_loop starting", @@ -783,7 +885,7 @@ class ParallelOrchestrator: max_concurrency=self.max_concurrency, yolo_mode=self.yolo_mode, testing_agent_ratio=self.testing_agent_ratio, - count_testing_in_concurrency=self.count_testing_in_concurrency) + session_start_time=self.session_start_time.isoformat()) print("=" * 70, flush=True) print(" UNIFIED ORCHESTRATOR SETTINGS", flush=True) @@ -791,8 +893,7 @@ class ParallelOrchestrator: print(f"Project: {self.project_dir}", flush=True) print(f"Max concurrency: {self.max_concurrency} coding agents", flush=True) print(f"YOLO mode: {self.yolo_mode}", flush=True) - print(f"Testing agent ratio: {self.testing_agent_ratio} per coding agent", flush=True) - print(f"Count testing in concurrency: {self.count_testing_in_concurrency}", flush=True) + print(f"Regression agents: {self.testing_agent_ratio} (maintained independently)", flush=True) print("=" * 70, flush=True) print(flush=True) @@ -891,6 +992,9 @@ class ParallelOrchestrator: print("\nAll features complete!", flush=True) break + # Maintain testing agents independently (runs every iteration) + self._maintain_testing_agents() + # Check capacity with self._lock: current = len(self.running_coding_agents) @@ -1003,7 +1107,6 @@ class ParallelOrchestrator: "count": len(self.running_coding_agents), # Legacy compatibility "max_concurrency": self.max_concurrency, "testing_agent_ratio": self.testing_agent_ratio, - "count_testing_in_concurrency": self.count_testing_in_concurrency, "is_running": self.is_running, "yolo_mode": self.yolo_mode, } @@ -1015,7 +1118,6 @@ async def run_parallel_orchestrator( model: str = None, yolo_mode: bool = False, testing_agent_ratio: int = 1, - count_testing_in_concurrency: bool = False, ) -> None: """Run the unified orchestrator. 
@@ -1024,8 +1126,7 @@ async def run_parallel_orchestrator( max_concurrency: Maximum number of concurrent coding agents model: Claude model to use yolo_mode: Whether to run in YOLO mode (skip testing agents) - testing_agent_ratio: Testing agents per coding agent (0-3) - count_testing_in_concurrency: If True, testing agents count toward concurrency limit + testing_agent_ratio: Number of regression agents to maintain (0-3) """ print(f"[ORCHESTRATOR] run_parallel_orchestrator called with max_concurrency={max_concurrency}", flush=True) orchestrator = ParallelOrchestrator( @@ -1034,7 +1135,6 @@ async def run_parallel_orchestrator( model=model, yolo_mode=yolo_mode, testing_agent_ratio=testing_agent_ratio, - count_testing_in_concurrency=count_testing_in_concurrency, ) try: @@ -1082,6 +1182,12 @@ def main(): default=False, help="Enable YOLO mode: rapid prototyping without browser testing", ) + parser.add_argument( + "--testing-agent-ratio", + type=int, + default=1, + help="Number of regression testing agents (0-3, default: 1). Set to 0 to disable testing agents.", + ) args = parser.parse_args() @@ -1107,6 +1213,7 @@ def main(): max_concurrency=args.max_concurrency, model=args.model, yolo_mode=args.yolo, + testing_agent_ratio=args.testing_agent_ratio, )) except KeyboardInterrupt: print("\n\nInterrupted by user", flush=True) diff --git a/server/routers/agent.py b/server/routers/agent.py index 1f54b20..309024d 100644 --- a/server/routers/agent.py +++ b/server/routers/agent.py @@ -26,11 +26,11 @@ def _get_project_path(project_name: str) -> Path: return get_project_path(project_name) -def _get_settings_defaults() -> tuple[bool, str, int, bool]: +def _get_settings_defaults() -> tuple[bool, str, int]: """Get defaults from global settings. Returns: - Tuple of (yolo_mode, model, testing_agent_ratio, count_testing_in_concurrency) + Tuple of (yolo_mode, model, testing_agent_ratio) """ import sys root = Path(__file__).parent.parent.parent @@ -49,9 +49,7 @@ def _get_settings_defaults() -> tuple[bool, str, int, bool]: except (ValueError, TypeError): testing_agent_ratio = 1 - count_testing = (settings.get("count_testing_in_concurrency") or "false").lower() == "true" - - return yolo_mode, model, testing_agent_ratio, count_testing + return yolo_mode, model, testing_agent_ratio router = APIRouter(prefix="/api/projects/{project_name}/agent", tags=["agent"]) @@ -101,7 +99,6 @@ async def get_agent_status(project_name: str): parallel_mode=manager.parallel_mode, max_concurrency=manager.max_concurrency, testing_agent_ratio=manager.testing_agent_ratio, - count_testing_in_concurrency=manager.count_testing_in_concurrency, ) @@ -114,20 +111,18 @@ async def start_agent( manager = get_project_manager(project_name) # Get defaults from global settings if not provided in request - default_yolo, default_model, default_testing_ratio, default_count_testing = _get_settings_defaults() + default_yolo, default_model, default_testing_ratio = _get_settings_defaults() yolo_mode = request.yolo_mode if request.yolo_mode is not None else default_yolo model = request.model if request.model else default_model max_concurrency = request.max_concurrency or 1 testing_agent_ratio = request.testing_agent_ratio if request.testing_agent_ratio is not None else default_testing_ratio - count_testing = request.count_testing_in_concurrency if request.count_testing_in_concurrency is not None else default_count_testing success, message = await manager.start( yolo_mode=yolo_mode, model=model, max_concurrency=max_concurrency, 
testing_agent_ratio=testing_agent_ratio, - count_testing_in_concurrency=count_testing, ) # Notify scheduler of manual start (to prevent auto-stop during scheduled window) diff --git a/server/routers/features.py b/server/routers/features.py index 1214181..a830001 100644 --- a/server/routers/features.py +++ b/server/routers/features.py @@ -258,15 +258,16 @@ async def create_features_bulk(project_name: str, bulk: FeatureBulkCreate): try: with get_db_session(project_dir) as session: - # Determine starting priority with row-level lock to prevent race conditions + # Determine starting priority + # Note: SQLite uses file-level locking, not row-level locking, so we rely on + # SQLite's transaction isolation. Concurrent bulk creates may get overlapping + # priorities, but this is acceptable since priorities can be reordered. if bulk.starting_priority is not None: current_priority = bulk.starting_priority else: - # Lock the max priority row to prevent concurrent inserts from getting same priority max_priority_feature = ( session.query(Feature) .order_by(Feature.priority.desc()) - .with_for_update() .first() ) current_priority = (max_priority_feature.priority + 1) if max_priority_feature else 1 diff --git a/server/routers/settings.py b/server/routers/settings.py index 66bf88d..60b5fe8 100644 --- a/server/routers/settings.py +++ b/server/routers/settings.py @@ -79,7 +79,6 @@ async def get_settings(): model=all_settings.get("model", DEFAULT_MODEL), glm_mode=_is_glm_mode(), testing_agent_ratio=_parse_int(all_settings.get("testing_agent_ratio"), 1), - count_testing_in_concurrency=_parse_bool(all_settings.get("count_testing_in_concurrency")), ) @@ -95,9 +94,6 @@ async def update_settings(update: SettingsUpdate): if update.testing_agent_ratio is not None: set_setting("testing_agent_ratio", str(update.testing_agent_ratio)) - if update.count_testing_in_concurrency is not None: - set_setting("count_testing_in_concurrency", "true" if update.count_testing_in_concurrency else "false") - # Return updated settings all_settings = get_all_settings() return SettingsResponse( @@ -105,5 +101,4 @@ async def update_settings(update: SettingsUpdate): model=all_settings.get("model", DEFAULT_MODEL), glm_mode=_is_glm_mode(), testing_agent_ratio=_parse_int(all_settings.get("testing_agent_ratio"), 1), - count_testing_in_concurrency=_parse_bool(all_settings.get("count_testing_in_concurrency")), ) diff --git a/server/schemas.py b/server/schemas.py index 451baaf..844aaa1 100644 --- a/server/schemas.py +++ b/server/schemas.py @@ -171,8 +171,7 @@ class AgentStartRequest(BaseModel): model: str | None = None # None means use global settings parallel_mode: bool | None = None # DEPRECATED: Use max_concurrency instead max_concurrency: int | None = None # Max concurrent coding agents (1-5) - testing_agent_ratio: int | None = None # Testing agents per coding agent (0-3) - count_testing_in_concurrency: bool | None = None # Count testing toward limit + testing_agent_ratio: int | None = None # Regression testing agents (0-3) @field_validator('model') @classmethod @@ -208,8 +207,7 @@ class AgentStatus(BaseModel): model: str | None = None # Model being used by running agent parallel_mode: bool = False # DEPRECATED: Always True now (unified orchestrator) max_concurrency: int | None = None - testing_agent_ratio: int = 1 # Testing agents per coding agent - count_testing_in_concurrency: bool = False # Count testing toward limit + testing_agent_ratio: int = 1 # Regression testing agents (0-3) class AgentActionResponse(BaseModel): @@ -384,8 +382,7 @@ 
class SettingsResponse(BaseModel): yolo_mode: bool = False model: str = DEFAULT_MODEL glm_mode: bool = False # True if GLM API is configured via .env - testing_agent_ratio: int = 1 # Testing agents per coding agent (0-3) - count_testing_in_concurrency: bool = False # Count testing toward concurrency + testing_agent_ratio: int = 1 # Regression testing agents (0-3) class ModelsResponse(BaseModel): @@ -399,7 +396,6 @@ class SettingsUpdate(BaseModel): yolo_mode: bool | None = None model: str | None = None testing_agent_ratio: int | None = None # 0-3 - count_testing_in_concurrency: bool | None = None @field_validator('model') @classmethod diff --git a/server/services/dev_server_manager.py b/server/services/dev_server_manager.py index 3ca5eb0..063e076 100644 --- a/server/services/dev_server_manager.py +++ b/server/services/dev_server_manager.py @@ -24,6 +24,7 @@ from typing import Awaitable, Callable, Literal, Set import psutil from registry import list_registered_projects +from server.utils.process_utils import kill_process_tree logger = logging.getLogger(__name__) @@ -370,37 +371,16 @@ class DevServerProcessManager: except asyncio.CancelledError: pass - # Use psutil to terminate the entire process tree - # This is important for dev servers that spawn child processes - try: - parent = psutil.Process(self.process.pid) - children = parent.children(recursive=True) - - # Terminate children first - for child in children: - try: - child.terminate() - except psutil.NoSuchProcess: - pass - - # Terminate parent - parent.terminate() - - # Wait for graceful shutdown - _, still_alive = psutil.wait_procs( - [parent] + children, timeout=5 - ) - - # Force kill any remaining processes - for proc in still_alive: - try: - proc.kill() - except psutil.NoSuchProcess: - pass - - except psutil.NoSuchProcess: - # Process already gone - pass + # Use shared utility to terminate the entire process tree + # This is important for dev servers that spawn child processes (like Node.js) + proc = self.process # Capture reference before async call + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, kill_process_tree, proc, 5.0) + logger.debug( + "Process tree kill result: status=%s, children=%d (terminated=%d, killed=%d)", + result.status, result.children_found, + result.children_terminated, result.children_killed + ) self._remove_lock() self.status = "stopped" diff --git a/server/services/process_manager.py b/server/services/process_manager.py index 0c50fd3..350905f 100644 --- a/server/services/process_manager.py +++ b/server/services/process_manager.py @@ -23,6 +23,7 @@ import psutil sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from auth import AUTH_ERROR_HELP_SERVER as AUTH_ERROR_HELP # noqa: E402 from auth import is_auth_error +from server.utils.process_utils import kill_process_tree logger = logging.getLogger(__name__) @@ -83,8 +84,7 @@ class AgentProcessManager: self.model: str | None = None # Model being used self.parallel_mode: bool = False # Parallel execution mode self.max_concurrency: int | None = None # Max concurrent agents - self.testing_agent_ratio: int = 1 # Testing agents per coding agent - self.count_testing_in_concurrency: bool = False # Count testing toward limit + self.testing_agent_ratio: int = 1 # Regression testing agents (0-3) # Support multiple callbacks (for multiple WebSocket clients) self._output_callbacks: Set[Callable[[str], Awaitable[None]]] = set() @@ -296,7 +296,6 @@ class AgentProcessManager: parallel_mode: bool = False, max_concurrency: int | None = 
None, testing_agent_ratio: int = 1, - count_testing_in_concurrency: bool = False, ) -> tuple[bool, str]: """ Start the agent as a subprocess. @@ -306,8 +305,7 @@ class AgentProcessManager: model: Model to use (e.g., claude-opus-4-5-20251101) parallel_mode: DEPRECATED - ignored, always uses unified orchestrator max_concurrency: Max concurrent coding agents (1-5, default 1) - testing_agent_ratio: Testing agents per coding agent (0-3, default 1) - count_testing_in_concurrency: If True, testing agents count toward limit + testing_agent_ratio: Number of regression testing agents (0-3, default 1) Returns: Tuple of (success, message) @@ -324,7 +322,6 @@ class AgentProcessManager: self.parallel_mode = True # Always True now (unified orchestrator) self.max_concurrency = max_concurrency or 1 self.testing_agent_ratio = testing_agent_ratio - self.count_testing_in_concurrency = count_testing_in_concurrency # Build command - unified orchestrator with --concurrency cmd = [ @@ -348,8 +345,6 @@ class AgentProcessManager: # Add testing agent configuration cmd.extend(["--testing-ratio", str(testing_agent_ratio)]) - if count_testing_in_concurrency: - cmd.append("--count-testing") try: # Start subprocess with piped stdout/stderr @@ -387,7 +382,9 @@ class AgentProcessManager: async def stop(self) -> tuple[bool, str]: """ - Stop the agent (SIGTERM then SIGKILL if needed). + Stop the agent and all its child processes (SIGTERM then SIGKILL if needed). + + CRITICAL: Kills entire process tree to prevent orphaned coding/testing agents. Returns: Tuple of (success, message) @@ -404,20 +401,16 @@ class AgentProcessManager: except asyncio.CancelledError: pass - # Terminate gracefully first - self.process.terminate() - - # Wait up to 5 seconds for graceful shutdown + # CRITICAL: Kill entire process tree, not just orchestrator + # This ensures all spawned coding/testing agents are also terminated + proc = self.process # Capture reference before async call loop = asyncio.get_running_loop() - try: - await asyncio.wait_for( - loop.run_in_executor(None, self.process.wait), - timeout=5.0 - ) - except asyncio.TimeoutError: - # Force kill if still running - self.process.kill() - await loop.run_in_executor(None, self.process.wait) + result = await loop.run_in_executor(None, kill_process_tree, proc, 10.0) + logger.debug( + "Process tree kill result: status=%s, children=%d (terminated=%d, killed=%d)", + result.status, result.children_found, + result.children_terminated, result.children_killed + ) self._remove_lock() self.status = "stopped" @@ -428,7 +421,6 @@ class AgentProcessManager: self.parallel_mode = False # Reset parallel mode self.max_concurrency = None # Reset concurrency self.testing_agent_ratio = 1 # Reset testing ratio - self.count_testing_in_concurrency = False # Reset count testing return True, "Agent stopped" except Exception as e: @@ -514,7 +506,6 @@ class AgentProcessManager: "parallel_mode": self.parallel_mode, "max_concurrency": self.max_concurrency, "testing_agent_ratio": self.testing_agent_ratio, - "count_testing_in_concurrency": self.count_testing_in_concurrency, } diff --git a/server/utils/process_utils.py b/server/utils/process_utils.py new file mode 100644 index 0000000..40ec931 --- /dev/null +++ b/server/utils/process_utils.py @@ -0,0 +1,134 @@ +""" +Process Utilities +================= + +Shared utilities for process management across the codebase. 
+""" + +import logging +import subprocess +from dataclasses import dataclass +from typing import Literal + +import psutil + +logger = logging.getLogger(__name__) + + +@dataclass +class KillResult: + """Result of a process tree kill operation. + + Attributes: + status: "success" if all processes terminated, "partial" if some required + force-kill, "failure" if parent couldn't be killed + parent_pid: PID of the parent process + children_found: Number of child processes found + children_terminated: Number of children that terminated gracefully + children_killed: Number of children that required SIGKILL + parent_forcekilled: Whether the parent required SIGKILL + """ + + status: Literal["success", "partial", "failure"] + parent_pid: int + children_found: int = 0 + children_terminated: int = 0 + children_killed: int = 0 + parent_forcekilled: bool = False + + +def kill_process_tree(proc: subprocess.Popen, timeout: float = 5.0) -> KillResult: + """Kill a process and all its child processes. + + On Windows, subprocess.terminate() only kills the immediate process, leaving + orphaned child processes (e.g., spawned browser instances, coding/testing agents). + This function uses psutil to kill the entire process tree. + + Args: + proc: The subprocess.Popen object to kill + timeout: Seconds to wait for graceful termination before force-killing + + Returns: + KillResult with status and statistics about the termination + """ + result = KillResult(status="success", parent_pid=proc.pid) + + try: + parent = psutil.Process(proc.pid) + # Get all children recursively before terminating + children = parent.children(recursive=True) + result.children_found = len(children) + + logger.debug( + "Killing process tree: PID %d with %d children", + proc.pid, len(children) + ) + + # Terminate children first (graceful) + for child in children: + try: + logger.debug("Terminating child PID %d (%s)", child.pid, child.name()) + child.terminate() + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + # NoSuchProcess: already dead + # AccessDenied: Windows can raise this for system processes or already-exited processes + logger.debug("Child PID %d already gone or inaccessible: %s", child.pid, e) + + # Wait for children to terminate + gone, still_alive = psutil.wait_procs(children, timeout=timeout) + result.children_terminated = len(gone) + + logger.debug( + "Children after graceful wait: %d terminated, %d still alive", + len(gone), len(still_alive) + ) + + # Force kill any remaining children + for child in still_alive: + try: + logger.debug("Force-killing child PID %d", child.pid) + child.kill() + result.children_killed += 1 + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + logger.debug("Child PID %d gone during force-kill: %s", child.pid, e) + + if result.children_killed > 0: + result.status = "partial" + + # Now terminate the parent + logger.debug("Terminating parent PID %d", proc.pid) + proc.terminate() + try: + proc.wait(timeout=timeout) + logger.debug("Parent PID %d terminated gracefully", proc.pid) + except subprocess.TimeoutExpired: + logger.debug("Parent PID %d did not terminate, force-killing", proc.pid) + proc.kill() + proc.wait() + result.parent_forcekilled = True + result.status = "partial" + + logger.debug( + "Process tree kill complete: status=%s, children=%d (terminated=%d, killed=%d)", + result.status, result.children_found, + result.children_terminated, result.children_killed + ) + + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + # NoSuchProcess: Process already dead + # 
AccessDenied: Windows can raise this for protected/system processes + # In either case, just ensure cleanup + logger.debug("Parent PID %d inaccessible (%s), attempting direct cleanup", proc.pid, e) + try: + proc.terminate() + proc.wait(timeout=1) + logger.debug("Direct termination of PID %d succeeded", proc.pid) + except (subprocess.TimeoutExpired, OSError): + try: + proc.kill() + logger.debug("Direct force-kill of PID %d succeeded", proc.pid) + except OSError as kill_error: + logger.debug("Direct force-kill of PID %d failed: %s", proc.pid, kill_error) + result.status = "failure" + + return result diff --git a/server/websocket.py b/server/websocket.py index 6d8c849..a747d09 100644 --- a/server/websocket.py +++ b/server/websocket.py @@ -199,13 +199,23 @@ class AgentTracker: return None async def _handle_testing_agent_start(self, line: str) -> dict | None: - """Handle testing agent start message from orchestrator.""" + """Handle testing agent start message from orchestrator. + + Reuses existing testing agent entry if present to avoid ghost agents in UI. + """ async with self._lock: - agent_index = self._next_agent_index - self._next_agent_index += 1 + # Reuse existing testing agent entry if present + existing = self.active_agents.get(self.TESTING_AGENT_KEY) + if existing: + agent_index = existing['agent_index'] + agent_name = existing['name'] + else: + agent_index = self._next_agent_index + self._next_agent_index += 1 + agent_name = AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)] self.active_agents[self.TESTING_AGENT_KEY] = { - 'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)], + 'name': agent_name, 'agent_index': agent_index, 'agent_type': 'testing', 'state': 'testing', @@ -216,7 +226,7 @@ class AgentTracker: return { 'type': 'agent_update', 'agentIndex': agent_index, - 'agentName': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)], + 'agentName': agent_name, 'agentType': 'testing', 'featureId': 0, 'featureName': 'Regression Testing', @@ -251,16 +261,31 @@ class AgentTracker: return result - def get_agent_info(self, feature_id: int) -> tuple[int | None, str | None]: + async def get_agent_info(self, feature_id: int) -> tuple[int | None, str | None]: """Get agent index and name for a feature ID. + Thread-safe method that acquires the lock before reading state. + Returns: Tuple of (agentIndex, agentName) or (None, None) if not tracked. """ - agent = self.active_agents.get(feature_id) - if agent: - return agent['agent_index'], agent['name'] - return None, None + async with self._lock: + agent = self.active_agents.get(feature_id) + if agent: + return agent['agent_index'], agent['name'] + return None, None + + async def reset(self): + """Reset tracker state when orchestrator stops or crashes. + + Clears all active agents and resets the index counter to prevent + ghost agents accumulating across start/stop cycles. + + Must be called with await since it acquires the async lock. 
+ """ + async with self._lock: + self.active_agents.clear() + self._next_agent_index = 0 async def _handle_agent_start(self, feature_id: int, line: str, agent_type: str = "coding") -> dict | None: """Handle agent start message from orchestrator.""" @@ -482,7 +507,7 @@ async def project_websocket(websocket: WebSocket, project_name: str): match = FEATURE_ID_PATTERN.match(line) if match: feature_id = int(match.group(1)) - agent_index, _ = agent_tracker.get_agent_info(feature_id) + agent_index, _ = await agent_tracker.get_agent_info(feature_id) # Send the raw log line with optional feature/agent attribution log_msg = { @@ -512,6 +537,9 @@ async def project_websocket(websocket: WebSocket, project_name: str): "type": "agent_status", "status": status, }) + # Reset tracker when agent stops OR crashes to prevent ghost agents on restart + if status in ("stopped", "crashed"): + await agent_tracker.reset() except Exception: pass # Connection may be closed diff --git a/ui/src/components/AgentControl.tsx b/ui/src/components/AgentControl.tsx index 1348809..2b58448 100644 --- a/ui/src/components/AgentControl.tsx +++ b/ui/src/components/AgentControl.tsx @@ -38,7 +38,6 @@ export function AgentControl({ projectName, status }: AgentControlProps) { parallelMode: isParallel, maxConcurrency: concurrency, // Always pass concurrency (1-5) testingAgentRatio: settings?.testing_agent_ratio, - countTestingInConcurrency: settings?.count_testing_in_concurrency, }) const handleStop = () => stopAgent.mutate() diff --git a/ui/src/components/SettingsModal.tsx b/ui/src/components/SettingsModal.tsx index db37908..e516de1 100644 --- a/ui/src/components/SettingsModal.tsx +++ b/ui/src/components/SettingsModal.tsx @@ -76,12 +76,6 @@ export function SettingsModal({ onClose }: SettingsModalProps) { } } - const handleCountTestingToggle = () => { - if (settings && !updateSettings.isPending) { - updateSettings.mutate({ count_testing_in_concurrency: !settings.count_testing_in_concurrency }) - } - } - const models = modelsData?.models ?? [] const isSaving = updateSettings.isPending @@ -211,16 +205,16 @@ export function SettingsModal({ onClose }: SettingsModalProps) { - {/* Testing Agent Ratio */} + {/* Regression Agents */}

-              Regression testing agents spawned per coding agent (0 = disabled)
+              Number of regression testing agents (0 = disabled)

-          {/* Count Testing in Concurrency Toggle */}
-              If enabled, testing agents count toward the concurrency limit
- {/* Update Error */} {updateSettings.isError && (
diff --git a/ui/src/hooks/useProjects.ts b/ui/src/hooks/useProjects.ts index c15cbb8..46ec44c 100644 --- a/ui/src/hooks/useProjects.ts +++ b/ui/src/hooks/useProjects.ts @@ -128,7 +128,6 @@ export function useStartAgent(projectName: string) { parallelMode?: boolean maxConcurrency?: number testingAgentRatio?: number - countTestingInConcurrency?: boolean } = {}) => api.startAgent(projectName, options), onSuccess: () => { queryClient.invalidateQueries({ queryKey: ['agent-status', projectName] }) @@ -239,7 +238,6 @@ const DEFAULT_SETTINGS: Settings = { model: 'claude-opus-4-5-20251101', glm_mode: false, testing_agent_ratio: 1, - count_testing_in_concurrency: false, } export function useAvailableModels() { diff --git a/ui/src/hooks/useWebSocket.ts b/ui/src/hooks/useWebSocket.ts index 95ad084..533c2ab 100644 --- a/ui/src/hooks/useWebSocket.ts +++ b/ui/src/hooks/useWebSocket.ts @@ -112,8 +112,8 @@ export function useProjectWebSocket(projectName: string | null) { setState(prev => ({ ...prev, agentStatus: message.status, - // Clear active agents when process stops to prevent stale UI - ...(message.status === 'stopped' && { activeAgents: [], recentActivity: [] }), + // Clear active agents when process stops OR crashes to prevent stale UI + ...((message.status === 'stopped' || message.status === 'crashed') && { activeAgents: [], recentActivity: [] }), })) break diff --git a/ui/src/lib/api.ts b/ui/src/lib/api.ts index f35382b..7ef9a8a 100644 --- a/ui/src/lib/api.ts +++ b/ui/src/lib/api.ts @@ -211,7 +211,6 @@ export async function startAgent( parallelMode?: boolean maxConcurrency?: number testingAgentRatio?: number - countTestingInConcurrency?: boolean } = {} ): Promise { return fetchJSON(`/projects/${encodeURIComponent(projectName)}/agent/start`, { @@ -221,7 +220,6 @@ export async function startAgent( parallel_mode: options.parallelMode ?? false, max_concurrency: options.maxConcurrency, testing_agent_ratio: options.testingAgentRatio, - count_testing_in_concurrency: options.countTestingInConcurrency, }), }) } diff --git a/ui/src/lib/types.ts b/ui/src/lib/types.ts index a103b34..26e9c8b 100644 --- a/ui/src/lib/types.ts +++ b/ui/src/lib/types.ts @@ -129,8 +129,7 @@ export interface AgentStatusResponse { model: string | null // Model being used by running agent parallel_mode: boolean // DEPRECATED: Always true now (unified orchestrator) max_concurrency: number | null - testing_agent_ratio: number // Testing agents per coding agent (0-3) - count_testing_in_concurrency: boolean // Count testing toward concurrency limit + testing_agent_ratio: number // Regression testing agents (0-3) } export interface AgentActionResponse { @@ -479,15 +478,13 @@ export interface Settings { yolo_mode: boolean model: string glm_mode: boolean - testing_agent_ratio: number // Testing agents per coding agent (0-3) - count_testing_in_concurrency: boolean // Count testing toward concurrency limit + testing_agent_ratio: number // Regression testing agents (0-3) } export interface SettingsUpdate { yolo_mode?: boolean model?: string testing_agent_ratio?: number - count_testing_in_concurrency?: boolean } // ============================================================================
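
---

For reference, both `DevServerProcessManager.stop()` and `AgentProcessManager.stop()` above now share the same shutdown pattern: capture the `Popen` handle, run `kill_process_tree` in an executor so the event loop is not blocked while children are reaped, and log the returned `KillResult`. A minimal sketch of that pattern, assuming only that the `server.utils.process_utils` module added in this diff is importable (the `stop_orchestrator` helper itself is illustrative, not part of the codebase):

```python
import asyncio
import logging
import subprocess

# Shared utility added in this diff
from server.utils.process_utils import kill_process_tree

logger = logging.getLogger(__name__)


async def stop_orchestrator(process: subprocess.Popen) -> None:
    """Illustrative sketch of the stop() pattern used by both process managers."""
    proc = process  # Capture the handle before handing off to the executor
    loop = asyncio.get_running_loop()
    # kill_process_tree blocks while waiting on children, so run it off the event loop
    result = await loop.run_in_executor(None, kill_process_tree, proc, 10.0)
    logger.debug(
        "Process tree kill result: status=%s, children=%d (terminated=%d, killed=%d)",
        result.status, result.children_found,
        result.children_terminated, result.children_killed,
    )
```

The 10-second timeout mirrors `AgentProcessManager.stop()`; `DevServerProcessManager.stop()` passes 5 seconds instead.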