mirror of https://github.com/leonvanzyl/autocoder.git
synced 2026-01-30 06:12:06 +00:00
improve performance
@@ -186,6 +186,12 @@ class ParallelOrchestrator:
         # Session tracking for logging/debugging
         self.session_start_time: datetime = None
 
+        # Event signaled when any agent completes, allowing the main loop to wake
+        # immediately instead of waiting for the full POLL_INTERVAL timeout.
+        # This reduces latency when spawning the next feature after completion.
+        self._agent_completed_event: asyncio.Event = None  # Created in run_loop
+        self._event_loop: asyncio.AbstractEventLoop = None  # Stored for thread-safe signaling
+
         # Database session for this orchestrator
         self._engine, self._session_maker = create_database(project_dir)
 
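
A note on the mechanism: the wake-up these two attributes support looks roughly like the self-contained sketch below (the names and the POLL_INTERVAL value are illustrative, not taken from the repo). A waiter blocked on the event returns as soon as an agent signals completion instead of sleeping out the full poll interval.

# Illustrative sketch of the wake-on-completion pattern (not repo code).
import asyncio

POLL_INTERVAL = 10.0  # assumed value; the real constant lives in the orchestrator module

async def main():
    completed = asyncio.Event()

    async def fake_agent():
        await asyncio.sleep(0.5)  # pretend the agent worked for half a second
        completed.set()           # signal completion

    task = asyncio.create_task(fake_agent())
    start = asyncio.get_running_loop().time()
    try:
        # Returns about 0.5s after the agent finishes, not after the full 10s poll.
        await asyncio.wait_for(completed.wait(), timeout=POLL_INTERVAL)
        completed.clear()
    except asyncio.TimeoutError:
        pass  # nothing completed within the poll interval; re-check state anyway
    await task
    print(f"woke after {asyncio.get_running_loop().time() - start:.2f}s")

asyncio.run(main())
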
@@ -311,6 +317,9 @@ class ParallelOrchestrator:
         all_features = session.query(Feature).all()
         all_dicts = [f.to_dict() for f in all_features]
 
+        # Pre-compute passing_ids once to avoid O(n^2) in the loop
+        passing_ids = {f.id for f in all_features if f.passes}
+
         ready = []
         skipped_reasons = {"passes": 0, "in_progress": 0, "running": 0, "failed": 0, "deps": 0}
         for f in all_features:
@@ -329,8 +338,8 @@ class ParallelOrchestrator:
             if self._failure_counts.get(f.id, 0) >= MAX_FEATURE_RETRIES:
                 skipped_reasons["failed"] += 1
                 continue
-            # Check dependencies
-            if are_dependencies_satisfied(f.to_dict(), all_dicts):
+            # Check dependencies (pass pre-computed passing_ids)
+            if are_dependencies_satisfied(f.to_dict(), all_dicts, passing_ids):
                 ready.append(f.to_dict())
             else:
                 skipped_reasons["deps"] += 1
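
The pre-computed set turns each dependency check into set-membership lookups instead of rescanning every feature on every iteration. The repo's are_dependencies_satisfied() helper lives elsewhere; the sketch below only illustrates what the three-argument form plausibly does, assuming feature dicts carry "id", "passes", and "dependencies" keys.

# Hedged sketch of a dependency check that accepts a pre-computed set of passing ids.
# The real helper is defined elsewhere in the repo; field names here are assumptions.
def are_dependencies_satisfied(feature, all_features, passing_ids=None):
    if passing_ids is None:
        # Fallback: rebuild the set, which is what made the original loop O(n^2).
        passing_ids = {f["id"] for f in all_features if f.get("passes")}
    return all(dep_id in passing_ids for dep_id in feature.get("dependencies", []))

features = [
    {"id": 1, "passes": True, "dependencies": []},
    {"id": 2, "passes": False, "dependencies": [1]},
]
passing = {f["id"] for f in features if f["passes"]}
print(are_dependencies_satisfied(features[1], features, passing))  # True
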
@@ -794,6 +803,52 @@ class ParallelOrchestrator:
         finally:
             self._on_agent_complete(feature_id, proc.returncode, agent_type, proc)
 
+    def _signal_agent_completed(self):
+        """Signal that an agent has completed, waking the main loop.
+
+        This method is safe to call from any thread. It schedules the event.set()
+        call to run on the event loop thread to avoid cross-thread issues with
+        asyncio.Event.
+        """
+        if self._agent_completed_event is not None and self._event_loop is not None:
+            try:
+                # Use the stored event loop reference to schedule the set() call
+                # This is necessary because asyncio.Event is not thread-safe and
+                # asyncio.get_event_loop() fails in threads without an event loop
+                if self._event_loop.is_running():
+                    self._event_loop.call_soon_threadsafe(self._agent_completed_event.set)
+                else:
+                    # Fallback: set directly if loop isn't running (shouldn't happen during normal operation)
+                    self._agent_completed_event.set()
+            except RuntimeError:
+                # Event loop closed, ignore (orchestrator may be shutting down)
+                pass
+
+    async def _wait_for_agent_completion(self, timeout: float = POLL_INTERVAL):
+        """Wait for an agent to complete or until timeout expires.
+
+        This replaces fixed `asyncio.sleep(POLL_INTERVAL)` calls with event-based
+        waiting. When an agent completes, _signal_agent_completed() sets the event,
+        causing this method to return immediately. If no agent completes within
+        the timeout, we return anyway to check for ready features.
+
+        Args:
+            timeout: Maximum seconds to wait (default: POLL_INTERVAL)
+        """
+        if self._agent_completed_event is None:
+            # Fallback if event not initialized (shouldn't happen in normal operation)
+            await asyncio.sleep(timeout)
+            return
+
+        try:
+            await asyncio.wait_for(self._agent_completed_event.wait(), timeout=timeout)
+            # Event was set - an agent completed. Clear it for the next wait cycle.
+            self._agent_completed_event.clear()
+            debug_log.log("EVENT", "Woke up immediately - agent completed")
+        except asyncio.TimeoutError:
+            # Timeout reached without agent completion - this is normal, just check anyway
+            pass
+
     def _on_agent_complete(
         self,
         feature_id: int | None,
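
The docstring's thread-safety point is the crux of this addition: asyncio.Event is not thread-safe, so a subprocess output-reader thread has to hand the set() call over to the loop thread via call_soon_threadsafe. A standalone demonstration of that hand-off (illustrative code, not taken from the repo):

import asyncio
import threading
import time

async def main():
    loop = asyncio.get_running_loop()
    done = asyncio.Event()

    def reader_thread():
        time.sleep(0.2)                      # pretend to read subprocess output
        loop.call_soon_threadsafe(done.set)  # safe hand-off to the loop thread

    threading.Thread(target=reader_thread, daemon=True).start()
    await asyncio.wait_for(done.wait(), timeout=5.0)
    print("main loop woke up as soon as the thread signaled")

asyncio.run(main())
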
@@ -832,6 +887,8 @@ class ParallelOrchestrator:
                           pid=proc.pid,
                           feature_id=feature_id,
                           status=status)
+            # Signal main loop that an agent slot is available
+            self._signal_agent_completed()
             return
 
         # Coding agent completion
@@ -843,40 +900,20 @@ class ParallelOrchestrator:
         self.running_coding_agents.pop(feature_id, None)
         self.abort_events.pop(feature_id, None)
 
-        # BEFORE dispose: Query database state to see if it's stale
-        session_before = self.get_session()
-        try:
-            session_before.expire_all()
-            feature_before = session_before.query(Feature).filter(Feature.id == feature_id).first()
-            all_before = session_before.query(Feature).all()
-            passing_before = sum(1 for f in all_before if f.passes)
-            debug_log.log("DB", f"BEFORE engine.dispose() - Feature #{feature_id} state",
-                          passes=feature_before.passes if feature_before else None,
-                          in_progress=feature_before.in_progress if feature_before else None,
-                          total_passing_in_db=passing_before)
-        finally:
-            session_before.close()
-
-        # CRITICAL: Refresh database connection to see subprocess commits
+        # Refresh session cache to see subprocess commits
         # The coding agent runs as a subprocess and commits changes (e.g., passes=True).
-        # SQLAlchemy may have stale connections. Disposing the engine forces new connections
-        # that will see the subprocess's committed changes.
-        debug_log.log("DB", "Disposing database engine now...")
-        self._engine.dispose()
-
-        # AFTER dispose: Query again to compare
+        # Using session.expire_all() is lighter weight than engine.dispose() for SQLite WAL mode
+        # and is sufficient to invalidate cached data and force fresh reads.
+        # engine.dispose() is only called on orchestrator shutdown, not on every agent completion.
         session = self.get_session()
         try:
             session.expire_all()
             feature = session.query(Feature).filter(Feature.id == feature_id).first()
-            all_after = session.query(Feature).all()
-            passing_after = sum(1 for f in all_after if f.passes)
             feature_passes = feature.passes if feature else None
             feature_in_progress = feature.in_progress if feature else None
-            debug_log.log("DB", f"AFTER engine.dispose() - Feature #{feature_id} state",
+            debug_log.log("DB", f"Feature #{feature_id} state after session.expire_all()",
                           passes=feature_passes,
-                          in_progress=feature_in_progress,
-                          total_passing_in_db=passing_after,
-                          passing_changed=(passing_after != passing_before) if 'passing_before' in dir() else "unknown")
+                          in_progress=feature_in_progress)
             if feature and feature.in_progress and not feature.passes:
                 feature.in_progress = False
                 session.commit()
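
The replacement leans on SQLAlchemy's session-cache semantics: expire_all() marks every object held by the session as stale, so the next attribute access re-reads from the database and sees rows committed by the coding-agent subprocess, without tearing down the connection pool the way engine.dispose() does on every completion. A minimal, self-contained sketch of that behaviour (the model, column names, and database URL are illustrative, not the repo's actual schema):

from sqlalchemy import Boolean, Integer, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker

class Base(DeclarativeBase):
    pass

class Feature(Base):  # hypothetical stand-in for the repo's Feature model
    __tablename__ = "features"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    passes: Mapped[bool] = mapped_column(Boolean, default=False)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session = Session()
session.add(Feature(id=1, passes=False))
session.commit()

feature = session.get(Feature, 1)  # now cached in the session's identity map
# Imagine a subprocess committing passes=True on its own connection at this point.
session.expire_all()               # mark all cached attribute state stale
_ = feature.passes                 # the next access issues a fresh SELECT
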
@@ -900,6 +937,9 @@ class ParallelOrchestrator:
         # CRITICAL: This print triggers the WebSocket to emit agent_update with state='error' or 'success'
         print(f"Feature #{feature_id} {status}", flush=True)
 
+        # Signal main loop that an agent slot is available
+        self._signal_agent_completed()
+
         # NOTE: Testing agents are now spawned in start_feature() when coding agents START,
         # not here when they complete. This ensures 1:1 ratio and proper termination.
 
@@ -949,6 +989,12 @@ class ParallelOrchestrator:
         """Main orchestration loop."""
         self.is_running = True
 
+        # Initialize the agent completion event for this run
+        # Must be created in the async context where it will be used
+        self._agent_completed_event = asyncio.Event()
+        # Store the event loop reference for thread-safe signaling from output reader threads
+        self._event_loop = asyncio.get_running_loop()
+
         # Track session start for regression testing (UTC for consistency with last_tested_at)
         self.session_start_time = datetime.now(timezone.utc)
 
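
Creating the Event inside run_loop rather than in __init__ matters because no event loop is running when the orchestrator is constructed, and on older Python versions asyncio primitives bind to whichever loop is current at construction time; building it here guarantees that the waiter and the call_soon_threadsafe callers all refer to the same loop. A compact sketch of that initialization order (illustrative class, not the repo's):

import asyncio

class OrchestratorSketch:
    def __init__(self):
        # No loop is running yet, so defer creating the Event.
        self._agent_completed_event = None
        self._event_loop = None

    async def run_loop(self):
        # Created inside the running loop so the waiter in this coroutine and any
        # call_soon_threadsafe callers in reader threads all target the same loop.
        self._agent_completed_event = asyncio.Event()
        self._event_loop = asyncio.get_running_loop()

asyncio.run(OrchestratorSketch().run_loop())
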
@@ -1100,8 +1146,8 @@ class ParallelOrchestrator:
                               at_capacity=(current >= self.max_concurrency))
 
                 if current >= self.max_concurrency:
-                    debug_log.log("CAPACITY", "At max capacity, sleeping...")
-                    await asyncio.sleep(POLL_INTERVAL)
+                    debug_log.log("CAPACITY", "At max capacity, waiting for agent completion...")
+                    await self._wait_for_agent_completion()
                     continue
 
                 # Priority 1: Resume features from previous session
@@ -1119,7 +1165,7 @@ class ParallelOrchestrator:
                 if not ready:
                     # Wait for running features to complete
                     if current > 0:
-                        await asyncio.sleep(POLL_INTERVAL)
+                        await self._wait_for_agent_completion()
                         continue
                     else:
                         # No ready features and nothing running
@@ -1138,7 +1184,7 @@ class ParallelOrchestrator:
 
                         # Still have pending features but all are blocked by dependencies
                         print("No ready features available. All remaining features may be blocked by dependencies.", flush=True)
-                        await asyncio.sleep(POLL_INTERVAL * 2)
+                        await self._wait_for_agent_completion(timeout=POLL_INTERVAL * 2)
                         continue
 
                 # Start features up to capacity
@@ -1174,7 +1220,7 @@ class ParallelOrchestrator:
 
             except Exception as e:
                 print(f"Orchestrator error: {e}", flush=True)
-                await asyncio.sleep(POLL_INTERVAL)
+                await self._wait_for_agent_completion()
 
         # Wait for remaining agents to complete
         print("Waiting for running agents to complete...", flush=True)
@@ -1184,7 +1230,8 @@ class ParallelOrchestrator:
             testing_done = len(self.running_testing_agents) == 0
             if coding_done and testing_done:
                 break
-            await asyncio.sleep(1)
+            # Use short timeout since we're just waiting for final agents to finish
+            await self._wait_for_agent_completion(timeout=1.0)
 
         print("Orchestrator finished.", flush=True)
 
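
The shutdown drain keeps the same event-based wait but caps it at one second so the agent registries are re-checked frequently even if a completion signal is missed. A standalone sketch of that capped drain pattern (illustrative, not the repo's code):

import asyncio

async def main():
    completed = asyncio.Event()
    pending = 2

    async def agent(delay):
        nonlocal pending
        await asyncio.sleep(delay)
        pending -= 1
        completed.set()

    tasks = [asyncio.create_task(agent(d)) for d in (0.3, 0.7)]

    while pending:
        try:
            # Wake on a completion signal, but never wait more than a second.
            await asyncio.wait_for(completed.wait(), timeout=1.0)
            completed.clear()
        except asyncio.TimeoutError:
            pass  # re-check the counters at least once per second
    await asyncio.gather(*tasks)
    print("all agents drained")

asyncio.run(main())
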