mirror of
https://github.com/leonvanzyl/autocoder.git
synced 2026-03-17 10:53:09 +00:00
Merge remote-tracking branch 'origin/master' into feature/blocked-for-human-input
# Conflicts: # server/services/process_manager.py
This commit is contained in:
@@ -61,6 +61,17 @@ UI_DIST_DIR = ROOT_DIR / "ui" / "dist"
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""Lifespan context manager for startup and shutdown."""
|
||||
# Startup - clean up stale temp files (Playwright profiles, .node cache, etc.)
|
||||
try:
|
||||
from temp_cleanup import cleanup_stale_temp
|
||||
stats = cleanup_stale_temp()
|
||||
if stats["dirs_deleted"] > 0 or stats["files_deleted"] > 0:
|
||||
mb_freed = stats["bytes_freed"] / (1024 * 1024)
|
||||
logger.info("Startup temp cleanup: %d dirs, %d files, %.1f MB freed",
|
||||
stats["dirs_deleted"], stats["files_deleted"], mb_freed)
|
||||
except Exception as e:
|
||||
logger.warning("Startup temp cleanup failed (non-fatal): %s", e)
|
||||
|
||||
# Startup - clean up orphaned lock files from previous runs
|
||||
cleanup_orphaned_locks()
|
||||
cleanup_orphaned_devserver_locks()
|
||||
|
||||
@@ -175,3 +175,31 @@ async def resume_agent(project_name: str):
|
||||
status=manager.status,
|
||||
message=message,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/graceful-pause", response_model=AgentActionResponse)
|
||||
async def graceful_pause_agent(project_name: str):
|
||||
"""Request a graceful pause (drain mode) - finish current work then pause."""
|
||||
manager = get_project_manager(project_name)
|
||||
|
||||
success, message = await manager.graceful_pause()
|
||||
|
||||
return AgentActionResponse(
|
||||
success=success,
|
||||
status=manager.status,
|
||||
message=message,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/graceful-resume", response_model=AgentActionResponse)
|
||||
async def graceful_resume_agent(project_name: str):
|
||||
"""Resume from a graceful pause."""
|
||||
manager = get_project_manager(project_name)
|
||||
|
||||
success, message = await manager.graceful_resume()
|
||||
|
||||
return AgentActionResponse(
|
||||
success=success,
|
||||
status=manager.status,
|
||||
message=message,
|
||||
)
|
||||
|
||||
@@ -242,7 +242,7 @@ class AgentStartRequest(BaseModel):
|
||||
|
||||
class AgentStatus(BaseModel):
|
||||
"""Current agent status."""
|
||||
status: Literal["stopped", "running", "paused", "crashed"]
|
||||
status: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]
|
||||
pid: int | None = None
|
||||
started_at: datetime | None = None
|
||||
yolo_mode: bool = False
|
||||
|
||||
@@ -77,7 +77,7 @@ class AgentProcessManager:
|
||||
self.project_dir = project_dir
|
||||
self.root_dir = root_dir
|
||||
self.process: subprocess.Popen | None = None
|
||||
self._status: Literal["stopped", "running", "paused", "crashed"] = "stopped"
|
||||
self._status: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"] = "stopped"
|
||||
self.started_at: datetime | None = None
|
||||
self._output_task: asyncio.Task | None = None
|
||||
self.yolo_mode: bool = False # YOLO mode for rapid prototyping
|
||||
@@ -96,11 +96,11 @@ class AgentProcessManager:
|
||||
self.lock_file = get_agent_lock_path(self.project_dir)
|
||||
|
||||
@property
|
||||
def status(self) -> Literal["stopped", "running", "paused", "crashed"]:
|
||||
def status(self) -> Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]:
|
||||
return self._status
|
||||
|
||||
@status.setter
|
||||
def status(self, value: Literal["stopped", "running", "paused", "crashed"]):
|
||||
def status(self, value: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]):
|
||||
old_status = self._status
|
||||
self._status = value
|
||||
if old_status != value:
|
||||
@@ -227,6 +227,28 @@ class AgentProcessManager:
|
||||
"""Remove lock file."""
|
||||
self.lock_file.unlink(missing_ok=True)
|
||||
|
||||
def _apply_playwright_headless(self, headless: bool) -> None:
|
||||
"""Update .playwright/cli.config.json with the current headless setting.
|
||||
|
||||
playwright-cli reads this config file on each ``open`` command, so
|
||||
updating it before the agent starts is sufficient.
|
||||
"""
|
||||
config_file = self.project_dir / ".playwright" / "cli.config.json"
|
||||
if not config_file.exists():
|
||||
return
|
||||
try:
|
||||
import json
|
||||
config = json.loads(config_file.read_text(encoding="utf-8"))
|
||||
launch_opts = config.get("browser", {}).get("launchOptions", {})
|
||||
if launch_opts.get("headless") == headless:
|
||||
return # already correct
|
||||
launch_opts["headless"] = headless
|
||||
config.setdefault("browser", {})["launchOptions"] = launch_opts
|
||||
config_file.write_text(json.dumps(config, indent=2) + "\n", encoding="utf-8")
|
||||
logger.info("Set playwright headless=%s for %s", headless, self.project_name)
|
||||
except Exception:
|
||||
logger.warning("Failed to update playwright config", exc_info=True)
|
||||
|
||||
def _cleanup_stale_features(self) -> None:
|
||||
"""Clear in_progress flag for all features when agent stops/crashes.
|
||||
|
||||
@@ -255,10 +277,7 @@ class AgentProcessManager:
|
||||
).all()
|
||||
if stuck:
|
||||
for f in stuck:
|
||||
# Don't clear in_progress for features blocked for human input -
|
||||
# they should stay in needs_human_input state even after crash
|
||||
if not getattr(f, 'needs_human_input', False):
|
||||
f.in_progress = False
|
||||
f.in_progress = False # type: ignore[assignment]
|
||||
session.commit()
|
||||
logger.info(
|
||||
"Cleaned up %d stuck feature(s) for %s",
|
||||
@@ -311,6 +330,12 @@ class AgentProcessManager:
|
||||
for help_line in AUTH_ERROR_HELP.strip().split('\n'):
|
||||
await self._broadcast_output(help_line)
|
||||
|
||||
# Detect graceful pause status transitions from orchestrator output
|
||||
if "All agents drained - paused." in decoded:
|
||||
self.status = "paused_graceful"
|
||||
elif "Resuming from graceful pause..." in decoded:
|
||||
self.status = "running"
|
||||
|
||||
await self._broadcast_output(sanitized)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
@@ -321,7 +346,7 @@ class AgentProcessManager:
|
||||
# Check if process ended
|
||||
if self.process and self.process.poll() is not None:
|
||||
exit_code = self.process.returncode
|
||||
if exit_code != 0 and self.status == "running":
|
||||
if exit_code != 0 and self.status in ("running", "pausing", "paused_graceful"):
|
||||
# Check buffered output for auth errors if we haven't detected one yet
|
||||
if not auth_error_detected:
|
||||
combined_output = '\n'.join(output_buffer)
|
||||
@@ -329,10 +354,16 @@ class AgentProcessManager:
|
||||
for help_line in AUTH_ERROR_HELP.strip().split('\n'):
|
||||
await self._broadcast_output(help_line)
|
||||
self.status = "crashed"
|
||||
elif self.status == "running":
|
||||
elif self.status in ("running", "pausing", "paused_graceful"):
|
||||
self.status = "stopped"
|
||||
self._cleanup_stale_features()
|
||||
self._remove_lock()
|
||||
# Clean up drain signal file if present
|
||||
try:
|
||||
from autoforge_paths import get_pause_drain_path
|
||||
get_pause_drain_path(self.project_dir).unlink(missing_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def start(
|
||||
self,
|
||||
@@ -358,12 +389,21 @@ class AgentProcessManager:
|
||||
Returns:
|
||||
Tuple of (success, message)
|
||||
"""
|
||||
if self.status in ("running", "paused"):
|
||||
if self.status in ("running", "paused", "pausing", "paused_graceful"):
|
||||
return False, f"Agent is already {self.status}"
|
||||
|
||||
if not self._check_lock():
|
||||
return False, "Another agent instance is already running for this project"
|
||||
|
||||
# Clean up stale browser daemons from previous runs
|
||||
try:
|
||||
subprocess.run(
|
||||
["playwright-cli", "kill-all"],
|
||||
timeout=5, capture_output=True,
|
||||
)
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
|
||||
pass
|
||||
|
||||
# Clean up features stuck from a previous crash/stop
|
||||
self._cleanup_stale_features()
|
||||
|
||||
@@ -400,6 +440,10 @@ class AgentProcessManager:
|
||||
# Add --batch-size flag for multi-feature batching
|
||||
cmd.extend(["--batch-size", str(batch_size)])
|
||||
|
||||
# Apply headless setting to .playwright/cli.config.json so playwright-cli
|
||||
# picks it up (the only mechanism it supports for headless control)
|
||||
self._apply_playwright_headless(playwright_headless)
|
||||
|
||||
try:
|
||||
# Start subprocess with piped stdout/stderr
|
||||
# Use project_dir as cwd so Claude SDK sandbox allows access to project files
|
||||
@@ -412,7 +456,8 @@ class AgentProcessManager:
|
||||
subprocess_env = {
|
||||
**os.environ,
|
||||
"PYTHONUNBUFFERED": "1",
|
||||
"PLAYWRIGHT_HEADLESS": "true" if playwright_headless else "false",
|
||||
"PLAYWRIGHT_CLI_SESSION": f"agent-{self.project_name}-{os.getpid()}",
|
||||
"NODE_COMPILE_CACHE": "", # Disable V8 compile caching to prevent .node file accumulation in %TEMP%
|
||||
**api_env,
|
||||
}
|
||||
|
||||
@@ -471,6 +516,15 @@ class AgentProcessManager:
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# Kill browser daemons before stopping agent
|
||||
try:
|
||||
subprocess.run(
|
||||
["playwright-cli", "kill-all"],
|
||||
timeout=5, capture_output=True,
|
||||
)
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
|
||||
pass
|
||||
|
||||
# CRITICAL: Kill entire process tree, not just orchestrator
|
||||
# This ensures all spawned coding/testing agents are also terminated
|
||||
proc = self.process # Capture reference before async call
|
||||
@@ -484,6 +538,12 @@ class AgentProcessManager:
|
||||
|
||||
self._cleanup_stale_features()
|
||||
self._remove_lock()
|
||||
# Clean up drain signal file if present
|
||||
try:
|
||||
from autoforge_paths import get_pause_drain_path
|
||||
get_pause_drain_path(self.project_dir).unlink(missing_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
self.status = "stopped"
|
||||
self.process = None
|
||||
self.started_at = None
|
||||
@@ -544,6 +604,47 @@ class AgentProcessManager:
|
||||
logger.exception("Failed to resume agent")
|
||||
return False, f"Failed to resume agent: {e}"
|
||||
|
||||
async def graceful_pause(self) -> tuple[bool, str]:
|
||||
"""Request a graceful pause (drain mode).
|
||||
|
||||
Creates a signal file that the orchestrator polls. Running agents
|
||||
finish their current work before the orchestrator enters a paused state.
|
||||
|
||||
Returns:
|
||||
Tuple of (success, message)
|
||||
"""
|
||||
if not self.process or self.status not in ("running",):
|
||||
return False, "Agent is not running"
|
||||
|
||||
try:
|
||||
from autoforge_paths import get_pause_drain_path
|
||||
drain_path = get_pause_drain_path(self.project_dir)
|
||||
drain_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
drain_path.write_text(str(self.process.pid))
|
||||
self.status = "pausing"
|
||||
return True, "Graceful pause requested"
|
||||
except Exception as e:
|
||||
logger.exception("Failed to request graceful pause")
|
||||
return False, f"Failed to request graceful pause: {e}"
|
||||
|
||||
async def graceful_resume(self) -> tuple[bool, str]:
|
||||
"""Resume from a graceful pause by removing the drain signal file.
|
||||
|
||||
Returns:
|
||||
Tuple of (success, message)
|
||||
"""
|
||||
if not self.process or self.status not in ("pausing", "paused_graceful"):
|
||||
return False, "Agent is not in a graceful pause state"
|
||||
|
||||
try:
|
||||
from autoforge_paths import get_pause_drain_path
|
||||
get_pause_drain_path(self.project_dir).unlink(missing_ok=True)
|
||||
self.status = "running"
|
||||
return True, "Agent resumed from graceful pause"
|
||||
except Exception as e:
|
||||
logger.exception("Failed to resume from graceful pause")
|
||||
return False, f"Failed to resume: {e}"
|
||||
|
||||
async def healthcheck(self) -> bool:
|
||||
"""
|
||||
Check if the agent process is still alive.
|
||||
@@ -559,8 +660,14 @@ class AgentProcessManager:
|
||||
poll = self.process.poll()
|
||||
if poll is not None:
|
||||
# Process has terminated
|
||||
if self.status in ("running", "paused"):
|
||||
if self.status in ("running", "paused", "pausing", "paused_graceful"):
|
||||
self._cleanup_stale_features()
|
||||
# Clean up drain signal file if present
|
||||
try:
|
||||
from autoforge_paths import get_pause_drain_path
|
||||
get_pause_drain_path(self.project_dir).unlink(missing_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
self.status = "crashed"
|
||||
self._remove_lock()
|
||||
return False
|
||||
@@ -645,8 +752,14 @@ def cleanup_orphaned_locks() -> int:
|
||||
if not project_path.exists():
|
||||
continue
|
||||
|
||||
# Clean up stale drain signal files
|
||||
from autoforge_paths import get_autoforge_dir, get_pause_drain_path
|
||||
drain_file = get_pause_drain_path(project_path)
|
||||
if drain_file.exists():
|
||||
drain_file.unlink(missing_ok=True)
|
||||
logger.info("Removed stale drain signal file for project '%s'", name)
|
||||
|
||||
# Check both legacy and new locations for lock files
|
||||
from autoforge_paths import get_autoforge_dir
|
||||
lock_locations = [
|
||||
project_path / ".agent.lock",
|
||||
get_autoforge_dir(project_path) / ".agent.lock",
|
||||
|
||||
@@ -61,7 +61,7 @@ THOUGHT_PATTERNS = [
|
||||
(re.compile(r'(?:Testing|Verifying|Running tests|Validating)\s+(.+)', re.I), 'testing'),
|
||||
(re.compile(r'(?:Error|Failed|Cannot|Unable to|Exception)\s+(.+)', re.I), 'struggling'),
|
||||
# Test results
|
||||
(re.compile(r'(?:PASS|passed|success)', re.I), 'success'),
|
||||
(re.compile(r'(?:PASS|passed|success)', re.I), 'testing'),
|
||||
(re.compile(r'(?:FAIL|failed|error)', re.I), 'struggling'),
|
||||
]
|
||||
|
||||
@@ -78,6 +78,9 @@ ORCHESTRATOR_PATTERNS = {
|
||||
'testing_complete': re.compile(r'Feature #(\d+) testing (completed|failed)'),
|
||||
'all_complete': re.compile(r'All features complete'),
|
||||
'blocked_features': re.compile(r'(\d+) blocked by dependencies'),
|
||||
'drain_start': re.compile(r'Graceful pause requested'),
|
||||
'drain_complete': re.compile(r'All agents drained'),
|
||||
'drain_resume': re.compile(r'Resuming from graceful pause'),
|
||||
}
|
||||
|
||||
|
||||
@@ -562,6 +565,30 @@ class OrchestratorTracker:
|
||||
'All features complete!'
|
||||
)
|
||||
|
||||
# Graceful pause (drain mode) events
|
||||
elif ORCHESTRATOR_PATTERNS['drain_start'].search(line):
|
||||
self.state = 'draining'
|
||||
update = self._create_update(
|
||||
'drain_start',
|
||||
'Draining active agents...'
|
||||
)
|
||||
|
||||
elif ORCHESTRATOR_PATTERNS['drain_complete'].search(line):
|
||||
self.state = 'paused'
|
||||
self.coding_agents = 0
|
||||
self.testing_agents = 0
|
||||
update = self._create_update(
|
||||
'drain_complete',
|
||||
'All agents drained. Paused.'
|
||||
)
|
||||
|
||||
elif ORCHESTRATOR_PATTERNS['drain_resume'].search(line):
|
||||
self.state = 'scheduling'
|
||||
update = self._create_update(
|
||||
'drain_resume',
|
||||
'Resuming feature scheduling'
|
||||
)
|
||||
|
||||
return update
|
||||
|
||||
def _create_update(
|
||||
|
||||
Reference in New Issue
Block a user