mirror of
https://github.com/leonvanzyl/autocoder.git
synced 2026-01-30 06:12:06 +00:00
refactor: remove testing agent claim mechanism for concurrent testing
Remove the testing_in_progress claim/release mechanism from the testing agent architecture. Multiple testing agents can now test the same feature concurrently, simplifying the system and eliminating potential stale lock issues. Changes: - parallel_orchestrator.py: - Remove claim_feature_for_testing() and release_testing_claim() methods - Remove _cleanup_stale_testing_locks() periodic cleanup - Replace with simple _get_random_passing_feature() selection - Remove startup stale lock cleanup code - Remove STALE_TESTING_LOCK_MINUTES constant - Remove unused imports (timedelta, text) - api/database.py: - Remove testing_in_progress and last_tested_at columns from Feature model - Update to_dict() to exclude these fields - Convert _migrate_add_testing_columns() to no-op for backwards compat - mcp_server/feature_mcp.py: - Remove feature_release_testing tool entirely - Remove unused datetime import - prompts.py: - Update testing prompt to remove feature_release_testing instruction - Testing agents now just verify and exit (no cleanup needed) - server/websocket.py: - Update AgentTracker to use composite keys (feature_id, agent_type) - Prevents ghost agent creation from ambiguous [Feature #X] messages - Proper separation of coding vs testing agent tracking Benefits: - Eliminates artificial bottleneck from claim coordination - No stale locks to clean up after crashes - Simpler crash recovery (no testing state to restore) - Reduced database writes (no claim/release transactions) - Matches intended design: random, concurrent regression testing Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -23,12 +23,10 @@ import os
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Callable, Literal
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from api.database import Feature, create_database
|
||||
from api.dependency_resolver import are_dependencies_satisfied, compute_scheduling_scores
|
||||
from progress import has_features
|
||||
@@ -126,7 +124,6 @@ DEFAULT_CONCURRENCY = 3
|
||||
POLL_INTERVAL = 5 # seconds between checking for ready features
|
||||
MAX_FEATURE_RETRIES = 3 # Maximum times to retry a failed feature
|
||||
INITIALIZER_TIMEOUT = 1800 # 30 minutes timeout for initializer
|
||||
STALE_TESTING_LOCK_MINUTES = 30 # Auto-release testing locks older than this
|
||||
|
||||
|
||||
class ParallelOrchestrator:
|
||||
@@ -199,72 +196,28 @@ class ParallelOrchestrator:
|
||||
"""Get a new database session."""
|
||||
return self._session_maker()
|
||||
|
||||
def claim_feature_for_testing(self) -> int | None:
|
||||
"""Claim a random passing feature for regression testing.
|
||||
def _get_random_passing_feature(self) -> int | None:
|
||||
"""Get a random passing feature for regression testing (no claim needed).
|
||||
|
||||
Returns the feature ID if successful, None if no features available.
|
||||
Sets testing_in_progress=True on the claimed feature.
|
||||
Testing agents can test the same feature concurrently - it doesn't matter.
|
||||
This simplifies the architecture by removing unnecessary coordination.
|
||||
|
||||
Returns the feature ID if available, None if no passing features exist.
|
||||
"""
|
||||
from sqlalchemy.sql.expression import func
|
||||
|
||||
session = self.get_session()
|
||||
try:
|
||||
from sqlalchemy.sql.expression import func
|
||||
|
||||
# Find a passing feature that's not being worked on
|
||||
# Exclude features already being tested by this orchestrator
|
||||
with self._lock:
|
||||
testing_feature_ids = set(self.running_testing_agents.keys())
|
||||
|
||||
candidate = (
|
||||
# Find a passing feature that's not currently being coded
|
||||
# Multiple testing agents can test the same feature - that's fine
|
||||
feature = (
|
||||
session.query(Feature)
|
||||
.filter(Feature.passes == True)
|
||||
.filter(Feature.in_progress == False)
|
||||
.filter(Feature.testing_in_progress == False)
|
||||
.filter(~Feature.id.in_(testing_feature_ids) if testing_feature_ids else True)
|
||||
.filter(Feature.in_progress == False) # Don't test while coding
|
||||
.order_by(func.random())
|
||||
.first()
|
||||
)
|
||||
|
||||
if not candidate:
|
||||
return None
|
||||
|
||||
# Atomic claim using UPDATE with WHERE clause
|
||||
result = session.execute(
|
||||
text("""
|
||||
UPDATE features
|
||||
SET testing_in_progress = 1
|
||||
WHERE id = :feature_id
|
||||
AND passes = 1
|
||||
AND in_progress = 0
|
||||
AND testing_in_progress = 0
|
||||
"""),
|
||||
{"feature_id": candidate.id}
|
||||
)
|
||||
session.commit()
|
||||
|
||||
if result.rowcount == 0:
|
||||
# Another process claimed it
|
||||
return None
|
||||
|
||||
return candidate.id
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
debug_log.log("TESTING", f"Failed to claim feature for testing: {e}")
|
||||
return None
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def release_testing_claim(self, feature_id: int):
|
||||
"""Release a testing claim on a feature (called when testing agent exits)."""
|
||||
session = self.get_session()
|
||||
try:
|
||||
session.execute(
|
||||
text("UPDATE features SET testing_in_progress = 0 WHERE id = :feature_id"),
|
||||
{"feature_id": feature_id}
|
||||
)
|
||||
session.commit()
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
debug_log.log("TESTING", f"Failed to release testing claim for feature {feature_id}: {e}")
|
||||
return feature.id if feature else None
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@@ -424,55 +377,6 @@ class ParallelOrchestrator:
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def _cleanup_stale_testing_locks(self) -> None:
|
||||
"""Release stale testing locks from crashed testing agents.
|
||||
|
||||
A feature is considered stale if:
|
||||
- testing_in_progress=True AND
|
||||
- last_tested_at is NOT NULL AND older than STALE_TESTING_LOCK_MINUTES
|
||||
|
||||
Note: We do NOT release features with last_tested_at=NULL because that would
|
||||
incorrectly release features that are legitimately in the middle of their
|
||||
first test. The last_tested_at is only set when testing completes.
|
||||
|
||||
This handles the case where a testing agent crashes mid-test, leaving
|
||||
the feature locked until orchestrator restart. By checking periodically,
|
||||
we can release these locks without requiring a restart.
|
||||
"""
|
||||
session = self.get_session()
|
||||
try:
|
||||
# Use timezone-aware UTC, then strip timezone for SQLite compatibility
|
||||
# (SQLite stores datetimes as naive strings, but we want consistency with
|
||||
# datetime.now(timezone.utc) used elsewhere in the codebase)
|
||||
cutoff_time = (datetime.now(timezone.utc) - timedelta(minutes=STALE_TESTING_LOCK_MINUTES)).replace(tzinfo=None)
|
||||
|
||||
# Find stale locks: testing_in_progress=True AND last_tested_at < cutoff
|
||||
# Excludes NULL last_tested_at to avoid false positives on first-time tests
|
||||
stale_features = (
|
||||
session.query(Feature)
|
||||
.filter(Feature.testing_in_progress == True)
|
||||
.filter(Feature.last_tested_at.isnot(None))
|
||||
.filter(Feature.last_tested_at < cutoff_time)
|
||||
.all()
|
||||
)
|
||||
|
||||
if stale_features:
|
||||
stale_ids = [f.id for f in stale_features]
|
||||
# Use ORM update instead of raw SQL for SQLite IN clause compatibility
|
||||
session.query(Feature).filter(Feature.id.in_(stale_ids)).update(
|
||||
{"testing_in_progress": False},
|
||||
synchronize_session=False
|
||||
)
|
||||
session.commit()
|
||||
print(f"[CLEANUP] Released {len(stale_ids)} stale testing locks: {stale_ids}", flush=True)
|
||||
debug_log.log("CLEANUP", "Released stale testing locks", feature_ids=stale_ids)
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
print(f"[CLEANUP] Error cleaning stale locks: {e}", flush=True)
|
||||
debug_log.log("CLEANUP", f"Error cleaning stale locks: {e}")
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def _maintain_testing_agents(self) -> None:
|
||||
"""Maintain the desired count of testing agents independently.
|
||||
|
||||
@@ -480,8 +384,8 @@ class ParallelOrchestrator:
|
||||
the configured testing_agent_ratio. Testing agents run independently from
|
||||
coding agents and continuously re-test passing features to catch regressions.
|
||||
|
||||
Also periodically releases stale testing locks (features stuck in
|
||||
testing_in_progress=True for more than STALE_TESTING_LOCK_MINUTES).
|
||||
Multiple testing agents can test the same feature concurrently - this is
|
||||
intentional and simplifies the architecture by removing claim coordination.
|
||||
|
||||
Stops spawning when:
|
||||
- YOLO mode is enabled
|
||||
@@ -492,11 +396,6 @@ class ParallelOrchestrator:
|
||||
if self.yolo_mode or self.testing_agent_ratio == 0:
|
||||
return
|
||||
|
||||
# Periodically clean up stale testing locks (features stuck mid-test due to crash)
|
||||
# A feature is considered stale if testing_in_progress=True and last_tested_at
|
||||
# is either NULL or older than STALE_TESTING_LOCK_MINUTES
|
||||
self._cleanup_stale_testing_locks()
|
||||
|
||||
# No testing until there are passing features
|
||||
passing_count = self.get_passing_count()
|
||||
if passing_count == 0:
|
||||
@@ -641,8 +540,9 @@ class ParallelOrchestrator:
|
||||
def _spawn_testing_agent(self) -> tuple[bool, str]:
|
||||
"""Spawn a testing agent subprocess for regression testing.
|
||||
|
||||
Claims a feature BEFORE spawning the agent (same pattern as coding agents).
|
||||
This ensures we know which feature is being tested for UI display.
|
||||
Picks a random passing feature to test. Multiple testing agents can test
|
||||
the same feature concurrently - this is intentional and simplifies the
|
||||
architecture by removing claim coordination.
|
||||
"""
|
||||
# Check limits first (under lock)
|
||||
with self._lock:
|
||||
@@ -655,20 +555,19 @@ class ParallelOrchestrator:
|
||||
debug_log.log("TESTING", f"Skipped spawn - at max total agents ({total_agents}/{MAX_TOTAL_AGENTS})")
|
||||
return False, f"At max total agents ({total_agents})"
|
||||
|
||||
# Claim a feature for testing (outside lock to avoid holding during DB ops)
|
||||
feature_id = self.claim_feature_for_testing()
|
||||
# Pick a random passing feature (no claim needed - concurrent testing is fine)
|
||||
feature_id = self._get_random_passing_feature()
|
||||
if feature_id is None:
|
||||
debug_log.log("TESTING", "No features available for testing")
|
||||
return False, "No features available for testing"
|
||||
|
||||
debug_log.log("TESTING", f"Claimed feature #{feature_id} for testing")
|
||||
debug_log.log("TESTING", f"Selected feature #{feature_id} for testing")
|
||||
|
||||
# Now spawn with the claimed feature ID
|
||||
# Spawn the testing agent
|
||||
with self._lock:
|
||||
# Re-check limits in case another thread spawned while we were claiming
|
||||
# Re-check limits in case another thread spawned while we were selecting
|
||||
current_testing_count = len(self.running_testing_agents)
|
||||
if current_testing_count >= self.max_concurrency:
|
||||
self.release_testing_claim(feature_id)
|
||||
return False, f"At max testing agents ({current_testing_count})"
|
||||
|
||||
cmd = [
|
||||
@@ -694,7 +593,6 @@ class ParallelOrchestrator:
|
||||
)
|
||||
except Exception as e:
|
||||
debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}")
|
||||
self.release_testing_claim(feature_id)
|
||||
return False, f"Failed to start testing agent: {e}"
|
||||
|
||||
# Register process with feature ID (same pattern as coding agents)
|
||||
@@ -865,22 +763,16 @@ class ParallelOrchestrator:
|
||||
is safe.
|
||||
|
||||
For testing agents:
|
||||
- Remove from running dict and release testing claim on feature.
|
||||
- Remove from running dict (no claim to release - concurrent testing is allowed).
|
||||
"""
|
||||
if agent_type == "testing":
|
||||
with self._lock:
|
||||
# Remove from dict by finding the feature_id for this proc
|
||||
found_feature_id = None
|
||||
for fid, p in list(self.running_testing_agents.items()):
|
||||
if p is proc:
|
||||
found_feature_id = fid
|
||||
del self.running_testing_agents[fid]
|
||||
break
|
||||
|
||||
# Release testing claim on the feature
|
||||
if found_feature_id is not None:
|
||||
self.release_testing_claim(found_feature_id)
|
||||
|
||||
status = "completed" if return_code == 0 else "failed"
|
||||
print(f"Feature #{feature_id} testing {status}", flush=True)
|
||||
debug_log.log("COMPLETE", f"Testing agent for feature #{feature_id} finished",
|
||||
@@ -974,13 +866,12 @@ class ParallelOrchestrator:
|
||||
for fid in feature_ids:
|
||||
self.stop_feature(fid)
|
||||
|
||||
# Stop testing agents
|
||||
# Stop testing agents (no claim to release - concurrent testing is allowed)
|
||||
with self._lock:
|
||||
testing_items = list(self.running_testing_agents.items())
|
||||
|
||||
for feature_id, proc in testing_items:
|
||||
result = kill_process_tree(proc, timeout=5.0)
|
||||
self.release_testing_claim(feature_id)
|
||||
debug_log.log("STOP", f"Killed testing agent for feature #{feature_id} (PID {proc.pid})",
|
||||
status=result.status, children_found=result.children_found,
|
||||
children_terminated=result.children_terminated, children_killed=result.children_killed)
|
||||
@@ -1002,19 +893,6 @@ class ParallelOrchestrator:
|
||||
# Must happen before any debug_log.log() calls
|
||||
debug_log.start_session()
|
||||
|
||||
# Clear stale testing_in_progress flags from crashed testing agents
|
||||
# This ensures features aren't permanently locked if a previous session crashed
|
||||
session = self.get_session()
|
||||
try:
|
||||
stale_count = session.query(Feature).filter(Feature.testing_in_progress == True).count()
|
||||
if stale_count > 0:
|
||||
session.execute(text("UPDATE features SET testing_in_progress = 0 WHERE testing_in_progress = 1"))
|
||||
session.commit()
|
||||
print(f"[STARTUP] Cleared {stale_count} stale testing_in_progress flags", flush=True)
|
||||
debug_log.log("STARTUP", f"Cleared {stale_count} stale testing_in_progress flags")
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
# Log startup to debug file
|
||||
debug_log.section("ORCHESTRATOR STARTUP")
|
||||
debug_log.log("STARTUP", "Orchestrator run_loop starting",
|
||||
|
||||
Reference in New Issue
Block a user