From b00eef5eca5b2277d1db39416fe17736711cf3f5 Mon Sep 17 00:00:00 2001
From: Auto
Date: Thu, 22 Jan 2026 16:24:48 +0200
Subject: [PATCH] refactor: orchestrator pre-selects features for all agents

Replace agent-initiated feature selection with orchestrator pre-selection
for both coding and testing agents. This ensures Mission Control displays
correct feature numbers for testing agents (previously showed "Feature #0").

Key changes:

MCP Server (mcp_server/feature_mcp.py):
- Add feature_get_by_id tool for agents to fetch assigned feature details
- Remove obsolete tools: feature_get_next, feature_claim_next,
  feature_claim_for_testing, feature_get_for_regression
- Remove helper functions and unused imports (text, OperationalError, func)

Orchestrator (parallel_orchestrator.py):
- Change running_testing_agents from list to dict[int, Popen]
- Add claim_feature_for_testing() with random selection
- Add release_testing_claim() method
- Pass --testing-feature-id to spawned testing agents
- Use unified [Feature #X] output format for both agent types

Agent Entry Points:
- autonomous_agent_demo.py: Add --testing-feature-id CLI argument
- agent.py: Pass testing_feature_id to get_testing_prompt()

Prompt Templates:
- coding_prompt.template.md: Update to use feature_get_by_id
- testing_prompt.template.md: Update workflow for pre-assigned features
- prompts.py: Update pre-claimed headers for both agent types

WebSocket (server/websocket.py):
- Simplify tracking with unified [Feature #X] pattern
- Remove testing-specific parsing code

Assistant (server/services/assistant_chat_session.py):
- Update help text with current available tools

Co-Authored-By: Claude Opus 4.5
---
 .claude/templates/coding_prompt.template.md  |  31 +-
 .claude/templates/testing_prompt.template.md |  13 +-
 agent.py                                     |   4 +-
 autonomous_agent_demo.py                     |   8 +
 client.py                                    |   7 +-
 mcp_server/feature_mcp.py                    | 362 ++-----
 parallel_orchestrator.py                     | 149 ++++--
 prompts.py                                   |  65 +++-
 server/services/assistant_chat_session.py   |  10 +-
 server/websocket.py                          | 192 +++---
 10 files changed, 277 insertions(+), 564 deletions(-)
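The new handoff in a nutshell: a minimal illustrative sketch (simplified from
the parallel_orchestrator.py hunks below; limit checks, locking, and error
handling omitted):

    import subprocess
    import sys

    def spawn_testing_agent(orch) -> bool:
        # The orchestrator claims a passing feature first (setting
        # testing_in_progress=1), so the feature number is known before
        # the agent process even exists.
        feature_id = orch.claim_feature_for_testing()
        if feature_id is None:
            return False  # nothing available to regression-test
        proc = subprocess.Popen([
            sys.executable, "autonomous_agent_demo.py",
            "--project-dir", str(orch.project_dir),
            "--max-iterations", "1",
            "--agent-type", "testing",
            "--testing-feature-id", str(feature_id),
        ])
        # Keying the process by feature ID (dict, not list) is what lets
        # Mission Control print "[Feature #X]" instead of "Feature #0".
        orch.running_testing_agents[feature_id] = proc
        return True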
diff --git a/.claude/templates/coding_prompt.template.md b/.claude/templates/coding_prompt.template.md
index 6c24ed6..b4e464e 100644
--- a/.claude/templates/coding_prompt.template.md
+++ b/.claude/templates/coding_prompt.template.md
@@ -29,9 +29,6 @@ Then use MCP tools to check feature status:
 ```
 # 6. Get progress statistics (passing/total counts)
 Use the feature_get_stats tool
-
-# 7. Get the next feature to work on
-Use the feature_get_next tool
 ```
 
 Understanding the `app_spec.txt` is critical - it contains the full requirements
@@ -48,7 +45,7 @@ chmod +x init.sh
 ```
 
 Otherwise, start servers manually and document the process.
 
-### STEP 3: CHOOSE ONE FEATURE TO IMPLEMENT
+### STEP 3: GET YOUR ASSIGNED FEATURE
 
 #### TEST-DRIVEN DEVELOPMENT MINDSET (CRITICAL)
@@ -63,19 +60,16 @@ Features are **test cases** that drive development. This is test-driven development
 - WRONG: "Flashcard page doesn't exist yet" → skip feature
 - RIGHT: "Flashcard page doesn't exist yet" → build flashcard page → implement filter → test feature
 
-Get the next feature to implement:
+**Note:** Your feature has been pre-assigned by the orchestrator. Use `feature_get_by_id` with your assigned feature ID to get the details.
+
+Once you've retrieved the feature, **mark it as in-progress** (if not already):
 
 ```
-# Get the highest-priority pending feature
-Use the feature_get_next tool
+# Mark feature as in-progress
+Use the feature_mark_in_progress tool with feature_id={your_assigned_id}
 ```
 
-Once you've retrieved the feature, **immediately mark it as in-progress**:
-
-```
-# Mark feature as in-progress to prevent other sessions from working on it
-Use the feature_mark_in_progress tool with feature_id=42
-```
+If you get an "already in-progress" error, that's OK - continue with implementation.
 
 Focus on completing one feature perfectly and completing its testing steps in this session before moving on to other features. It's ok if you only complete one feature in this session, as there will be more sessions later that continue to make progress.
@@ -337,10 +331,10 @@ The feature tools exist to reduce token usage. **DO NOT make exploratory queries
 # 1. Get progress stats (passing/in_progress/total counts)
 feature_get_stats
 
-# 2. Get the NEXT feature to work on (one feature only)
-feature_get_next
+# 2. Get your assigned feature details
+feature_get_by_id with feature_id={your_assigned_id}
 
-# 3. Mark a feature as in-progress (call immediately after feature_get_next)
+# 3. Mark a feature as in-progress
 feature_mark_in_progress with feature_id={id}
 
 # 4. Mark a feature as passing (after verification)
@@ -349,7 +343,7 @@ feature_mark_passing with feature_id={id}
 
 # 5. Mark a feature as failing (if you discover it's broken)
 feature_mark_failing with feature_id={id}
 
-# 6. Skip a feature (moves to end of queue) - ONLY when blocked by dependency
+# 6. Skip a feature (moves to end of queue) - ONLY when blocked by external dependency
 feature_skip with feature_id={id}
 
 # 7. Clear in-progress status (when abandoning a feature)
@@ -361,8 +355,9 @@ feature_clear_in_progress with feature_id={id}
 
 - Do NOT try to fetch lists of all features
 - Do NOT query features by category
 - Do NOT list all pending features
+- Your feature is pre-assigned by the orchestrator - use `feature_get_by_id` to get details
 
-**You do NOT need to see all features.** The feature_get_next tool tells you exactly what to work on. Trust it.
+**You do NOT need to see all features.** Work on your assigned feature only.
 
 ---
diff --git a/.claude/templates/testing_prompt.template.md b/.claude/templates/testing_prompt.template.md
index aa8f54f..a7e2bbe 100644
--- a/.claude/templates/testing_prompt.template.md
+++ b/.claude/templates/testing_prompt.template.md
@@ -40,17 +40,15 @@ chmod +x init.sh
 ```
 
 Otherwise, start servers manually.
 
-### STEP 3: CLAIM A FEATURE TO TEST
+### STEP 3: GET YOUR ASSIGNED FEATURE
 
-Atomically claim ONE passing feature for regression testing:
+Your feature has been pre-assigned by the orchestrator. Use `feature_get_by_id` to get the details:
 
 ```
-Use the feature_claim_for_testing tool
+Use the feature_get_by_id tool with feature_id={your_assigned_id}
 ```
 
-This atomically claims a random passing feature that:
-- Is not being worked on by coding agents
-- Is not already being tested by another testing agent
+The orchestrator has already claimed this feature for testing (set `testing_in_progress=true`).
 
 **CRITICAL:** You MUST call `feature_release_testing` when done, regardless of pass/fail.
@@ -157,9 +155,8 @@ echo "[Testing] Session complete - verified/fixed feature #{id}" >> claude-progress.txt
 
 ### Feature Management
 - `feature_get_stats` - Get progress overview (passing/in_progress/total counts)
-- `feature_claim_for_testing` - **USE THIS** - Atomically claim a feature for testing
+- `feature_get_by_id` - Get your assigned feature details
 - `feature_release_testing` - **REQUIRED** - Release claim after testing (pass tested_ok=true/false)
-- `feature_get_for_regression` - (Legacy) Get random passing features without claiming
 - `feature_mark_failing` - Mark a feature as failing (when you find a regression)
 - `feature_mark_passing` - Mark a feature as passing (after fixing a regression)
diff --git a/agent.py b/agent.py
index efff621..7d90473 100644
--- a/agent.py
+++ b/agent.py
@@ -117,6 +117,7 @@ async def run_autonomous_agent(
     yolo_mode: bool = False,
     feature_id: Optional[int] = None,
     agent_type: Optional[str] = None,
+    testing_feature_id: Optional[int] = None,
 ) -> None:
     """
     Run the autonomous agent loop.
@@ -128,6 +129,7 @@ async def run_autonomous_agent(
         yolo_mode: If True, skip browser testing in coding agent prompts
         feature_id: If set, work only on this specific feature (used by orchestrator for coding agents)
         agent_type: Type of agent: "initializer", "coding", "testing", or None (auto-detect)
+        testing_feature_id: For testing agents, the pre-claimed feature ID to test
     """
     print("\n" + "=" * 70)
     print(" AUTONOMOUS CODING AGENT")
@@ -220,7 +222,7 @@ async def run_autonomous_agent(
         if agent_type == "initializer":
             prompt = get_initializer_prompt(project_dir)
         elif agent_type == "testing":
-            prompt = get_testing_prompt(project_dir)
+            prompt = get_testing_prompt(project_dir, testing_feature_id)
         elif feature_id:
             # Single-feature mode (used by orchestrator for coding agents)
             prompt = get_single_feature_prompt(feature_id, project_dir, yolo_mode)
diff --git a/autonomous_agent_demo.py b/autonomous_agent_demo.py
index f841775..16702f5 100644
--- a/autonomous_agent_demo.py
+++ b/autonomous_agent_demo.py
@@ -141,6 +141,13 @@ Authentication:
         help="Agent type (used by orchestrator to spawn specialized subprocesses)",
     )
 
+    parser.add_argument(
+        "--testing-feature-id",
+        type=int,
+        default=None,
+        help="Feature ID to regression test (used by orchestrator for testing agents)",
+    )
+
     # Testing agent configuration
     parser.add_argument(
         "--testing-ratio",
@@ -197,6 +204,7 @@ def main() -> None:
                 yolo_mode=args.yolo,
                 feature_id=args.feature_id,
                 agent_type=args.agent_type,
+                testing_feature_id=args.testing_feature_id,
             )
         )
     else:
diff --git a/client.py b/client.py
index ef7dc34..05f53f0 100644
--- a/client.py
+++ b/client.py
@@ -54,9 +54,7 @@ def get_playwright_headless() -> bool:
 FEATURE_MCP_TOOLS = [
     # Core feature operations
     "mcp__features__feature_get_stats",
-    "mcp__features__feature_get_next",
-    "mcp__features__feature_claim_next",  # Atomic get+claim for parallel execution
-    "mcp__features__feature_get_for_regression",
+    "mcp__features__feature_get_by_id",  # Get assigned feature details
     "mcp__features__feature_mark_in_progress",
     "mcp__features__feature_mark_passing",
     "mcp__features__feature_mark_failing",  # Mark regression detected
@@ -64,11 +62,12 @@ FEATURE_MCP_TOOLS = [
     "mcp__features__feature_create_bulk",
     "mcp__features__feature_create",
     "mcp__features__feature_clear_in_progress",
+    "mcp__features__feature_release_testing",  # Release testing claim
     # Dependency management
     "mcp__features__feature_add_dependency",
     "mcp__features__feature_remove_dependency",
"mcp__features__feature_set_dependencies", - # Parallel execution support + # Query tools "mcp__features__feature_get_ready", "mcp__features__feature_get_blocked", "mcp__features__feature_get_graph", diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py index fbcfc30..0a00e8c 100755 --- a/mcp_server/feature_mcp.py +++ b/mcp_server/feature_mcp.py @@ -3,28 +3,33 @@ MCP Server for Feature Management ================================== -Provides tools to manage features in the autonomous coding system, -replacing the previous FastAPI-based REST API. +Provides tools to manage features in the autonomous coding system. Tools: - feature_get_stats: Get progress statistics -- feature_get_next: Get next feature to implement -- feature_get_for_regression: Get random passing features for testing +- feature_get_by_id: Get a specific feature by ID - feature_mark_passing: Mark a feature as passing - feature_mark_failing: Mark a feature as failing (regression detected) - feature_skip: Skip a feature (move to end of queue) - feature_mark_in_progress: Mark a feature as in-progress - feature_clear_in_progress: Clear in-progress status +- feature_release_testing: Release testing lock on a feature - feature_create_bulk: Create multiple features at once - feature_create: Create a single feature +- feature_add_dependency: Add a dependency between features +- feature_remove_dependency: Remove a dependency +- feature_get_ready: Get features ready to implement +- feature_get_blocked: Get features blocked by dependencies +- feature_get_graph: Get the dependency graph + +Note: Feature selection (which feature to work on) is handled by the +orchestrator, not by agents. Agents receive pre-assigned feature IDs. """ import json import os -import random import sys import threading -import time as _time from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path @@ -32,9 +37,6 @@ from typing import Annotated from mcp.server.fastmcp import FastMCP from pydantic import BaseModel, Field -from sqlalchemy import text -from sqlalchemy.exc import OperationalError -from sqlalchemy.sql.expression import func # Add parent directory to path so we can import from api module sys.path.insert(0, str(Path(__file__).parent.parent)) @@ -158,346 +160,32 @@ def feature_get_stats() -> str: @mcp.tool() -def feature_get_next() -> str: - """Get the highest-priority pending feature that has all dependencies satisfied. - - Returns the feature with the lowest priority number that: - 1. Has passes=false and in_progress=false - 2. Has all dependency features already passing (or no dependencies) - 3. All dependency IDs actually exist (orphaned dependencies are ignored) - - For backwards compatibility: if all pending features are blocked by dependencies, - falls back to returning the first pending feature (same as before dependencies). - - Returns: - JSON with feature details (id, priority, category, name, description, steps, passes, - in_progress, dependencies) or error message if all features are passing. 
- """ - session = get_session() - try: - all_features = session.query(Feature).all() - all_feature_ids = {f.id for f in all_features} - passing_ids = {f.id for f in all_features if f.passes} - - # Get pending, non-in-progress features - pending = [f for f in all_features if not f.passes and not f.in_progress] - - # Sort by scheduling score (higher = first), then priority, then id - all_dicts = [f.to_dict() for f in all_features] - scores = compute_scheduling_scores(all_dicts) - pending.sort(key=lambda f: (-scores.get(f.id, 0), f.priority, f.id)) - - if not pending: - if any(f.in_progress for f in all_features if not f.passes): - return json.dumps({"error": "All pending features are in progress by other agents"}) - return json.dumps({"error": "All features are passing! No more work to do."}) - - # Find first feature with satisfied dependencies - for feature in pending: - deps = feature.dependencies or [] - # Filter out orphaned dependencies (IDs that no longer exist) - valid_deps = [d for d in deps if d in all_feature_ids] - if all(dep_id in passing_ids for dep_id in valid_deps): - return json.dumps(feature.to_dict(), indent=2) - - # All pending features are blocked by unmet dependencies - # Return error with details about what's blocking progress - blocking_info = [] - for feature in pending[:3]: # Show first 3 blocked features - deps = feature.dependencies or [] - valid_deps = [d for d in deps if d in all_feature_ids] - orphaned = [d for d in deps if d not in all_feature_ids] - unmet = [d for d in valid_deps if d not in passing_ids] - info = f"#{feature.id} '{feature.name}'" - if unmet: - info += f" blocked by: {unmet}" - if orphaned: - info += f" (orphaned deps ignored: {orphaned})" - blocking_info.append(info) - - return json.dumps({ - "error": "All pending features are blocked by unmet dependencies", - "blocked_features": len(pending), - "examples": blocking_info, - "hint": "Complete the blocking dependencies first, or remove invalid dependencies" - }, indent=2) - finally: - session.close() - - -# Maximum retry attempts for feature claiming under contention -MAX_CLAIM_RETRIES = 5 - - -def _feature_claim_next_internal() -> str: - """Internal implementation of feature claiming with iterative retry. - - Uses an iterative loop instead of recursion to avoid double session.close() - issues and deep call stacks under contention. - - Returns: - JSON with claimed feature details, or error message if no feature available. - """ - for attempt in range(MAX_CLAIM_RETRIES): - session = get_session() - try: - # Use a lock to prevent concurrent claims within this process - with _priority_lock: - all_features = session.query(Feature).all() - all_feature_ids = {f.id for f in all_features} - passing_ids = {f.id for f in all_features if f.passes} - - # Get pending, non-in-progress features - pending = [f for f in all_features if not f.passes and not f.in_progress] - - # Sort by scheduling score (higher = first), then priority, then id - all_dicts = [f.to_dict() for f in all_features] - scores = compute_scheduling_scores(all_dicts) - pending.sort(key=lambda f: (-scores.get(f.id, 0), f.priority, f.id)) - - if not pending: - if any(f.in_progress for f in all_features if not f.passes): - return json.dumps({"error": "All pending features are in progress by other agents"}) - return json.dumps({"error": "All features are passing! 
-
-                # Find first feature with satisfied dependencies
-                candidate_id = None
-                for feature in pending:
-                    deps = feature.dependencies or []
-                    # Filter out orphaned dependencies (IDs that no longer exist)
-                    valid_deps = [d for d in deps if d in all_feature_ids]
-                    if all(dep_id in passing_ids for dep_id in valid_deps):
-                        candidate_id = feature.id
-                        break
-
-                if candidate_id is None:
-                    # All pending features are blocked by unmet dependencies
-                    blocking_info = []
-                    for feature in pending[:3]:
-                        deps = feature.dependencies or []
-                        valid_deps = [d for d in deps if d in all_feature_ids]
-                        orphaned = [d for d in deps if d not in all_feature_ids]
-                        unmet = [d for d in valid_deps if d not in passing_ids]
-                        info = f"#{feature.id} '{feature.name}'"
-                        if unmet:
-                            info += f" blocked by: {unmet}"
-                        if orphaned:
-                            info += f" (orphaned deps ignored: {orphaned})"
-                        blocking_info.append(info)
-
-                    return json.dumps({
-                        "error": "All pending features are blocked by unmet dependencies",
-                        "blocked_features": len(pending),
-                        "examples": blocking_info,
-                        "hint": "Complete the blocking dependencies first, or remove invalid dependencies"
-                    }, indent=2)
-
-                # Atomic claim: UPDATE only if still claimable
-                # This prevents race conditions even across processes
-                result = session.execute(
-                    text("""
-                        UPDATE features
-                        SET in_progress = 1
-                        WHERE id = :feature_id
-                          AND in_progress = 0
-                          AND passes = 0
-                    """),
-                    {"feature_id": candidate_id}
-                )
-                session.commit()
-
-                # Check if we actually claimed it
-                if result.rowcount == 0:
-                    # Another process claimed it first - will retry after backoff
-                    pass  # Fall through to finally block, then retry loop
-                else:
-                    # Successfully claimed - fetch and return
-                    session.expire_all()  # Clear cache to get fresh data
-                    claimed_feature = session.query(Feature).filter(Feature.id == candidate_id).first()
-                    return json.dumps(claimed_feature.to_dict(), indent=2)
-
-        except OperationalError:
-            # Transient database error (e.g., SQLITE_BUSY) - rollback and retry
-            session.rollback()
-            # Fall through to backoff and retry
-        except Exception as e:
-            # Non-transient error - fail immediately
-            session.rollback()
-            return json.dumps({"error": f"Failed to claim feature: {str(e)}"})
-        finally:
-            session.close()
-
-        # Exponential backoff with jitter before next attempt
-        # Base 0.1s, 0.2s, 0.4s, 0.8s, 1.0s (capped)
-        # Jitter of up to 30% prevents synchronized retries under high contention
-        backoff = min(0.1 * (2 ** attempt), 1.0)
-        jitter = random.uniform(0, backoff * 0.3)
-        _time.sleep(backoff + jitter)
-
-    # Exhausted all retries
-    return json.dumps({
-        "error": "Failed to claim feature after maximum retries",
-        "hint": "High contention detected - try again or reduce parallel agents"
-    })
-
-
-@mcp.tool()
-def feature_claim_next() -> str:
-    """Atomically get and claim the next available feature.
-
-    This combines feature_get_next() and feature_mark_in_progress() in a single
-    atomic operation, preventing race conditions where two agents could claim
-    the same feature.
-
-    Returns the feature with the lowest priority number that:
-    1. Has passes=false and in_progress=false
-    2. Has all dependency features already passing (or no dependencies)
-    3. All dependency IDs actually exist (orphaned dependencies are ignored)
-
-    On success, the feature's in_progress flag is set to True.
-    Uses exponential backoff retry (up to 5 attempts) under contention.
-
-    Returns:
-        JSON with claimed feature details, or error message if no feature available.
- """ - return _feature_claim_next_internal() - - -@mcp.tool() -def feature_get_for_regression( - limit: Annotated[int, Field(default=3, ge=1, le=10, description="Maximum number of passing features to return")] = 3 +def feature_get_by_id( + feature_id: Annotated[int, Field(description="The ID of the feature to retrieve", ge=1)] ) -> str: - """Get random passing features for regression testing. + """Get a specific feature by its ID. - Returns a random selection of features that are currently passing - and NOT currently in progress (to avoid conflicts with coding agents). - Use this to verify that previously implemented features still work - after making changes. + Returns the full details of a feature including its name, description, + verification steps, and current status. Args: - limit: Maximum number of features to return (1-10, default 3) + feature_id: The ID of the feature to retrieve Returns: - JSON with: features (list of feature objects), count (int) + JSON with feature details, or error if not found. """ session = get_session() try: - features = ( - session.query(Feature) - .filter(Feature.passes == True) - .filter(Feature.in_progress == False) # Avoid conflicts with coding agents - .order_by(func.random()) - .limit(limit) - .all() - ) + feature = session.query(Feature).filter(Feature.id == feature_id).first() - return json.dumps({ - "features": [f.to_dict() for f in features], - "count": len(features) - }, indent=2) + if feature is None: + return json.dumps({"error": f"Feature with ID {feature_id} not found"}) + + return json.dumps(feature.to_dict(), indent=2) finally: session.close() -def _feature_claim_for_testing_internal() -> str: - """Internal implementation of testing feature claim with iterative retry. - - Uses an iterative loop instead of recursion to avoid double session.close() - issues and deep call stacks under contention. - - Returns: - JSON with claimed feature details, or message if no features available. 
- """ - for attempt in range(MAX_CLAIM_RETRIES): - session = get_session() - try: - # Use lock to prevent concurrent claims within this process - with _priority_lock: - # Find a candidate feature - candidate = ( - session.query(Feature) - .filter(Feature.passes == True) - .filter(Feature.in_progress == False) - .filter(Feature.testing_in_progress == False) - .order_by(func.random()) - .first() - ) - - if not candidate: - return json.dumps({ - "message": "No features available for testing", - "hint": "All passing features are either being coded or tested" - }) - - # Atomic claim using UPDATE with WHERE clause - # This prevents race conditions even across processes - result = session.execute( - text(""" - UPDATE features - SET testing_in_progress = 1 - WHERE id = :feature_id - AND passes = 1 - AND in_progress = 0 - AND testing_in_progress = 0 - """), - {"feature_id": candidate.id} - ) - session.commit() - - # Check if we actually claimed it - if result.rowcount == 0: - # Another process claimed it first - will retry after backoff - pass # Fall through to finally block, then retry loop - else: - # Successfully claimed - fetch and return - session.expire_all() - claimed = session.query(Feature).filter(Feature.id == candidate.id).first() - return json.dumps(claimed.to_dict(), indent=2) - - except OperationalError: - # Transient database error (e.g., SQLITE_BUSY) - rollback and retry - session.rollback() - # Fall through to backoff and retry - except Exception as e: - # Non-transient error - fail immediately - session.rollback() - return json.dumps({"error": f"Failed to claim feature: {str(e)}"}) - finally: - session.close() - - # Exponential backoff with jitter before next attempt - backoff = min(0.1 * (2 ** attempt), 1.0) - jitter = random.uniform(0, backoff * 0.3) - _time.sleep(backoff + jitter) - - # Exhausted all retries - return json.dumps({ - "error": "Failed to claim feature after maximum retries", - "hint": "High contention detected - try again or reduce testing agents" - }) - - -@mcp.tool() -def feature_claim_for_testing() -> str: - """Atomically claim a passing feature for regression testing. - - Returns a random passing feature that is: - - Currently passing (passes=True) - - Not being worked on by coding agents (in_progress=False) - - Not already being tested (testing_in_progress=False) - - The feature's testing_in_progress flag is set to True atomically to prevent - other testing agents from claiming the same feature. Uses exponential backoff - retry (up to 5 attempts) under contention. - - After testing, you MUST call feature_release_testing() to release the claim. - - Returns: - JSON with feature details if available, or message if no features available. - """ - return _feature_claim_for_testing_internal() - - @mcp.tool() def feature_release_testing( feature_id: Annotated[int, Field(description="The ID of the feature to release", ge=1)], @@ -692,10 +380,10 @@ def feature_skip( def feature_mark_in_progress( feature_id: Annotated[int, Field(description="The ID of the feature to mark as in-progress", ge=1)] ) -> str: - """Mark a feature as in-progress. Call immediately after feature_get_next(). + """Mark a feature as in-progress. This prevents other agent sessions from working on the same feature. - Use this as soon as you retrieve a feature to work on. + Call this after getting your assigned feature details with feature_get_by_id. 
 
     Args:
         feature_id: The ID of the feature to mark as in-progress
diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py
index 2414ac9..47e11a9 100644
--- a/parallel_orchestrator.py
+++ b/parallel_orchestrator.py
@@ -173,8 +173,8 @@ class ParallelOrchestrator:
         self._lock = threading.Lock()
         # Coding agents: feature_id -> process
         self.running_coding_agents: dict[int, subprocess.Popen] = {}
-        # Testing agents: list of processes (not tied to specific features)
-        self.running_testing_agents: list[subprocess.Popen] = []
+        # Testing agents: feature_id -> process (feature being tested)
+        self.running_testing_agents: dict[int, subprocess.Popen] = {}
         # Legacy alias for backward compatibility
         self.running_agents = self.running_coding_agents
         self.abort_events: dict[int, threading.Event] = {}
@@ -193,6 +193,75 @@ class ParallelOrchestrator:
         """Get a new database session."""
         return self._session_maker()
 
+    def claim_feature_for_testing(self) -> int | None:
+        """Claim a random passing feature for regression testing.
+
+        Returns the feature ID if successful, None if no features available.
+        Sets testing_in_progress=True on the claimed feature.
+        """
+        session = self.get_session()
+        try:
+            from sqlalchemy.sql.expression import func
+
+            # Find a passing feature that's not being worked on
+            # Exclude features already being tested by this orchestrator
+            with self._lock:
+                testing_feature_ids = set(self.running_testing_agents.keys())
+
+            candidate = (
+                session.query(Feature)
+                .filter(Feature.passes == True)
+                .filter(Feature.in_progress == False)
+                .filter(Feature.testing_in_progress == False)
+                .filter(~Feature.id.in_(testing_feature_ids) if testing_feature_ids else True)
+                .order_by(func.random())
+                .first()
+            )
+
+            if not candidate:
+                return None
+
+            # Atomic claim using UPDATE with WHERE clause
+            result = session.execute(
+                text("""
+                    UPDATE features
+                    SET testing_in_progress = 1
+                    WHERE id = :feature_id
+                      AND passes = 1
+                      AND in_progress = 0
+                      AND testing_in_progress = 0
+                """),
+                {"feature_id": candidate.id}
+            )
+            session.commit()
+
+            if result.rowcount == 0:
+                # Another process claimed it
+                return None
+
+            return candidate.id
+        except Exception as e:
+            session.rollback()
+            debug_log.log("TESTING", f"Failed to claim feature for testing: {e}")
+            return None
+        finally:
+            session.close()
+
+    def release_testing_claim(self, feature_id: int):
+        """Release a testing claim on a feature (called when testing agent exits)."""
+        session = self.get_session()
+        try:
+            session.execute(
+                text("UPDATE features SET testing_in_progress = 0 WHERE id = :feature_id"),
+                {"feature_id": feature_id}
+            )
+            session.commit()
+        except Exception as e:
+            session.rollback()
+            debug_log.log("TESTING", f"Failed to release testing claim for feature {feature_id}: {e}")
+        finally:
+            session.close()
+
     def get_resumable_features(self) -> list[dict]:
         """Get features that were left in_progress from a previous session.
@@ -563,13 +632,11 @@ class ParallelOrchestrator:
     def _spawn_testing_agent(self) -> tuple[bool, str]:
         """Spawn a testing agent subprocess for regression testing.
 
-        CRITICAL: Lock is held during the entire spawn operation to prevent
-        TOCTOU race conditions where multiple threads could pass limit checks
-        and spawn excess agents.
+        Claims a feature BEFORE spawning the agent (same pattern as coding agents).
+        This ensures we know which feature is being tested for UI display.
""" - # Hold lock for entire operation to prevent TOCTOU race + # Check limits first (under lock) with self._lock: - # Check limits current_testing_count = len(self.running_testing_agents) if current_testing_count >= self.max_concurrency: debug_log.log("TESTING", f"Skipped spawn - at max testing agents ({current_testing_count}/{self.max_concurrency})") @@ -579,7 +646,21 @@ class ParallelOrchestrator: debug_log.log("TESTING", f"Skipped spawn - at max total agents ({total_agents}/{MAX_TOTAL_AGENTS})") return False, f"At max total agents ({total_agents})" - debug_log.log("TESTING", "Attempting to spawn testing agent subprocess") + # Claim a feature for testing (outside lock to avoid holding during DB ops) + feature_id = self.claim_feature_for_testing() + if feature_id is None: + debug_log.log("TESTING", "No features available for testing") + return False, "No features available for testing" + + debug_log.log("TESTING", f"Claimed feature #{feature_id} for testing") + + # Now spawn with the claimed feature ID + with self._lock: + # Re-check limits in case another thread spawned while we were claiming + current_testing_count = len(self.running_testing_agents) + if current_testing_count >= self.max_concurrency: + self.release_testing_claim(feature_id) + return False, f"At max testing agents ({current_testing_count})" cmd = [ sys.executable, @@ -588,10 +669,10 @@ class ParallelOrchestrator: "--project-dir", str(self.project_dir), "--max-iterations", "1", "--agent-type", "testing", + "--testing-feature-id", str(feature_id), ] if self.model: cmd.extend(["--model", self.model]) - # Testing agents don't need --yolo flag (they use testing prompt regardless) try: proc = subprocess.Popen( @@ -604,25 +685,26 @@ class ParallelOrchestrator: ) except Exception as e: debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}") + self.release_testing_claim(feature_id) return False, f"Failed to start testing agent: {e}" - # Register process immediately while still holding lock - self.running_testing_agents.append(proc) + # Register process with feature ID (same pattern as coding agents) + self.running_testing_agents[feature_id] = proc testing_count = len(self.running_testing_agents) - # Start output reader thread (feature_id=None for testing agents) - # This can be outside lock since process is already registered + # Start output reader thread with feature ID (same as coding agents) threading.Thread( target=self._read_output, - args=(None, proc, threading.Event(), "testing"), + args=(feature_id, proc, threading.Event(), "testing"), daemon=True ).start() - print(f"Started testing agent (PID {proc.pid})", flush=True) - debug_log.log("TESTING", "Successfully spawned testing agent", + print(f"Started testing agent for feature #{feature_id} (PID {proc.pid})", flush=True) + debug_log.log("TESTING", f"Successfully spawned testing agent for feature #{feature_id}", pid=proc.pid, + feature_id=feature_id, total_testing_agents=testing_count) - return True, "Started testing agent" + return True, f"Started testing agent for feature #{feature_id}" async def _run_initializer(self) -> bool: """Run initializer agent as blocking subprocess. 
@@ -706,10 +788,8 @@ class ParallelOrchestrator:
             if self.on_output:
                 self.on_output(feature_id or 0, line)
             else:
-                if agent_type == "testing":
-                    print(f"[Testing] {line}", flush=True)
-                else:
-                    print(f"[Feature #{feature_id}] {line}", flush=True)
+                # Both coding and testing agents now use [Feature #X] format
+                print(f"[Feature #{feature_id}] {line}", flush=True)
 
             proc.wait()
         finally:
             self._on_agent_complete(feature_id, proc.returncode, agent_type, proc)
@@ -730,17 +810,27 @@ class ParallelOrchestrator:
         is safe.
 
         For testing agents:
-        - Just remove from the running list.
+        - Remove from running dict and release testing claim on feature.
         """
        if agent_type == "testing":
             with self._lock:
-                if proc in self.running_testing_agents:
-                    self.running_testing_agents.remove(proc)
+                # Remove from dict by finding the feature_id for this proc
+                found_feature_id = None
+                for fid, p in list(self.running_testing_agents.items()):
+                    if p is proc:
+                        found_feature_id = fid
+                        del self.running_testing_agents[fid]
+                        break
+
+                # Release testing claim on the feature
+                if found_feature_id is not None:
+                    self.release_testing_claim(found_feature_id)
 
             status = "completed" if return_code == 0 else "failed"
-            print(f"Testing agent (PID {proc.pid}) {status}", flush=True)
-            debug_log.log("COMPLETE", "Testing agent finished",
+            print(f"Feature #{feature_id} testing {status}", flush=True)
+            debug_log.log("COMPLETE", f"Testing agent for feature #{feature_id} finished",
                           pid=proc.pid,
+                          feature_id=feature_id,
                           status=status)
             return
@@ -846,11 +936,12 @@ class ParallelOrchestrator:
 
         # Stop testing agents
         with self._lock:
-            testing_procs = list(self.running_testing_agents)
+            testing_items = list(self.running_testing_agents.items())
 
-        for proc in testing_procs:
+        for feature_id, proc in testing_items:
             result = kill_process_tree(proc, timeout=5.0)
-            debug_log.log("STOP", f"Killed testing agent PID {proc.pid} process tree",
+            self.release_testing_claim(feature_id)
+            debug_log.log("STOP", f"Killed testing agent for feature #{feature_id} (PID {proc.pid})",
                           status=result.status,
                           children_found=result.children_found,
                           children_terminated=result.children_terminated,
                           children_killed=result.children_killed)
diff --git a/prompts.py b/prompts.py
index ad76ff0..6a2fe28 100644
--- a/prompts.py
+++ b/prompts.py
@@ -74,9 +74,38 @@ def get_coding_prompt(project_dir: Path | None = None) -> str:
     return load_prompt("coding_prompt", project_dir)
 
 
-def get_testing_prompt(project_dir: Path | None = None) -> str:
-    """Load the testing agent prompt (project-specific if available)."""
-    return load_prompt("testing_prompt", project_dir)
+def get_testing_prompt(project_dir: Path | None = None, testing_feature_id: int | None = None) -> str:
+    """Load the testing agent prompt (project-specific if available).
+
+    Args:
+        project_dir: Optional project directory for project-specific prompts
+        testing_feature_id: If provided, the pre-assigned feature ID to test.
+            The orchestrator claims the feature before spawning the agent.
+
+    Returns:
+        The testing prompt, with pre-assigned feature instructions if applicable.
+    """
+    base_prompt = load_prompt("testing_prompt", project_dir)
+
+    if testing_feature_id is not None:
+        # Prepend pre-assigned feature instructions
+        pre_assigned_header = f"""## ASSIGNED FEATURE
+
+**You are assigned to regression test Feature #{testing_feature_id}.**
+
+The orchestrator has already claimed this feature for you.
+
+### Your workflow:
+1. Call `feature_get_by_id` with ID {testing_feature_id} to get the feature details
+2. Verify the feature through the UI using browser automation
+3. When done, call `feature_release_testing` with feature_id={testing_feature_id}
+
+---
+
+"""
+        return pre_assigned_header + base_prompt
+
+    return base_prompt
 
 
 def get_single_feature_prompt(feature_id: int, project_dir: Path | None = None, yolo_mode: bool = False) -> str:
@@ -100,26 +129,26 @@ def get_single_feature_prompt(feature_id: int, project_dir: Path | None = None,
     base_prompt = get_coding_prompt(project_dir)
 
     # Prepend single-feature instructions
-    single_feature_header = f"""## SINGLE FEATURE MODE
+    single_feature_header = f"""## ASSIGNED FEATURE
 
-**CRITICAL: You are assigned to work on Feature #{feature_id} ONLY.**
+**You are assigned to work on Feature #{feature_id} ONLY.**
 
-This session is part of a parallel execution where multiple agents work on different features simultaneously. You MUST:
+This session is part of a parallel execution where multiple agents work on different features simultaneously.
 
-1. **Skip the `feature_get_next` step** - Your feature is already assigned: #{feature_id}
-2. **Immediately mark feature #{feature_id} as in-progress** using `feature_mark_in_progress`
-3. **Focus ONLY on implementing and testing feature #{feature_id}**
-4. **Do NOT work on any other features** - other agents are handling them
+### Your workflow:
 
-When you complete feature #{feature_id}:
-- Mark it as passing with `feature_mark_passing`
-- Commit your changes
-- End the session
+1. **Get feature details** using `feature_get_by_id` with ID {feature_id}
+2. **Mark as in-progress** using `feature_mark_in_progress` with ID {feature_id}
+   - If you get an "already in-progress" error, that's OK - continue with implementation
+3. **Implement the feature** following the steps from the feature details
+4. **Test your implementation** to verify it works correctly
+5. **Mark as passing** using `feature_mark_passing` with ID {feature_id}
+6. **Commit your changes** and end the session
 
-If you cannot complete feature #{feature_id} due to a blocker:
-- Use `feature_skip` to move it to the end of the queue
-- Document the blocker in claude-progress.txt
-- End the session
+### Important rules:
+
+- **Do NOT** work on any other features - other agents are handling them
+- If blocked, use `feature_skip` and document the blocker in claude-progress.txt
 
 ---
diff --git a/server/services/assistant_chat_session.py b/server/services/assistant_chat_session.py
index 54e3d12..f15eee8 100755
--- a/server/services/assistant_chat_session.py
+++ b/server/services/assistant_chat_session.py
@@ -47,8 +47,9 @@ API_ENV_VARS = [
 # Read-only feature MCP tools
 READONLY_FEATURE_MCP_TOOLS = [
     "mcp__features__feature_get_stats",
-    "mcp__features__feature_get_next",
-    "mcp__features__feature_get_for_regression",
+    "mcp__features__feature_get_by_id",
+    "mcp__features__feature_get_ready",
+    "mcp__features__feature_get_blocked",
 ]
 
 # Feature management tools (create/skip but not mark_passing)
@@ -124,8 +125,9 @@ If the user asks you to modify code, explain that you're a project assistant and
 
 **Feature Management:**
 - **feature_get_stats**: Get feature completion progress
-- **feature_get_next**: See the next pending feature
-- **feature_get_for_regression**: See passing features for testing
+- **feature_get_by_id**: Get details for a specific feature
+- **feature_get_ready**: See features ready for implementation
+- **feature_get_blocked**: See features blocked by dependencies
 - **feature_create**: Create a single feature in the backlog
 - **feature_create_bulk**: Create multiple features at once
 - **feature_skip**: Move a feature to the end of the queue
diff --git a/server/websocket.py b/server/websocket.py
index a747d09..18680ac 100644
--- a/server/websocket.py
+++ b/server/websocket.py
@@ -24,11 +24,17 @@ _count_passing_tests = None
 
 logger = logging.getLogger(__name__)
 
-# Pattern to extract feature ID from parallel orchestrator output (coding agents)
+# Pattern to extract feature ID from parallel orchestrator output
+# Both coding and testing agents now use the same [Feature #X] format
 FEATURE_ID_PATTERN = re.compile(r'\[Feature #(\d+)\]\s*(.*)')
 
-# Pattern to extract testing agent output
-TESTING_AGENT_PATTERN = re.compile(r'\[Testing\]\s*(.*)')
+# Pattern to detect testing agent start message (includes feature ID)
+# Matches: "Started testing agent for feature #123 (PID xxx)"
+TESTING_AGENT_START_PATTERN = re.compile(r'Started testing agent for feature #(\d+)')
+
+# Pattern to detect testing agent completion
+# Matches: "Feature #123 testing completed" or "Feature #123 testing failed"
+TESTING_AGENT_COMPLETE_PATTERN = re.compile(r'Feature #(\d+) testing (completed|failed)')
 
 # Patterns for detecting agent activity and thoughts
 THOUGHT_PATTERNS = [
@@ -50,14 +56,14 @@ THOUGHT_PATTERNS = [
 
 
 class AgentTracker:
-    """Tracks active agents and their states for multi-agent mode."""
+    """Tracks active agents and their states for multi-agent mode.
 
-    # Use a special key for the testing agent since it doesn't have a fixed feature ID
-    TESTING_AGENT_KEY = -1
+    Both coding and testing agents are now tracked by their feature ID.
+    The agent_type field distinguishes between them.
+ """ def __init__(self): # feature_id -> {name, state, last_thought, agent_index, agent_type} - # For testing agents, use TESTING_AGENT_KEY as the key self.active_agents: dict[int, dict] = {} self._next_agent_index = 0 self._lock = asyncio.Lock() @@ -68,35 +74,43 @@ class AgentTracker: Returns None if no update should be emitted. """ - # Check for testing agent output first - testing_match = TESTING_AGENT_PATTERN.match(line) - if testing_match: - content = testing_match.group(1) - return await self._process_testing_agent_line(content) + # Check for orchestrator status messages first + # These don't have [Feature #X] prefix - # Check for feature-specific output (coding agents) + # Coding agent start: "Started coding agent for feature #X" + if line.startswith("Started coding agent for feature #"): + try: + feature_id = int(re.search(r'#(\d+)', line).group(1)) + return await self._handle_agent_start(feature_id, line, agent_type="coding") + except (AttributeError, ValueError): + pass + + # Testing agent start: "Started testing agent for feature #X (PID xxx)" + testing_start_match = TESTING_AGENT_START_PATTERN.match(line) + if testing_start_match: + feature_id = int(testing_start_match.group(1)) + return await self._handle_agent_start(feature_id, line, agent_type="testing") + + # Testing agent complete: "Feature #X testing completed/failed" + testing_complete_match = TESTING_AGENT_COMPLETE_PATTERN.match(line) + if testing_complete_match: + feature_id = int(testing_complete_match.group(1)) + is_success = testing_complete_match.group(2) == "completed" + return await self._handle_agent_complete(feature_id, is_success) + + # Coding agent complete: "Feature #X completed/failed" (without "testing" keyword) + if line.startswith("Feature #") and ("completed" in line or "failed" in line) and "testing" not in line: + try: + feature_id = int(re.search(r'#(\d+)', line).group(1)) + is_success = "completed" in line + return await self._handle_agent_complete(feature_id, is_success) + except (AttributeError, ValueError): + pass + + # Check for feature-specific output lines: [Feature #X] content + # Both coding and testing agents use this format now match = FEATURE_ID_PATTERN.match(line) if not match: - # Also check for orchestrator status messages - if line.startswith("Started coding agent for feature #"): - try: - feature_id = int(re.search(r'#(\d+)', line).group(1)) - return await self._handle_agent_start(feature_id, line, agent_type="coding") - except (AttributeError, ValueError): - pass - elif line.startswith("Started testing agent"): - return await self._handle_testing_agent_start(line) - elif line.startswith("Feature #") and ("completed" in line or "failed" in line): - try: - feature_id = int(re.search(r'#(\d+)', line).group(1)) - is_success = "completed" in line - return await self._handle_agent_complete(feature_id, is_success) - except (AttributeError, ValueError): - pass - elif line.startswith("Testing agent") and ("completed" in line or "failed" in line): - # Format: "Testing agent (PID xxx) completed" or "Testing agent (PID xxx) failed" - is_success = "completed" in line - return await self._handle_testing_agent_complete(is_success) return None feature_id = int(match.group(1)) @@ -149,118 +163,6 @@ class AgentTracker: return None - async def _process_testing_agent_line(self, content: str) -> dict | None: - """Process output from a testing agent.""" - async with self._lock: - # Ensure testing agent is tracked - if self.TESTING_AGENT_KEY not in self.active_agents: - agent_index = 
-                self._next_agent_index += 1
-                self.active_agents[self.TESTING_AGENT_KEY] = {
-                    'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)],
-                    'agent_index': agent_index,
-                    'agent_type': 'testing',
-                    'state': 'testing',
-                    'feature_name': 'Regression Testing',
-                    'last_thought': None,
-                }
-
-            agent = self.active_agents[self.TESTING_AGENT_KEY]
-
-            # Detect state and thought from content
-            state = 'testing'
-            thought = None
-
-            for pattern, detected_state in THOUGHT_PATTERNS:
-                m = pattern.search(content)
-                if m:
-                    state = detected_state
-                    thought = m.group(1) if m.lastindex else content[:100]
-                    break
-
-            # Only emit update if state changed or we have a new thought
-            if state != agent['state'] or thought != agent['last_thought']:
-                agent['state'] = state
-                if thought:
-                    agent['last_thought'] = thought
-
-                return {
-                    'type': 'agent_update',
-                    'agentIndex': agent['agent_index'],
-                    'agentName': agent['name'],
-                    'agentType': 'testing',
-                    'featureId': 0,  # Testing agents work on random features
-                    'featureName': agent['feature_name'],
-                    'state': state,
-                    'thought': thought,
-                    'timestamp': datetime.now().isoformat(),
-                }
-
-            return None
-
-    async def _handle_testing_agent_start(self, line: str) -> dict | None:
-        """Handle testing agent start message from orchestrator.
-
-        Reuses existing testing agent entry if present to avoid ghost agents in UI.
-        """
-        async with self._lock:
-            # Reuse existing testing agent entry if present
-            existing = self.active_agents.get(self.TESTING_AGENT_KEY)
-            if existing:
-                agent_index = existing['agent_index']
-                agent_name = existing['name']
-            else:
-                agent_index = self._next_agent_index
-                self._next_agent_index += 1
-                agent_name = AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)]
-
-            self.active_agents[self.TESTING_AGENT_KEY] = {
-                'name': agent_name,
-                'agent_index': agent_index,
-                'agent_type': 'testing',
-                'state': 'testing',
-                'feature_name': 'Regression Testing',
-                'last_thought': 'Starting regression tests...',
-            }
-
-            return {
-                'type': 'agent_update',
-                'agentIndex': agent_index,
-                'agentName': agent_name,
-                'agentType': 'testing',
-                'featureId': 0,
-                'featureName': 'Regression Testing',
-                'state': 'testing',
-                'thought': 'Starting regression tests...',
-                'timestamp': datetime.now().isoformat(),
-            }
-
-    async def _handle_testing_agent_complete(self, is_success: bool) -> dict | None:
-        """Handle testing agent completion."""
-        async with self._lock:
-            if self.TESTING_AGENT_KEY not in self.active_agents:
-                return None
-
-            agent = self.active_agents[self.TESTING_AGENT_KEY]
-            state = 'success' if is_success else 'error'
-
-            result = {
-                'type': 'agent_update',
-                'agentIndex': agent['agent_index'],
-                'agentName': agent['name'],
-                'agentType': 'testing',
-                'featureId': 0,
-                'featureName': agent['feature_name'],
-                'state': state,
-                'thought': 'Tests passed!' if is_success else 'Found regressions',
-                'timestamp': datetime.now().isoformat(),
-            }
-
-            # Remove from active agents
-            del self.active_agents[self.TESTING_AGENT_KEY]
-
-            return result
-
     async def get_agent_info(self, feature_id: int) -> tuple[int | None, str | None]:
         """Get agent index and name for a feature ID.