refactor: orchestrator pre-selects features for all agents

Replace agent-initiated feature selection with orchestrator pre-selection
for both coding and testing agents. This ensures Mission Control displays
correct feature numbers for testing agents (previously showed "Feature #0").

Key changes:

MCP Server (mcp_server/feature_mcp.py):
- Add feature_get_by_id tool for agents to fetch assigned feature details
- Remove obsolete tools: feature_get_next, feature_claim_next,
  feature_claim_for_testing, feature_get_for_regression
- Remove helper functions and unused imports (text, OperationalError, func)

Orchestrator (parallel_orchestrator.py):
- Change running_testing_agents from list to dict[int, Popen]
- Add claim_feature_for_testing() with random selection
- Add release_testing_claim() method
- Pass --testing-feature-id to spawned testing agents
- Use unified [Feature #X] output format for both agent types
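
The switch from a list to a feature-keyed dict makes the claim/release lifecycle explicit. A hypothetical sketch of that bookkeeping (illustrative names, not the orchestrator's actual code):

```python
import subprocess
import sys

# Hypothetical bookkeeping: each testing agent is keyed by the feature it
# tests, mirroring the dict[int, Popen] change described above.
running_testing_agents: dict[int, subprocess.Popen] = {}

def spawn_testing_agent(feature_id: int) -> None:
    """Spawn a stand-in subprocess and record it under its feature ID."""
    proc = subprocess.Popen([sys.executable, "-c", "pass"])  # stand-in command
    running_testing_agents[feature_id] = proc

def reap_finished() -> list[int]:
    """Return feature IDs whose agents exited (claims to release)."""
    done = [fid for fid, p in running_testing_agents.items() if p.poll() is not None]
    for fid in done:
        del running_testing_agents[fid]
    return done

spawn_testing_agent(42)
running_testing_agents[42].wait()
print(reap_finished())  # the exited agent's feature ID is returned for release
```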

Agent Entry Points:
- autonomous_agent_demo.py: Add --testing-feature-id CLI argument
- agent.py: Pass testing_feature_id to get_testing_prompt()

Prompt Templates:
- coding_prompt.template.md: Update to use feature_get_by_id
- testing_prompt.template.md: Update workflow for pre-assigned features
- prompts.py: Update pre-claimed headers for both agent types

WebSocket (server/websocket.py):
- Simplify tracking with unified [Feature #X] pattern
- Remove testing-specific parsing code

Assistant (server/services/assistant_chat_session.py):
- Update help text with current available tools

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: Auto
Date: 2026-01-22 16:24:48 +02:00
parent 357083dbae
commit b00eef5eca
10 changed files with 277 additions and 564 deletions

coding_prompt.template.md

@@ -29,9 +29,6 @@ Then use MCP tools to check feature status:
 ```
 # 6. Get progress statistics (passing/total counts)
 Use the feature_get_stats tool
-# 7. Get the next feature to work on
-Use the feature_get_next tool
 ```
 Understanding the `app_spec.txt` is critical - it contains the full requirements
@@ -48,7 +45,7 @@ chmod +x init.sh
 Otherwise, start servers manually and document the process.
-### STEP 3: CHOOSE ONE FEATURE TO IMPLEMENT
+### STEP 3: GET YOUR ASSIGNED FEATURE
 #### TEST-DRIVEN DEVELOPMENT MINDSET (CRITICAL)
@@ -63,19 +60,16 @@ Features are **test cases** that drive development. This is test-driven developm
 - WRONG: "Flashcard page doesn't exist yet" → skip feature
 - RIGHT: "Flashcard page doesn't exist yet" → build flashcard page → implement filter → test feature
-Get the next feature to implement:
+**Note:** Your feature has been pre-assigned by the orchestrator. Use `feature_get_by_id` with your assigned feature ID to get the details.
+Once you've retrieved the feature, **mark it as in-progress** (if not already):
 ```
-# Get the highest-priority pending feature
-Use the feature_get_next tool
+# Mark feature as in-progress
+Use the feature_mark_in_progress tool with feature_id={your_assigned_id}
 ```
-Once you've retrieved the feature, **immediately mark it as in-progress**:
-```
-# Mark feature as in-progress to prevent other sessions from working on it
-Use the feature_mark_in_progress tool with feature_id=42
-```
+If you get "already in-progress" error, that's OK - continue with implementation.
 Focus on completing one feature perfectly and completing its testing steps in this session before moving on to other features.
 It's ok if you only complete one feature in this session, as there will be more sessions later that continue to make progress.
@@ -337,10 +331,10 @@ The feature tools exist to reduce token usage. **DO NOT make exploratory queries
 # 1. Get progress stats (passing/in_progress/total counts)
 feature_get_stats
-# 2. Get the NEXT feature to work on (one feature only)
-feature_get_next
+# 2. Get your assigned feature details
+feature_get_by_id with feature_id={your_assigned_id}
-# 3. Mark a feature as in-progress (call immediately after feature_get_next)
+# 3. Mark a feature as in-progress
 feature_mark_in_progress with feature_id={id}
 # 4. Mark a feature as passing (after verification)
@@ -349,7 +343,7 @@ feature_mark_passing with feature_id={id}
 # 5. Mark a feature as failing (if you discover it's broken)
 feature_mark_failing with feature_id={id}
-# 6. Skip a feature (moves to end of queue) - ONLY when blocked by dependency
+# 6. Skip a feature (moves to end of queue) - ONLY when blocked by external dependency
 feature_skip with feature_id={id}
 # 7. Clear in-progress status (when abandoning a feature)
@@ -361,8 +355,9 @@ feature_clear_in_progress with feature_id={id}
 - Do NOT try to fetch lists of all features
 - Do NOT query features by category
 - Do NOT list all pending features
+- Your feature is pre-assigned by the orchestrator - use `feature_get_by_id` to get details
-**You do NOT need to see all features.** The feature_get_next tool tells you exactly what to work on. Trust it.
+**You do NOT need to see all features.** Work on your assigned feature only.
 ---

testing_prompt.template.md

@@ -40,17 +40,15 @@ chmod +x init.sh
 Otherwise, start servers manually.
-### STEP 3: CLAIM A FEATURE TO TEST
+### STEP 3: GET YOUR ASSIGNED FEATURE
-Atomically claim ONE passing feature for regression testing:
+Your feature has been pre-assigned by the orchestrator. Use `feature_get_by_id` to get the details:
 ```
-Use the feature_claim_for_testing tool
+Use the feature_get_by_id tool with feature_id={your_assigned_id}
 ```
-This atomically claims a random passing feature that:
-- Is not being worked on by coding agents
-- Is not already being tested by another testing agent
+The orchestrator has already claimed this feature for testing (set `testing_in_progress=true`).
 **CRITICAL:** You MUST call `feature_release_testing` when done, regardless of pass/fail.
@@ -157,9 +155,8 @@ echo "[Testing] Session complete - verified/fixed feature #{id}" >> claude-progr
 ### Feature Management
 - `feature_get_stats` - Get progress overview (passing/in_progress/total counts)
-- `feature_claim_for_testing` - **USE THIS** - Atomically claim a feature for testing
+- `feature_get_by_id` - Get your assigned feature details
 - `feature_release_testing` - **REQUIRED** - Release claim after testing (pass tested_ok=true/false)
-- `feature_get_for_regression` - (Legacy) Get random passing features without claiming
 - `feature_mark_failing` - Mark a feature as failing (when you find a regression)
 - `feature_mark_passing` - Mark a feature as passing (after fixing a regression)

agent.py

@@ -117,6 +117,7 @@ async def run_autonomous_agent(
     yolo_mode: bool = False,
     feature_id: Optional[int] = None,
     agent_type: Optional[str] = None,
+    testing_feature_id: Optional[int] = None,
 ) -> None:
     """
     Run the autonomous agent loop.
@@ -128,6 +129,7 @@ async def run_autonomous_agent(
         yolo_mode: If True, skip browser testing in coding agent prompts
         feature_id: If set, work only on this specific feature (used by orchestrator for coding agents)
         agent_type: Type of agent: "initializer", "coding", "testing", or None (auto-detect)
+        testing_feature_id: For testing agents, the pre-claimed feature ID to test
     """
     print("\n" + "=" * 70)
     print(" AUTONOMOUS CODING AGENT")
@@ -220,7 +222,7 @@ async def run_autonomous_agent(
     if agent_type == "initializer":
         prompt = get_initializer_prompt(project_dir)
     elif agent_type == "testing":
-        prompt = get_testing_prompt(project_dir)
+        prompt = get_testing_prompt(project_dir, testing_feature_id)
     elif feature_id:
         # Single-feature mode (used by orchestrator for coding agents)
         prompt = get_single_feature_prompt(feature_id, project_dir, yolo_mode)

autonomous_agent_demo.py

@@ -141,6 +141,13 @@ Authentication:
         help="Agent type (used by orchestrator to spawn specialized subprocesses)",
     )
+    parser.add_argument(
+        "--testing-feature-id",
+        type=int,
+        default=None,
+        help="Feature ID to regression test (used by orchestrator for testing agents)",
+    )
     # Testing agent configuration
     parser.add_argument(
         "--testing-ratio",
@@ -197,6 +204,7 @@ def main() -> None:
                 yolo_mode=args.yolo,
                 feature_id=args.feature_id,
                 agent_type=args.agent_type,
+                testing_feature_id=args.testing_feature_id,
             )
         )
     else:
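
The new flag behaves like any optional integer argument: absent for standalone runs, an `int` when the orchestrator passes it. A minimal standalone sketch of the same `add_argument` call:

```python
import argparse

# Sketch of the flag added above, isolated from the full CLI for illustration.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--testing-feature-id",
    type=int,
    default=None,
    help="Feature ID to regression test (used by orchestrator for testing agents)",
)

args = parser.parse_args(["--testing-feature-id", "7"])
print(args.testing_feature_id)  # 7 (an int, thanks to type=int)

args = parser.parse_args([])
print(args.testing_feature_id)  # None when the flag is omitted
```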

View File

@@ -54,9 +54,7 @@ def get_playwright_headless() -> bool:
 FEATURE_MCP_TOOLS = [
     # Core feature operations
     "mcp__features__feature_get_stats",
-    "mcp__features__feature_get_next",
-    "mcp__features__feature_claim_next",  # Atomic get+claim for parallel execution
-    "mcp__features__feature_get_for_regression",
+    "mcp__features__feature_get_by_id",  # Get assigned feature details
     "mcp__features__feature_mark_in_progress",
     "mcp__features__feature_mark_passing",
     "mcp__features__feature_mark_failing",  # Mark regression detected
@@ -64,11 +62,12 @@ FEATURE_MCP_TOOLS = [
     "mcp__features__feature_create_bulk",
     "mcp__features__feature_create",
     "mcp__features__feature_clear_in_progress",
+    "mcp__features__feature_release_testing",  # Release testing claim
     # Dependency management
     "mcp__features__feature_add_dependency",
     "mcp__features__feature_remove_dependency",
     "mcp__features__feature_set_dependencies",
-    # Parallel execution support
+    # Query tools
     "mcp__features__feature_get_ready",
     "mcp__features__feature_get_blocked",
     "mcp__features__feature_get_graph",

mcp_server/feature_mcp.py

@@ -3,28 +3,33 @@
 MCP Server for Feature Management
 ==================================
-Provides tools to manage features in the autonomous coding system,
-replacing the previous FastAPI-based REST API.
+Provides tools to manage features in the autonomous coding system.
 Tools:
 - feature_get_stats: Get progress statistics
-- feature_get_next: Get next feature to implement
-- feature_get_for_regression: Get random passing features for testing
+- feature_get_by_id: Get a specific feature by ID
 - feature_mark_passing: Mark a feature as passing
 - feature_mark_failing: Mark a feature as failing (regression detected)
 - feature_skip: Skip a feature (move to end of queue)
 - feature_mark_in_progress: Mark a feature as in-progress
 - feature_clear_in_progress: Clear in-progress status
+- feature_release_testing: Release testing lock on a feature
 - feature_create_bulk: Create multiple features at once
 - feature_create: Create a single feature
+- feature_add_dependency: Add a dependency between features
+- feature_remove_dependency: Remove a dependency
+- feature_get_ready: Get features ready to implement
+- feature_get_blocked: Get features blocked by dependencies
+- feature_get_graph: Get the dependency graph
+Note: Feature selection (which feature to work on) is handled by the
+orchestrator, not by agents. Agents receive pre-assigned feature IDs.
 """
 import json
 import os
-import random
 import sys
 import threading
-import time as _time
 from contextlib import asynccontextmanager
 from datetime import datetime, timezone
 from pathlib import Path
@@ -32,9 +37,6 @@ from typing import Annotated
 from mcp.server.fastmcp import FastMCP
 from pydantic import BaseModel, Field
-from sqlalchemy import text
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.sql.expression import func
 # Add parent directory to path so we can import from api module
 sys.path.insert(0, str(Path(__file__).parent.parent))
@@ -158,346 +160,32 @@ def feature_get_stats() -> str:
 @mcp.tool()
-def feature_get_next() -> str:
-    """Get the highest-priority pending feature that has all dependencies satisfied.
-    Returns the feature with the lowest priority number that:
-    1. Has passes=false and in_progress=false
-    2. Has all dependency features already passing (or no dependencies)
-    3. All dependency IDs actually exist (orphaned dependencies are ignored)
-    For backwards compatibility: if all pending features are blocked by dependencies,
-    falls back to returning the first pending feature (same as before dependencies).
-    Returns:
-        JSON with feature details (id, priority, category, name, description, steps, passes,
-        in_progress, dependencies) or error message if all features are passing.
-    """
-    session = get_session()
-    try:
-        all_features = session.query(Feature).all()
-        all_feature_ids = {f.id for f in all_features}
-        passing_ids = {f.id for f in all_features if f.passes}
-        # Get pending, non-in-progress features
-        pending = [f for f in all_features if not f.passes and not f.in_progress]
-        # Sort by scheduling score (higher = first), then priority, then id
-        all_dicts = [f.to_dict() for f in all_features]
-        scores = compute_scheduling_scores(all_dicts)
-        pending.sort(key=lambda f: (-scores.get(f.id, 0), f.priority, f.id))
-        if not pending:
-            if any(f.in_progress for f in all_features if not f.passes):
-                return json.dumps({"error": "All pending features are in progress by other agents"})
-            return json.dumps({"error": "All features are passing! No more work to do."})
-        # Find first feature with satisfied dependencies
-        for feature in pending:
-            deps = feature.dependencies or []
-            # Filter out orphaned dependencies (IDs that no longer exist)
-            valid_deps = [d for d in deps if d in all_feature_ids]
-            if all(dep_id in passing_ids for dep_id in valid_deps):
-                return json.dumps(feature.to_dict(), indent=2)
-        # All pending features are blocked by unmet dependencies
-        # Return error with details about what's blocking progress
-        blocking_info = []
-        for feature in pending[:3]:  # Show first 3 blocked features
-            deps = feature.dependencies or []
-            valid_deps = [d for d in deps if d in all_feature_ids]
-            orphaned = [d for d in deps if d not in all_feature_ids]
-            unmet = [d for d in valid_deps if d not in passing_ids]
-            info = f"#{feature.id} '{feature.name}'"
-            if unmet:
-                info += f" blocked by: {unmet}"
-            if orphaned:
-                info += f" (orphaned deps ignored: {orphaned})"
-            blocking_info.append(info)
-        return json.dumps({
-            "error": "All pending features are blocked by unmet dependencies",
-            "blocked_features": len(pending),
-            "examples": blocking_info,
-            "hint": "Complete the blocking dependencies first, or remove invalid dependencies"
-        }, indent=2)
-    finally:
-        session.close()
+def feature_get_by_id(
+    feature_id: Annotated[int, Field(description="The ID of the feature to retrieve", ge=1)]
+) -> str:
+    """Get a specific feature by its ID.
+    Returns the full details of a feature including its name, description,
+    verification steps, and current status.
+    Args:
+        feature_id: The ID of the feature to retrieve
+    Returns:
+        JSON with feature details, or error if not found.
+    """
+    session = get_session()
+    try:
+        feature = session.query(Feature).filter(Feature.id == feature_id).first()
+        if feature is None:
+            return json.dumps({"error": f"Feature with ID {feature_id} not found"})
+        return json.dumps(feature.to_dict(), indent=2)
+    finally:
+        session.close()
-# Maximum retry attempts for feature claiming under contention
-MAX_CLAIM_RETRIES = 5
-def _feature_claim_next_internal() -> str:
-    """Internal implementation of feature claiming with iterative retry.
-    Uses an iterative loop instead of recursion to avoid double session.close()
-    issues and deep call stacks under contention.
-    Returns:
-        JSON with claimed feature details, or error message if no feature available.
-    """
-    for attempt in range(MAX_CLAIM_RETRIES):
-        session = get_session()
-        try:
-            # Use a lock to prevent concurrent claims within this process
-            with _priority_lock:
-                all_features = session.query(Feature).all()
-                all_feature_ids = {f.id for f in all_features}
-                passing_ids = {f.id for f in all_features if f.passes}
-                # Get pending, non-in-progress features
-                pending = [f for f in all_features if not f.passes and not f.in_progress]
-                # Sort by scheduling score (higher = first), then priority, then id
-                all_dicts = [f.to_dict() for f in all_features]
-                scores = compute_scheduling_scores(all_dicts)
-                pending.sort(key=lambda f: (-scores.get(f.id, 0), f.priority, f.id))
-                if not pending:
-                    if any(f.in_progress for f in all_features if not f.passes):
-                        return json.dumps({"error": "All pending features are in progress by other agents"})
-                    return json.dumps({"error": "All features are passing! No more work to do."})
-                # Find first feature with satisfied dependencies
-                candidate_id = None
-                for feature in pending:
-                    deps = feature.dependencies or []
-                    # Filter out orphaned dependencies (IDs that no longer exist)
-                    valid_deps = [d for d in deps if d in all_feature_ids]
-                    if all(dep_id in passing_ids for dep_id in valid_deps):
-                        candidate_id = feature.id
-                        break
-                if candidate_id is None:
-                    # All pending features are blocked by unmet dependencies
-                    blocking_info = []
-                    for feature in pending[:3]:
-                        deps = feature.dependencies or []
-                        valid_deps = [d for d in deps if d in all_feature_ids]
-                        orphaned = [d for d in deps if d not in all_feature_ids]
-                        unmet = [d for d in valid_deps if d not in passing_ids]
-                        info = f"#{feature.id} '{feature.name}'"
-                        if unmet:
-                            info += f" blocked by: {unmet}"
-                        if orphaned:
-                            info += f" (orphaned deps ignored: {orphaned})"
-                        blocking_info.append(info)
-                    return json.dumps({
-                        "error": "All pending features are blocked by unmet dependencies",
-                        "blocked_features": len(pending),
-                        "examples": blocking_info,
-                        "hint": "Complete the blocking dependencies first, or remove invalid dependencies"
-                    }, indent=2)
-                # Atomic claim: UPDATE only if still claimable
-                # This prevents race conditions even across processes
-                result = session.execute(
-                    text("""
-                        UPDATE features
-                        SET in_progress = 1
-                        WHERE id = :feature_id
-                        AND in_progress = 0
-                        AND passes = 0
-                    """),
-                    {"feature_id": candidate_id}
-                )
-                session.commit()
-                # Check if we actually claimed it
-                if result.rowcount == 0:
-                    # Another process claimed it first - will retry after backoff
-                    pass  # Fall through to finally block, then retry loop
-                else:
-                    # Successfully claimed - fetch and return
-                    session.expire_all()  # Clear cache to get fresh data
-                    claimed_feature = session.query(Feature).filter(Feature.id == candidate_id).first()
-                    return json.dumps(claimed_feature.to_dict(), indent=2)
-        except OperationalError:
-            # Transient database error (e.g., SQLITE_BUSY) - rollback and retry
-            session.rollback()
-            # Fall through to backoff and retry
-        except Exception as e:
-            # Non-transient error - fail immediately
-            session.rollback()
-            return json.dumps({"error": f"Failed to claim feature: {str(e)}"})
-        finally:
-            session.close()
-        # Exponential backoff with jitter before next attempt
-        # Base 0.1s, 0.2s, 0.4s, 0.8s, 1.0s (capped)
-        # Jitter of up to 30% prevents synchronized retries under high contention
-        backoff = min(0.1 * (2 ** attempt), 1.0)
-        jitter = random.uniform(0, backoff * 0.3)
-        _time.sleep(backoff + jitter)
-    # Exhausted all retries
-    return json.dumps({
-        "error": "Failed to claim feature after maximum retries",
-        "hint": "High contention detected - try again or reduce parallel agents"
-    })
-@mcp.tool()
-def feature_claim_next() -> str:
-    """Atomically get and claim the next available feature.
-    This combines feature_get_next() and feature_mark_in_progress() in a single
-    atomic operation, preventing race conditions where two agents could claim
-    the same feature.
-    Returns the feature with the lowest priority number that:
-    1. Has passes=false and in_progress=false
-    2. Has all dependency features already passing (or no dependencies)
-    3. All dependency IDs actually exist (orphaned dependencies are ignored)
-    On success, the feature's in_progress flag is set to True.
-    Uses exponential backoff retry (up to 5 attempts) under contention.
-    Returns:
-        JSON with claimed feature details, or error message if no feature available.
-    """
-    return _feature_claim_next_internal()
-@mcp.tool()
-def feature_get_for_regression(
-    limit: Annotated[int, Field(default=3, ge=1, le=10, description="Maximum number of passing features to return")] = 3
-) -> str:
-    """Get random passing features for regression testing.
-    Returns a random selection of features that are currently passing
-    and NOT currently in progress (to avoid conflicts with coding agents).
-    Use this to verify that previously implemented features still work
-    after making changes.
-    Args:
-        limit: Maximum number of features to return (1-10, default 3)
-    Returns:
-        JSON with: features (list of feature objects), count (int)
-    """
-    session = get_session()
-    try:
-        features = (
-            session.query(Feature)
-            .filter(Feature.passes == True)
-            .filter(Feature.in_progress == False)  # Avoid conflicts with coding agents
-            .order_by(func.random())
-            .limit(limit)
-            .all()
-        )
-        return json.dumps({
-            "features": [f.to_dict() for f in features],
-            "count": len(features)
-        }, indent=2)
-    finally:
-        session.close()
-def _feature_claim_for_testing_internal() -> str:
-    """Internal implementation of testing feature claim with iterative retry.
-    Uses an iterative loop instead of recursion to avoid double session.close()
-    issues and deep call stacks under contention.
-    Returns:
-        JSON with claimed feature details, or message if no features available.
-    """
-    for attempt in range(MAX_CLAIM_RETRIES):
-        session = get_session()
-        try:
-            # Use lock to prevent concurrent claims within this process
-            with _priority_lock:
-                # Find a candidate feature
-                candidate = (
-                    session.query(Feature)
-                    .filter(Feature.passes == True)
-                    .filter(Feature.in_progress == False)
-                    .filter(Feature.testing_in_progress == False)
-                    .order_by(func.random())
-                    .first()
-                )
-                if not candidate:
-                    return json.dumps({
-                        "message": "No features available for testing",
-                        "hint": "All passing features are either being coded or tested"
-                    })
-                # Atomic claim using UPDATE with WHERE clause
-                # This prevents race conditions even across processes
-                result = session.execute(
-                    text("""
-                        UPDATE features
-                        SET testing_in_progress = 1
-                        WHERE id = :feature_id
-                        AND passes = 1
-                        AND in_progress = 0
-                        AND testing_in_progress = 0
-                    """),
-                    {"feature_id": candidate.id}
-                )
-                session.commit()
-                # Check if we actually claimed it
-                if result.rowcount == 0:
-                    # Another process claimed it first - will retry after backoff
-                    pass  # Fall through to finally block, then retry loop
-                else:
-                    # Successfully claimed - fetch and return
-                    session.expire_all()
-                    claimed = session.query(Feature).filter(Feature.id == candidate.id).first()
-                    return json.dumps(claimed.to_dict(), indent=2)
-        except OperationalError:
-            # Transient database error (e.g., SQLITE_BUSY) - rollback and retry
-            session.rollback()
-            # Fall through to backoff and retry
-        except Exception as e:
-            # Non-transient error - fail immediately
-            session.rollback()
-            return json.dumps({"error": f"Failed to claim feature: {str(e)}"})
-        finally:
-            session.close()
-        # Exponential backoff with jitter before next attempt
-        backoff = min(0.1 * (2 ** attempt), 1.0)
-        jitter = random.uniform(0, backoff * 0.3)
-        _time.sleep(backoff + jitter)
-    # Exhausted all retries
-    return json.dumps({
-        "error": "Failed to claim feature after maximum retries",
-        "hint": "High contention detected - try again or reduce testing agents"
-    })
-@mcp.tool()
-def feature_claim_for_testing() -> str:
-    """Atomically claim a passing feature for regression testing.
-    Returns a random passing feature that is:
-    - Currently passing (passes=True)
-    - Not being worked on by coding agents (in_progress=False)
-    - Not already being tested (testing_in_progress=False)
-    The feature's testing_in_progress flag is set to True atomically to prevent
-    other testing agents from claiming the same feature. Uses exponential backoff
-    retry (up to 5 attempts) under contention.
-    After testing, you MUST call feature_release_testing() to release the claim.
-    Returns:
-        JSON with feature details if available, or message if no features available.
-    """
-    return _feature_claim_for_testing_internal()
 @mcp.tool()
 def feature_release_testing(
     feature_id: Annotated[int, Field(description="The ID of the feature to release", ge=1)],
@@ -692,10 +380,10 @@ def feature_skip(
 def feature_mark_in_progress(
     feature_id: Annotated[int, Field(description="The ID of the feature to mark as in-progress", ge=1)]
 ) -> str:
-    """Mark a feature as in-progress. Call immediately after feature_get_next().
+    """Mark a feature as in-progress.
     This prevents other agent sessions from working on the same feature.
-    Use this as soon as you retrieve a feature to work on.
+    Call this after getting your assigned feature details with feature_get_by_id.
     Args:
         feature_id: The ID of the feature to mark as in-progress
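
The claim logic in this commit rests on the conditional-UPDATE idiom: only the caller whose UPDATE reports `rowcount == 1` owns the claim. A self-contained `sqlite3` sketch of that idiom (table and column names mirror the diff, but this is an illustration, not the project's code):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE features (
        id INTEGER PRIMARY KEY,
        passes INTEGER NOT NULL,
        in_progress INTEGER NOT NULL,
        testing_in_progress INTEGER NOT NULL
    )
""")
conn.execute("INSERT INTO features VALUES (1, 1, 0, 0)")
conn.commit()

def claim_for_testing(conn: sqlite3.Connection, feature_id: int) -> bool:
    """Atomically claim a feature for testing; True only for the first caller."""
    cur = conn.execute(
        """
        UPDATE features SET testing_in_progress = 1
        WHERE id = ? AND passes = 1 AND in_progress = 0
          AND testing_in_progress = 0
        """,
        (feature_id,),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 means another process claimed it first

print(claim_for_testing(conn, 1))  # True: first claim flips the flag
print(claim_for_testing(conn, 1))  # False: WHERE no longer matches
```

The WHERE clause re-checks every precondition inside the UPDATE itself, so the check and the write are a single atomic statement rather than a read-then-write race.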

parallel_orchestrator.py

@@ -173,8 +173,8 @@ class ParallelOrchestrator:
         self._lock = threading.Lock()
         # Coding agents: feature_id -> process
         self.running_coding_agents: dict[int, subprocess.Popen] = {}
-        # Testing agents: list of processes (not tied to specific features)
-        self.running_testing_agents: list[subprocess.Popen] = []
+        # Testing agents: feature_id -> process (feature being tested)
+        self.running_testing_agents: dict[int, subprocess.Popen] = {}
         # Legacy alias for backward compatibility
         self.running_agents = self.running_coding_agents
         self.abort_events: dict[int, threading.Event] = {}
@@ -193,6 +193,75 @@ class ParallelOrchestrator:
         """Get a new database session."""
         return self._session_maker()
+    def claim_feature_for_testing(self) -> int | None:
+        """Claim a random passing feature for regression testing.
+        Returns the feature ID if successful, None if no features available.
+        Sets testing_in_progress=True on the claimed feature.
+        """
+        session = self.get_session()
+        try:
+            from sqlalchemy.sql.expression import func
+            # Find a passing feature that's not being worked on
+            # Exclude features already being tested by this orchestrator
+            with self._lock:
+                testing_feature_ids = set(self.running_testing_agents.keys())
+            candidate = (
+                session.query(Feature)
+                .filter(Feature.passes == True)
+                .filter(Feature.in_progress == False)
+                .filter(Feature.testing_in_progress == False)
+                .filter(~Feature.id.in_(testing_feature_ids) if testing_feature_ids else True)
+                .order_by(func.random())
+                .first()
+            )
+            if not candidate:
+                return None
+            # Atomic claim using UPDATE with WHERE clause
+            result = session.execute(
+                text("""
+                    UPDATE features
+                    SET testing_in_progress = 1
+                    WHERE id = :feature_id
+                    AND passes = 1
+                    AND in_progress = 0
+                    AND testing_in_progress = 0
+                """),
+                {"feature_id": candidate.id}
+            )
+            session.commit()
+            if result.rowcount == 0:
+                # Another process claimed it
+                return None
+            return candidate.id
+        except Exception as e:
+            session.rollback()
+            debug_log.log("TESTING", f"Failed to claim feature for testing: {e}")
+            return None
+        finally:
+            session.close()
+    def release_testing_claim(self, feature_id: int):
+        """Release a testing claim on a feature (called when testing agent exits)."""
+        session = self.get_session()
+        try:
+            session.execute(
+                text("UPDATE features SET testing_in_progress = 0 WHERE id = :feature_id"),
+                {"feature_id": feature_id}
+            )
+            session.commit()
+        except Exception as e:
+            session.rollback()
+            debug_log.log("TESTING", f"Failed to release testing claim for feature {feature_id}: {e}")
+        finally:
+            session.close()
     def get_resumable_features(self) -> list[dict]:
         """Get features that were left in_progress from a previous session.
@@ -563,13 +632,11 @@ class ParallelOrchestrator:
     def _spawn_testing_agent(self) -> tuple[bool, str]:
         """Spawn a testing agent subprocess for regression testing.
-        CRITICAL: Lock is held during the entire spawn operation to prevent
-        TOCTOU race conditions where multiple threads could pass limit checks
-        and spawn excess agents.
+        Claims a feature BEFORE spawning the agent (same pattern as coding agents).
+        This ensures we know which feature is being tested for UI display.
         """
-        # Hold lock for entire operation to prevent TOCTOU race
+        # Check limits first (under lock)
         with self._lock:
-            # Check limits
             current_testing_count = len(self.running_testing_agents)
             if current_testing_count >= self.max_concurrency:
                 debug_log.log("TESTING", f"Skipped spawn - at max testing agents ({current_testing_count}/{self.max_concurrency})")
@@ -579,7 +646,21 @@ class ParallelOrchestrator:
                 debug_log.log("TESTING", f"Skipped spawn - at max total agents ({total_agents}/{MAX_TOTAL_AGENTS})")
                 return False, f"At max total agents ({total_agents})"

-        debug_log.log("TESTING", "Attempting to spawn testing agent subprocess")
+        # Claim a feature for testing (outside lock to avoid holding during DB ops)
+        feature_id = self.claim_feature_for_testing()
+        if feature_id is None:
+            debug_log.log("TESTING", "No features available for testing")
+            return False, "No features available for testing"
+
+        debug_log.log("TESTING", f"Claimed feature #{feature_id} for testing")
+
+        # Now spawn with the claimed feature ID
+        with self._lock:
+            # Re-check limits in case another thread spawned while we were claiming
+            current_testing_count = len(self.running_testing_agents)
+            if current_testing_count >= self.max_concurrency:
+                self.release_testing_claim(feature_id)
+                return False, f"At max testing agents ({current_testing_count})"

         cmd = [
             sys.executable,
@@ -588,10 +669,10 @@ class ParallelOrchestrator:
             "--project-dir", str(self.project_dir),
             "--max-iterations", "1",
             "--agent-type", "testing",
+            "--testing-feature-id", str(feature_id),
         ]
         if self.model:
             cmd.extend(["--model", self.model])
-        # Testing agents don't need --yolo flag (they use testing prompt regardless)

         try:
             proc = subprocess.Popen(
@@ -604,25 +685,26 @@ class ParallelOrchestrator:
             )
         except Exception as e:
             debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}")
+            self.release_testing_claim(feature_id)
             return False, f"Failed to start testing agent: {e}"

-        # Register process immediately while still holding lock
-        self.running_testing_agents.append(proc)
+        # Register process with feature ID (same pattern as coding agents)
+        self.running_testing_agents[feature_id] = proc
         testing_count = len(self.running_testing_agents)

-        # Start output reader thread (feature_id=None for testing agents)
-        # This can be outside lock since process is already registered
+        # Start output reader thread with feature ID (same as coding agents)
         threading.Thread(
            target=self._read_output,
-            args=(None, proc, threading.Event(), "testing"),
+            args=(feature_id, proc, threading.Event(), "testing"),
            daemon=True
         ).start()

-        print(f"Started testing agent (PID {proc.pid})", flush=True)
-        debug_log.log("TESTING", "Successfully spawned testing agent",
+        print(f"Started testing agent for feature #{feature_id} (PID {proc.pid})", flush=True)
+        debug_log.log("TESTING", f"Successfully spawned testing agent for feature #{feature_id}",
                       pid=proc.pid,
+                      feature_id=feature_id,
                       total_testing_agents=testing_count)
-        return True, "Started testing agent"
+        return True, f"Started testing agent for feature #{feature_id}"
     async def _run_initializer(self) -> bool:
         """Run initializer agent as blocking subprocess.

@@ -706,10 +788,8 @@ class ParallelOrchestrator:
                     if self.on_output:
                         self.on_output(feature_id or 0, line)
                     else:
-                        if agent_type == "testing":
-                            print(f"[Testing] {line}", flush=True)
-                        else:
-                            print(f"[Feature #{feature_id}] {line}", flush=True)
+                        # Both coding and testing agents now use [Feature #X] format
+                        print(f"[Feature #{feature_id}] {line}", flush=True)
             proc.wait()
         finally:
             self._on_agent_complete(feature_id, proc.returncode, agent_type, proc)
@@ -730,17 +810,27 @@ class ParallelOrchestrator:
         is safe.

         For testing agents:
-        - Just remove from the running list.
+        - Remove from running dict and release testing claim on feature.
         """
         if agent_type == "testing":
             with self._lock:
-                if proc in self.running_testing_agents:
-                    self.running_testing_agents.remove(proc)
+                # Remove from dict by finding the feature_id for this proc
+                found_feature_id = None
+                for fid, p in list(self.running_testing_agents.items()):
+                    if p is proc:
+                        found_feature_id = fid
+                        del self.running_testing_agents[fid]
+                        break
+
+            # Release testing claim on the feature
+            if found_feature_id is not None:
+                self.release_testing_claim(found_feature_id)

             status = "completed" if return_code == 0 else "failed"
-            print(f"Testing agent (PID {proc.pid}) {status}", flush=True)
-            debug_log.log("COMPLETE", "Testing agent finished",
+            print(f"Feature #{feature_id} testing {status}", flush=True)
+            debug_log.log("COMPLETE", f"Testing agent for feature #{feature_id} finished",
                           pid=proc.pid,
+                          feature_id=feature_id,
                           status=status)
             return
@@ -846,11 +936,12 @@ class ParallelOrchestrator:
         # Stop testing agents
         with self._lock:
-            testing_procs = list(self.running_testing_agents)
-        for proc in testing_procs:
+            testing_items = list(self.running_testing_agents.items())
+        for feature_id, proc in testing_items:
             result = kill_process_tree(proc, timeout=5.0)
-            debug_log.log("STOP", f"Killed testing agent PID {proc.pid} process tree",
+            self.release_testing_claim(feature_id)
+            debug_log.log("STOP", f"Killed testing agent for feature #{feature_id} (PID {proc.pid})",
                           status=result.status, children_found=result.children_found,
                           children_terminated=result.children_terminated, children_killed=result.children_killed)

View File

@@ -74,9 +74,38 @@ def get_coding_prompt(project_dir: Path | None = None) -> str:
     return load_prompt("coding_prompt", project_dir)


-def get_testing_prompt(project_dir: Path | None = None) -> str:
-    """Load the testing agent prompt (project-specific if available)."""
-    return load_prompt("testing_prompt", project_dir)
+def get_testing_prompt(project_dir: Path | None = None, testing_feature_id: int | None = None) -> str:
+    """Load the testing agent prompt (project-specific if available).
+
+    Args:
+        project_dir: Optional project directory for project-specific prompts
+        testing_feature_id: If provided, the pre-assigned feature ID to test.
+            The orchestrator claims the feature before spawning the agent.
+
+    Returns:
+        The testing prompt, with pre-assigned feature instructions if applicable.
+    """
+    base_prompt = load_prompt("testing_prompt", project_dir)
+
+    if testing_feature_id is not None:
+        # Prepend pre-assigned feature instructions
+        pre_assigned_header = f"""## ASSIGNED FEATURE
+
+**You are assigned to regression test Feature #{testing_feature_id}.**
+
+The orchestrator has already claimed this feature for you.
+
+### Your workflow:
+
+1. Call `feature_get_by_id` with ID {testing_feature_id} to get the feature details
+2. Verify the feature through the UI using browser automation
+3. When done, call `feature_release_testing` with feature_id={testing_feature_id}
+
+---
+
+"""
+        return pre_assigned_header + base_prompt
+
+    return base_prompt


 def get_single_feature_prompt(feature_id: int, project_dir: Path | None = None, yolo_mode: bool = False) -> str:
@@ -100,26 +129,26 @@ def get_single_feature_prompt(feature_id: int, project_dir: Path | None = None,
     base_prompt = get_coding_prompt(project_dir)

     # Prepend single-feature instructions
-    single_feature_header = f"""## SINGLE FEATURE MODE
+    single_feature_header = f"""## ASSIGNED FEATURE

-**CRITICAL: You are assigned to work on Feature #{feature_id} ONLY.**
+**You are assigned to work on Feature #{feature_id} ONLY.**

-This session is part of a parallel execution where multiple agents work on different features simultaneously. You MUST:
+This session is part of a parallel execution where multiple agents work on different features simultaneously.

-1. **Skip the `feature_get_next` step** - Your feature is already assigned: #{feature_id}
-2. **Immediately mark feature #{feature_id} as in-progress** using `feature_mark_in_progress`
-3. **Focus ONLY on implementing and testing feature #{feature_id}**
-4. **Do NOT work on any other features** - other agents are handling them
+### Your workflow:

-When you complete feature #{feature_id}:
-- Mark it as passing with `feature_mark_passing`
-- Commit your changes
-- End the session
+1. **Get feature details** using `feature_get_by_id` with ID {feature_id}
+2. **Mark as in-progress** using `feature_mark_in_progress` with ID {feature_id}
+   - If you get "already in-progress" error, that's OK - continue with implementation
+3. **Implement the feature** following the steps from the feature details
+4. **Test your implementation** to verify it works correctly
+5. **Mark as passing** using `feature_mark_passing` with ID {feature_id}
+6. **Commit your changes** and end the session

-If you cannot complete feature #{feature_id} due to a blocker:
-- Use `feature_skip` to move it to the end of the queue
-- Document the blocker in claude-progress.txt
-- End the session
+### Important rules:
+
+- **Do NOT** work on any other features - other agents are handling them
+- If blocked, use `feature_skip` and document the blocker in claude-progress.txt

 ---

View File

@@ -47,8 +47,9 @@ API_ENV_VARS = [
 # Read-only feature MCP tools
 READONLY_FEATURE_MCP_TOOLS = [
     "mcp__features__feature_get_stats",
-    "mcp__features__feature_get_next",
-    "mcp__features__feature_get_for_regression",
+    "mcp__features__feature_get_by_id",
+    "mcp__features__feature_get_ready",
+    "mcp__features__feature_get_blocked",
 ]

 # Feature management tools (create/skip but not mark_passing)
@@ -124,8 +125,9 @@ If the user asks you to modify code, explain that you're a project assistant and

 **Feature Management:**
 - **feature_get_stats**: Get feature completion progress
-- **feature_get_next**: See the next pending feature
-- **feature_get_for_regression**: See passing features for testing
+- **feature_get_by_id**: Get details for a specific feature
+- **feature_get_ready**: See features ready for implementation
+- **feature_get_blocked**: See features blocked by dependencies
 - **feature_create**: Create a single feature in the backlog
 - **feature_create_bulk**: Create multiple features at once
 - **feature_skip**: Move a feature to the end of the queue

View File

@@ -24,11 +24,17 @@ _count_passing_tests = None

 logger = logging.getLogger(__name__)

-# Pattern to extract feature ID from parallel orchestrator output (coding agents)
+# Pattern to extract feature ID from parallel orchestrator output
+# Both coding and testing agents now use the same [Feature #X] format
 FEATURE_ID_PATTERN = re.compile(r'\[Feature #(\d+)\]\s*(.*)')

-# Pattern to extract testing agent output
-TESTING_AGENT_PATTERN = re.compile(r'\[Testing\]\s*(.*)')
+# Pattern to detect testing agent start message (includes feature ID)
+# Matches: "Started testing agent for feature #123 (PID xxx)"
+TESTING_AGENT_START_PATTERN = re.compile(r'Started testing agent for feature #(\d+)')
+
+# Pattern to detect testing agent completion
+# Matches: "Feature #123 testing completed" or "Feature #123 testing failed"
+TESTING_AGENT_COMPLETE_PATTERN = re.compile(r'Feature #(\d+) testing (completed|failed)')

 # Patterns for detecting agent activity and thoughts
 THOUGHT_PATTERNS = [
@@ -50,14 +56,14 @@ THOUGHT_PATTERNS = [


 class AgentTracker:
-    """Tracks active agents and their states for multi-agent mode."""
+    """Tracks active agents and their states for multi-agent mode.

-    # Use a special key for the testing agent since it doesn't have a fixed feature ID
-    TESTING_AGENT_KEY = -1
+    Both coding and testing agents are now tracked by their feature ID.
+    The agent_type field distinguishes between them.
+    """

     def __init__(self):
         # feature_id -> {name, state, last_thought, agent_index, agent_type}
-        # For testing agents, use TESTING_AGENT_KEY as the key
         self.active_agents: dict[int, dict] = {}
         self._next_agent_index = 0
         self._lock = asyncio.Lock()
@@ -68,35 +74,43 @@ class AgentTracker:

         Returns None if no update should be emitted.
         """
-        # Check for testing agent output first
-        testing_match = TESTING_AGENT_PATTERN.match(line)
-        if testing_match:
-            content = testing_match.group(1)
-            return await self._process_testing_agent_line(content)
+        # Check for orchestrator status messages first
+        # These don't have [Feature #X] prefix

-        # Check for feature-specific output (coding agents)
+        # Coding agent start: "Started coding agent for feature #X"
+        if line.startswith("Started coding agent for feature #"):
+            try:
+                feature_id = int(re.search(r'#(\d+)', line).group(1))
+                return await self._handle_agent_start(feature_id, line, agent_type="coding")
+            except (AttributeError, ValueError):
+                pass
+
+        # Testing agent start: "Started testing agent for feature #X (PID xxx)"
+        testing_start_match = TESTING_AGENT_START_PATTERN.match(line)
+        if testing_start_match:
+            feature_id = int(testing_start_match.group(1))
+            return await self._handle_agent_start(feature_id, line, agent_type="testing")
+
+        # Testing agent complete: "Feature #X testing completed/failed"
+        testing_complete_match = TESTING_AGENT_COMPLETE_PATTERN.match(line)
+        if testing_complete_match:
+            feature_id = int(testing_complete_match.group(1))
+            is_success = testing_complete_match.group(2) == "completed"
+            return await self._handle_agent_complete(feature_id, is_success)
+
+        # Coding agent complete: "Feature #X completed/failed" (without "testing" keyword)
+        if line.startswith("Feature #") and ("completed" in line or "failed" in line) and "testing" not in line:
+            try:
+                feature_id = int(re.search(r'#(\d+)', line).group(1))
+                is_success = "completed" in line
+                return await self._handle_agent_complete(feature_id, is_success)
+            except (AttributeError, ValueError):
+                pass
+
+        # Check for feature-specific output lines: [Feature #X] content
+        # Both coding and testing agents use this format now
         match = FEATURE_ID_PATTERN.match(line)
         if not match:
-            # Also check for orchestrator status messages
-            if line.startswith("Started coding agent for feature #"):
-                try:
-                    feature_id = int(re.search(r'#(\d+)', line).group(1))
-                    return await self._handle_agent_start(feature_id, line, agent_type="coding")
-                except (AttributeError, ValueError):
-                    pass
-            elif line.startswith("Started testing agent"):
-                return await self._handle_testing_agent_start(line)
-            elif line.startswith("Feature #") and ("completed" in line or "failed" in line):
-                try:
-                    feature_id = int(re.search(r'#(\d+)', line).group(1))
-                    is_success = "completed" in line
-                    return await self._handle_agent_complete(feature_id, is_success)
-                except (AttributeError, ValueError):
-                    pass
-            elif line.startswith("Testing agent") and ("completed" in line or "failed" in line):
-                # Format: "Testing agent (PID xxx) completed" or "Testing agent (PID xxx) failed"
-                is_success = "completed" in line
-                return await self._handle_testing_agent_complete(is_success)
             return None

         feature_id = int(match.group(1))
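The three orchestrator-output patterns can be exercised against sample lines to confirm they dispatch correctly. The regexes below are copied verbatim from the diff; the sample log lines and PIDs are made up for illustration.

```python
import re

# Patterns as defined in server/websocket.py (copied from the diff above)
FEATURE_ID_PATTERN = re.compile(r'\[Feature #(\d+)\]\s*(.*)')
TESTING_AGENT_START_PATTERN = re.compile(r'Started testing agent for feature #(\d+)')
TESTING_AGENT_COMPLETE_PATTERN = re.compile(r'Feature #(\d+) testing (completed|failed)')

# Unified per-feature output, used by both coding and testing agents
m = FEATURE_ID_PATTERN.match("[Feature #7] Running browser checks")
assert m and m.group(1) == "7" and m.group(2) == "Running browser checks"

# Testing agent lifecycle messages carry a concrete feature ID
m = TESTING_AGENT_START_PATTERN.match("Started testing agent for feature #123 (PID 4567)")
assert m and m.group(1) == "123"

m = TESTING_AGENT_COMPLETE_PATTERN.match("Feature #123 testing failed")
assert m and m.group(2) == "failed"

# Coding-agent completion lines ("Feature #5 completed") must NOT match the
# testing pattern - the "testing" keyword is what disambiguates the two.
assert TESTING_AGENT_COMPLETE_PATTERN.match("Feature #5 completed") is None
```

This disambiguation is why `process_output_line` also checks `"testing" not in line` before treating a `Feature #X completed/failed` line as a coding-agent completion.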
@@ -149,118 +163,6 @@ class AgentTracker:

         return None

-    async def _process_testing_agent_line(self, content: str) -> dict | None:
-        """Process output from a testing agent."""
-        async with self._lock:
-            # Ensure testing agent is tracked
-            if self.TESTING_AGENT_KEY not in self.active_agents:
-                agent_index = self._next_agent_index
-                self._next_agent_index += 1
-                self.active_agents[self.TESTING_AGENT_KEY] = {
-                    'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)],
-                    'agent_index': agent_index,
-                    'agent_type': 'testing',
-                    'state': 'testing',
-                    'feature_name': 'Regression Testing',
-                    'last_thought': None,
-                }
-
-            agent = self.active_agents[self.TESTING_AGENT_KEY]
-
-            # Detect state and thought from content
-            state = 'testing'
-            thought = None
-            for pattern, detected_state in THOUGHT_PATTERNS:
-                m = pattern.search(content)
-                if m:
-                    state = detected_state
-                    thought = m.group(1) if m.lastindex else content[:100]
-                    break
-
-            # Only emit update if state changed or we have a new thought
-            if state != agent['state'] or thought != agent['last_thought']:
-                agent['state'] = state
-                if thought:
-                    agent['last_thought'] = thought
-                return {
-                    'type': 'agent_update',
-                    'agentIndex': agent['agent_index'],
-                    'agentName': agent['name'],
-                    'agentType': 'testing',
-                    'featureId': 0,  # Testing agents work on random features
-                    'featureName': agent['feature_name'],
-                    'state': state,
-                    'thought': thought,
-                    'timestamp': datetime.now().isoformat(),
-                }
-
-            return None
-
-    async def _handle_testing_agent_start(self, line: str) -> dict | None:
-        """Handle testing agent start message from orchestrator.
-
-        Reuses existing testing agent entry if present to avoid ghost agents in UI.
-        """
-        async with self._lock:
-            # Reuse existing testing agent entry if present
-            existing = self.active_agents.get(self.TESTING_AGENT_KEY)
-            if existing:
-                agent_index = existing['agent_index']
-                agent_name = existing['name']
-            else:
-                agent_index = self._next_agent_index
-                self._next_agent_index += 1
-                agent_name = AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)]
-
-            self.active_agents[self.TESTING_AGENT_KEY] = {
-                'name': agent_name,
-                'agent_index': agent_index,
-                'agent_type': 'testing',
-                'state': 'testing',
-                'feature_name': 'Regression Testing',
-                'last_thought': 'Starting regression tests...',
-            }
-
-            return {
-                'type': 'agent_update',
-                'agentIndex': agent_index,
-                'agentName': agent_name,
-                'agentType': 'testing',
-                'featureId': 0,
-                'featureName': 'Regression Testing',
-                'state': 'testing',
-                'thought': 'Starting regression tests...',
-                'timestamp': datetime.now().isoformat(),
-            }
-
-    async def _handle_testing_agent_complete(self, is_success: bool) -> dict | None:
-        """Handle testing agent completion."""
-        async with self._lock:
-            if self.TESTING_AGENT_KEY not in self.active_agents:
-                return None
-
-            agent = self.active_agents[self.TESTING_AGENT_KEY]
-            state = 'success' if is_success else 'error'
-
-            result = {
-                'type': 'agent_update',
-                'agentIndex': agent['agent_index'],
-                'agentName': agent['name'],
-                'agentType': 'testing',
-                'featureId': 0,
-                'featureName': agent['feature_name'],
-                'state': state,
-                'thought': 'Tests passed!' if is_success else 'Found regressions',
-                'timestamp': datetime.now().isoformat(),
-            }
-
-            # Remove from active agents
-            del self.active_agents[self.TESTING_AGENT_KEY]
-            return result
-
     async def get_agent_info(self, feature_id: int) -> tuple[int | None, str | None]:
         """Get agent index and name for a feature ID.