diff --git a/api/database.py b/api/database.py
index af2fd01..f3a0cce 100644
--- a/api/database.py
+++ b/api/database.py
@@ -57,10 +57,6 @@ class Feature(Base):
     # Dependencies: list of feature IDs that must be completed before this feature
     # NULL/empty = no dependencies (backwards compatible)
     dependencies = Column(JSON, nullable=True, default=None)
-    # Regression testing: prevent concurrent testing of the same feature
-    testing_in_progress = Column(Boolean, nullable=False, default=False, index=True)
-    # Last time this feature was tested (for session-based regression tracking)
-    last_tested_at = Column(DateTime, nullable=True, default=None)
 
     def to_dict(self) -> dict:
         """Convert feature to dictionary for JSON serialization."""
@@ -76,9 +72,6 @@ class Feature(Base):
             "in_progress": self.in_progress if self.in_progress is not None else False,
             # Dependencies: NULL/empty treated as empty list for backwards compat
             "dependencies": self.dependencies if self.dependencies else [],
-            # Regression testing fields
-            "testing_in_progress": self.testing_in_progress if self.testing_in_progress is not None else False,
-            "last_tested_at": self.last_tested_at.isoformat() if self.last_tested_at else None,
         }
 
     def get_dependencies_safe(self) -> list[int]:
@@ -240,23 +233,18 @@ def _migrate_add_dependencies_column(engine) -> None:
 
 
 def _migrate_add_testing_columns(engine) -> None:
-    """Add testing_in_progress and last_tested_at columns for regression testing.
+    """Legacy migration - no longer adds testing columns.
 
-    These columns support atomic claiming of features for regression testing
-    and tracking when features were last tested in a session.
+    The testing_in_progress and last_tested_at columns were removed from the
+    Feature model as part of simplifying the testing agent architecture.
+    Multiple testing agents can now test the same feature concurrently
+    without coordination.
+
+    This function is kept for backwards compatibility but does nothing.
+    Existing databases with these columns will continue to work - the columns
+    are simply ignored.
     """
-    with engine.connect() as conn:
-        # Check existing columns
-        result = conn.execute(text("PRAGMA table_info(features)"))
-        columns = [row[1] for row in result.fetchall()]
-
-        if "testing_in_progress" not in columns:
-            conn.execute(text("ALTER TABLE features ADD COLUMN testing_in_progress BOOLEAN DEFAULT 0"))
-            conn.commit()
-
-        if "last_tested_at" not in columns:
-            conn.execute(text("ALTER TABLE features ADD COLUMN last_tested_at DATETIME DEFAULT NULL"))
-            conn.commit()
+    pass
 
 
 def _is_network_path(path: Path) -> bool:
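The no-op migration above is safe because SQLAlchemy maps only the columns declared on the model, so leftover `testing_in_progress` / `last_tested_at` columns in an old database are never read or written. A minimal standalone sketch of that behavior (illustrative names, not part of this patch):

```python
from sqlalchemy import Column, Integer, create_engine, text
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Feature(Base):
    __tablename__ = "features"
    id = Column(Integer, primary_key=True)
    # testing_in_progress / last_tested_at deliberately not declared

engine = create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    # Simulate an "old" database that still carries a removed column
    conn.execute(text(
        "CREATE TABLE features (id INTEGER PRIMARY KEY, testing_in_progress BOOLEAN DEFAULT 0)"
    ))

with Session(engine) as session:
    session.add(Feature())
    session.commit()
    # The ORM selects only declared columns; the legacy column is ignored
    print(session.query(Feature).count())  # -> 1
```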
""" - with engine.connect() as conn: - # Check existing columns - result = conn.execute(text("PRAGMA table_info(features)")) - columns = [row[1] for row in result.fetchall()] - - if "testing_in_progress" not in columns: - conn.execute(text("ALTER TABLE features ADD COLUMN testing_in_progress BOOLEAN DEFAULT 0")) - conn.commit() - - if "last_tested_at" not in columns: - conn.execute(text("ALTER TABLE features ADD COLUMN last_tested_at DATETIME DEFAULT NULL")) - conn.commit() + pass def _is_network_path(path: Path) -> bool: diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py index 3c25001..a394f1e 100755 --- a/mcp_server/feature_mcp.py +++ b/mcp_server/feature_mcp.py @@ -15,7 +15,6 @@ Tools: - feature_mark_in_progress: Mark a feature as in-progress - feature_claim_and_get: Atomically claim and get feature details - feature_clear_in_progress: Clear in-progress status -- feature_release_testing: Release testing lock on a feature - feature_create_bulk: Create multiple features at once - feature_create: Create a single feature - feature_add_dependency: Add a dependency between features @@ -33,7 +32,6 @@ import os import sys import threading from contextlib import asynccontextmanager -from datetime import datetime, timezone from pathlib import Path from typing import Annotated @@ -228,57 +226,6 @@ def feature_get_summary( session.close() -@mcp.tool() -def feature_release_testing( - feature_id: Annotated[int, Field(description="The ID of the feature to release", ge=1)], - tested_ok: Annotated[bool, Field(description="True if the feature passed testing, False if regression found")] = True -) -> str: - """Release a feature after regression testing completes. - - Clears the testing_in_progress flag and updates last_tested_at timestamp. - - This should be called after testing is complete, whether the feature - passed or failed. If tested_ok=False, the feature was marked as failing - by a previous call to feature_mark_failing. - - Args: - feature_id: The ID of the feature that was being tested - tested_ok: True if testing passed, False if a regression was found - - Returns: - JSON with release confirmation or error message. 
- """ - session = get_session() - try: - feature = session.query(Feature).filter(Feature.id == feature_id).first() - - if feature is None: - return json.dumps({"error": f"Feature with ID {feature_id} not found"}) - - if not feature.testing_in_progress: - return json.dumps({ - "warning": f"Feature {feature_id} was not being tested", - "feature": feature.to_dict() - }) - - # Clear testing flag and update timestamp - feature.testing_in_progress = False - feature.last_tested_at = datetime.now(timezone.utc) - session.commit() - session.refresh(feature) - - status = "passed" if tested_ok else "failed (regression detected)" - return json.dumps({ - "message": f"Feature #{feature_id} testing {status}", - "feature": feature.to_dict() - }) - except Exception as e: - session.rollback() - return json.dumps({"error": f"Failed to release testing claim: {str(e)}"}) - finally: - session.close() - - @mcp.tool() def feature_mark_passing( feature_id: Annotated[int, Field(description="The ID of the feature to mark as passing", ge=1)] diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index eb57bf9..d2db637 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -23,12 +23,10 @@ import os import subprocess import sys import threading -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone from pathlib import Path from typing import Callable, Literal -from sqlalchemy import text - from api.database import Feature, create_database from api.dependency_resolver import are_dependencies_satisfied, compute_scheduling_scores from progress import has_features @@ -126,7 +124,6 @@ DEFAULT_CONCURRENCY = 3 POLL_INTERVAL = 5 # seconds between checking for ready features MAX_FEATURE_RETRIES = 3 # Maximum times to retry a failed feature INITIALIZER_TIMEOUT = 1800 # 30 minutes timeout for initializer -STALE_TESTING_LOCK_MINUTES = 30 # Auto-release testing locks older than this class ParallelOrchestrator: @@ -199,72 +196,28 @@ class ParallelOrchestrator: """Get a new database session.""" return self._session_maker() - def claim_feature_for_testing(self) -> int | None: - """Claim a random passing feature for regression testing. + def _get_random_passing_feature(self) -> int | None: + """Get a random passing feature for regression testing (no claim needed). - Returns the feature ID if successful, None if no features available. - Sets testing_in_progress=True on the claimed feature. + Testing agents can test the same feature concurrently - it doesn't matter. + This simplifies the architecture by removing unnecessary coordination. + + Returns the feature ID if available, None if no passing features exist. 
""" + from sqlalchemy.sql.expression import func + session = self.get_session() try: - from sqlalchemy.sql.expression import func - - # Find a passing feature that's not being worked on - # Exclude features already being tested by this orchestrator - with self._lock: - testing_feature_ids = set(self.running_testing_agents.keys()) - - candidate = ( + # Find a passing feature that's not currently being coded + # Multiple testing agents can test the same feature - that's fine + feature = ( session.query(Feature) .filter(Feature.passes == True) - .filter(Feature.in_progress == False) - .filter(Feature.testing_in_progress == False) - .filter(~Feature.id.in_(testing_feature_ids) if testing_feature_ids else True) + .filter(Feature.in_progress == False) # Don't test while coding .order_by(func.random()) .first() ) - - if not candidate: - return None - - # Atomic claim using UPDATE with WHERE clause - result = session.execute( - text(""" - UPDATE features - SET testing_in_progress = 1 - WHERE id = :feature_id - AND passes = 1 - AND in_progress = 0 - AND testing_in_progress = 0 - """), - {"feature_id": candidate.id} - ) - session.commit() - - if result.rowcount == 0: - # Another process claimed it - return None - - return candidate.id - except Exception as e: - session.rollback() - debug_log.log("TESTING", f"Failed to claim feature for testing: {e}") - return None - finally: - session.close() - - def release_testing_claim(self, feature_id: int): - """Release a testing claim on a feature (called when testing agent exits).""" - session = self.get_session() - try: - session.execute( - text("UPDATE features SET testing_in_progress = 0 WHERE id = :feature_id"), - {"feature_id": feature_id} - ) - session.commit() - except Exception as e: - session.rollback() - debug_log.log("TESTING", f"Failed to release testing claim for feature {feature_id}: {e}") + return feature.id if feature else None finally: session.close() @@ -424,55 +377,6 @@ class ParallelOrchestrator: finally: session.close() - def _cleanup_stale_testing_locks(self) -> None: - """Release stale testing locks from crashed testing agents. - - A feature is considered stale if: - - testing_in_progress=True AND - - last_tested_at is NOT NULL AND older than STALE_TESTING_LOCK_MINUTES - - Note: We do NOT release features with last_tested_at=NULL because that would - incorrectly release features that are legitimately in the middle of their - first test. The last_tested_at is only set when testing completes. - - This handles the case where a testing agent crashes mid-test, leaving - the feature locked until orchestrator restart. By checking periodically, - we can release these locks without requiring a restart. 
- """ - session = self.get_session() - try: - # Use timezone-aware UTC, then strip timezone for SQLite compatibility - # (SQLite stores datetimes as naive strings, but we want consistency with - # datetime.now(timezone.utc) used elsewhere in the codebase) - cutoff_time = (datetime.now(timezone.utc) - timedelta(minutes=STALE_TESTING_LOCK_MINUTES)).replace(tzinfo=None) - - # Find stale locks: testing_in_progress=True AND last_tested_at < cutoff - # Excludes NULL last_tested_at to avoid false positives on first-time tests - stale_features = ( - session.query(Feature) - .filter(Feature.testing_in_progress == True) - .filter(Feature.last_tested_at.isnot(None)) - .filter(Feature.last_tested_at < cutoff_time) - .all() - ) - - if stale_features: - stale_ids = [f.id for f in stale_features] - # Use ORM update instead of raw SQL for SQLite IN clause compatibility - session.query(Feature).filter(Feature.id.in_(stale_ids)).update( - {"testing_in_progress": False}, - synchronize_session=False - ) - session.commit() - print(f"[CLEANUP] Released {len(stale_ids)} stale testing locks: {stale_ids}", flush=True) - debug_log.log("CLEANUP", "Released stale testing locks", feature_ids=stale_ids) - except Exception as e: - session.rollback() - print(f"[CLEANUP] Error cleaning stale locks: {e}", flush=True) - debug_log.log("CLEANUP", f"Error cleaning stale locks: {e}") - finally: - session.close() - def _maintain_testing_agents(self) -> None: """Maintain the desired count of testing agents independently. @@ -480,8 +384,8 @@ class ParallelOrchestrator: the configured testing_agent_ratio. Testing agents run independently from coding agents and continuously re-test passing features to catch regressions. - Also periodically releases stale testing locks (features stuck in - testing_in_progress=True for more than STALE_TESTING_LOCK_MINUTES). + Multiple testing agents can test the same feature concurrently - this is + intentional and simplifies the architecture by removing claim coordination. Stops spawning when: - YOLO mode is enabled @@ -492,11 +396,6 @@ class ParallelOrchestrator: if self.yolo_mode or self.testing_agent_ratio == 0: return - # Periodically clean up stale testing locks (features stuck mid-test due to crash) - # A feature is considered stale if testing_in_progress=True and last_tested_at - # is either NULL or older than STALE_TESTING_LOCK_MINUTES - self._cleanup_stale_testing_locks() - # No testing until there are passing features passing_count = self.get_passing_count() if passing_count == 0: @@ -641,8 +540,9 @@ class ParallelOrchestrator: def _spawn_testing_agent(self) -> tuple[bool, str]: """Spawn a testing agent subprocess for regression testing. - Claims a feature BEFORE spawning the agent (same pattern as coding agents). - This ensures we know which feature is being tested for UI display. + Picks a random passing feature to test. Multiple testing agents can test + the same feature concurrently - this is intentional and simplifies the + architecture by removing claim coordination. 
""" # Check limits first (under lock) with self._lock: @@ -655,20 +555,19 @@ class ParallelOrchestrator: debug_log.log("TESTING", f"Skipped spawn - at max total agents ({total_agents}/{MAX_TOTAL_AGENTS})") return False, f"At max total agents ({total_agents})" - # Claim a feature for testing (outside lock to avoid holding during DB ops) - feature_id = self.claim_feature_for_testing() + # Pick a random passing feature (no claim needed - concurrent testing is fine) + feature_id = self._get_random_passing_feature() if feature_id is None: debug_log.log("TESTING", "No features available for testing") return False, "No features available for testing" - debug_log.log("TESTING", f"Claimed feature #{feature_id} for testing") + debug_log.log("TESTING", f"Selected feature #{feature_id} for testing") - # Now spawn with the claimed feature ID + # Spawn the testing agent with self._lock: - # Re-check limits in case another thread spawned while we were claiming + # Re-check limits in case another thread spawned while we were selecting current_testing_count = len(self.running_testing_agents) if current_testing_count >= self.max_concurrency: - self.release_testing_claim(feature_id) return False, f"At max testing agents ({current_testing_count})" cmd = [ @@ -694,7 +593,6 @@ class ParallelOrchestrator: ) except Exception as e: debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}") - self.release_testing_claim(feature_id) return False, f"Failed to start testing agent: {e}" # Register process with feature ID (same pattern as coding agents) @@ -865,22 +763,16 @@ class ParallelOrchestrator: is safe. For testing agents: - - Remove from running dict and release testing claim on feature. + - Remove from running dict (no claim to release - concurrent testing is allowed). 
""" if agent_type == "testing": with self._lock: # Remove from dict by finding the feature_id for this proc - found_feature_id = None for fid, p in list(self.running_testing_agents.items()): if p is proc: - found_feature_id = fid del self.running_testing_agents[fid] break - # Release testing claim on the feature - if found_feature_id is not None: - self.release_testing_claim(found_feature_id) - status = "completed" if return_code == 0 else "failed" print(f"Feature #{feature_id} testing {status}", flush=True) debug_log.log("COMPLETE", f"Testing agent for feature #{feature_id} finished", @@ -974,13 +866,12 @@ class ParallelOrchestrator: for fid in feature_ids: self.stop_feature(fid) - # Stop testing agents + # Stop testing agents (no claim to release - concurrent testing is allowed) with self._lock: testing_items = list(self.running_testing_agents.items()) for feature_id, proc in testing_items: result = kill_process_tree(proc, timeout=5.0) - self.release_testing_claim(feature_id) debug_log.log("STOP", f"Killed testing agent for feature #{feature_id} (PID {proc.pid})", status=result.status, children_found=result.children_found, children_terminated=result.children_terminated, children_killed=result.children_killed) @@ -1002,19 +893,6 @@ class ParallelOrchestrator: # Must happen before any debug_log.log() calls debug_log.start_session() - # Clear stale testing_in_progress flags from crashed testing agents - # This ensures features aren't permanently locked if a previous session crashed - session = self.get_session() - try: - stale_count = session.query(Feature).filter(Feature.testing_in_progress == True).count() - if stale_count > 0: - session.execute(text("UPDATE features SET testing_in_progress = 0 WHERE testing_in_progress = 1")) - session.commit() - print(f"[STARTUP] Cleared {stale_count} stale testing_in_progress flags", flush=True) - debug_log.log("STARTUP", f"Cleared {stale_count} stale testing_in_progress flags") - finally: - session.close() - # Log startup to debug file debug_log.section("ORCHESTRATOR STARTUP") debug_log.log("STARTUP", "Orchestrator run_loop starting", diff --git a/prompts.py b/prompts.py index 6869256..137928c 100644 --- a/prompts.py +++ b/prompts.py @@ -93,12 +93,11 @@ def get_testing_prompt(project_dir: Path | None = None, testing_feature_id: int **You are assigned to regression test Feature #{testing_feature_id}.** -The orchestrator has already claimed this feature for you. - ### Your workflow: 1. Call `feature_get_by_id` with ID {testing_feature_id} to get the feature details 2. Verify the feature through the UI using browser automation -3. When done, call `feature_release_testing` with feature_id={testing_feature_id} +3. If regression found, call `feature_mark_failing` with feature_id={testing_feature_id} +4. Exit when done (no cleanup needed) --- diff --git a/server/websocket.py b/server/websocket.py index 713916f..4b86456 100644 --- a/server/websocket.py +++ b/server/websocket.py @@ -73,13 +73,14 @@ ORCHESTRATOR_PATTERNS = { class AgentTracker: """Tracks active agents and their states for multi-agent mode. - Both coding and testing agents are now tracked by their feature ID. - The agent_type field distinguishes between them. + Both coding and testing agents are tracked using a composite key of + (feature_id, agent_type) to allow simultaneous tracking of both agent + types for the same feature. 
""" def __init__(self): - # feature_id -> {name, state, last_thought, agent_index, agent_type} - self.active_agents: dict[int, dict] = {} + # (feature_id, agent_type) -> {name, state, last_thought, agent_index, agent_type} + self.active_agents: dict[tuple[int, str], dict] = {} self._next_agent_index = 0 self._lock = asyncio.Lock() @@ -111,14 +112,14 @@ class AgentTracker: if testing_complete_match: feature_id = int(testing_complete_match.group(1)) is_success = testing_complete_match.group(2) == "completed" - return await self._handle_agent_complete(feature_id, is_success) + return await self._handle_agent_complete(feature_id, is_success, agent_type="testing") # Coding agent complete: "Feature #X completed/failed" (without "testing" keyword) if line.startswith("Feature #") and ("completed" in line or "failed" in line) and "testing" not in line: try: feature_id = int(re.search(r'#(\d+)', line).group(1)) is_success = "completed" in line - return await self._handle_agent_complete(feature_id, is_success) + return await self._handle_agent_complete(feature_id, is_success, agent_type="coding") except (AttributeError, ValueError): pass @@ -132,11 +133,21 @@ class AgentTracker: content = match.group(2) async with self._lock: - # Ensure agent is tracked - if feature_id not in self.active_agents: + # Check if either coding or testing agent exists for this feature + # This prevents creating ghost agents when a testing agent outputs [Feature #X] lines + coding_key = (feature_id, 'coding') + testing_key = (feature_id, 'testing') + + if coding_key in self.active_agents: + key = coding_key + elif testing_key in self.active_agents: + key = testing_key + else: + # Neither exists, create a new coding agent entry (implicit tracking) + key = coding_key agent_index = self._next_agent_index self._next_agent_index += 1 - self.active_agents[feature_id] = { + self.active_agents[key] = { 'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)], 'agent_index': agent_index, 'agent_type': 'coding', @@ -145,7 +156,7 @@ class AgentTracker: 'last_thought': None, } - agent = self.active_agents[feature_id] + agent = self.active_agents[key] # Detect state and thought from content state = 'working' @@ -178,16 +189,21 @@ class AgentTracker: return None - async def get_agent_info(self, feature_id: int) -> tuple[int | None, str | None]: - """Get agent index and name for a feature ID. + async def get_agent_info(self, feature_id: int, agent_type: str = "coding") -> tuple[int | None, str | None]: + """Get agent index and name for a feature ID and agent type. Thread-safe method that acquires the lock before reading state. + Args: + feature_id: The feature ID to look up. + agent_type: The agent type ("coding" or "testing"). Defaults to "coding". + Returns: Tuple of (agentIndex, agentName) or (None, None) if not tracked. 
""" async with self._lock: - agent = self.active_agents.get(feature_id) + key = (feature_id, agent_type) + agent = self.active_agents.get(key) if agent: return agent['agent_index'], agent['name'] return None, None @@ -207,6 +223,7 @@ class AgentTracker: async def _handle_agent_start(self, feature_id: int, line: str, agent_type: str = "coding") -> dict | None: """Handle agent start message from orchestrator.""" async with self._lock: + key = (feature_id, agent_type) # Composite key for separate tracking agent_index = self._next_agent_index self._next_agent_index += 1 @@ -216,7 +233,7 @@ class AgentTracker: if name_match: feature_name = name_match.group(1) - self.active_agents[feature_id] = { + self.active_agents[key] = { 'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)], 'agent_index': agent_index, 'agent_type': agent_type, @@ -237,26 +254,33 @@ class AgentTracker: 'timestamp': datetime.now().isoformat(), } - async def _handle_agent_complete(self, feature_id: int, is_success: bool) -> dict | None: - """Handle agent completion - ALWAYS emits a message, even if agent wasn't tracked.""" + async def _handle_agent_complete(self, feature_id: int, is_success: bool, agent_type: str = "coding") -> dict | None: + """Handle agent completion - ALWAYS emits a message, even if agent wasn't tracked. + + Args: + feature_id: The feature ID. + is_success: Whether the agent completed successfully. + agent_type: The agent type ("coding" or "testing"). Defaults to "coding". + """ async with self._lock: + key = (feature_id, agent_type) # Composite key for correct agent lookup state = 'success' if is_success else 'error' - if feature_id in self.active_agents: + if key in self.active_agents: # Normal case: agent was tracked - agent = self.active_agents[feature_id] + agent = self.active_agents[key] result = { 'type': 'agent_update', 'agentIndex': agent['agent_index'], 'agentName': agent['name'], - 'agentType': agent.get('agent_type', 'coding'), + 'agentType': agent.get('agent_type', agent_type), 'featureId': feature_id, 'featureName': agent['feature_name'], 'state': state, 'thought': 'Completed successfully!' if is_success else 'Failed to complete', 'timestamp': datetime.now().isoformat(), } - del self.active_agents[feature_id] + del self.active_agents[key] return result else: # Synthetic completion for untracked agent @@ -265,7 +289,7 @@ class AgentTracker: 'type': 'agent_update', 'agentIndex': -1, # Sentinel for untracked 'agentName': 'Unknown', - 'agentType': 'coding', + 'agentType': agent_type, 'featureId': feature_id, 'featureName': f'Feature #{feature_id}', 'state': state,