diff --git a/api/database.py b/api/database.py index 3fc586c..cb8e7aa 100644 --- a/api/database.py +++ b/api/database.py @@ -5,6 +5,7 @@ Database Models and Connection SQLite database schema for feature storage using SQLAlchemy. """ +import sys from pathlib import Path from typing import Optional @@ -112,6 +113,57 @@ def _migrate_add_dependencies_column(engine) -> None: conn.commit() +def _is_network_path(path: Path) -> bool: + """Detect if path is on a network filesystem. + + WAL mode doesn't work reliably on network filesystems (NFS, SMB, CIFS) + and can cause database corruption. This function detects common network + path patterns so we can fall back to DELETE mode. + + Args: + path: The path to check + + Returns: + True if the path appears to be on a network filesystem + """ + path_str = str(path.resolve()) + + if sys.platform == "win32": + # Windows UNC paths: \\server\share or \\?\UNC\server\share + if path_str.startswith("\\\\"): + return True + # Mapped network drives - check if the drive is a network drive + try: + import ctypes + drive = path_str[:2] # e.g., "Z:" + if len(drive) == 2 and drive[1] == ":": + # DRIVE_REMOTE = 4 + drive_type = ctypes.windll.kernel32.GetDriveTypeW(drive + "\\") + if drive_type == 4: # DRIVE_REMOTE + return True + except (AttributeError, OSError): + pass + else: + # Unix: Check mount type via /proc/mounts or mount command + try: + with open("/proc/mounts", "r") as f: + mounts = f.read() + # Check each mount point to find which one contains our path + for line in mounts.splitlines(): + parts = line.split() + if len(parts) >= 3: + mount_point = parts[1] + fs_type = parts[2] + # Check if path is under this mount point and if it's a network FS + if path_str.startswith(mount_point): + if fs_type in ("nfs", "nfs4", "cifs", "smbfs", "fuse.sshfs"): + return True + except (FileNotFoundError, PermissionError): + pass + + return False + + def create_database(project_dir: Path) -> tuple: """ Create database and return engine + session 
maker. @@ -129,9 +181,13 @@ def create_database(project_dir: Path) -> tuple: }) Base.metadata.create_all(bind=engine) - # Enable WAL mode for better concurrent read/write performance + # Choose journal mode based on filesystem type + # WAL mode doesn't work reliably on network filesystems and can cause corruption + is_network = _is_network_path(project_dir) + journal_mode = "DELETE" if is_network else "WAL" + with engine.connect() as conn: - conn.execute(text("PRAGMA journal_mode=WAL")) + conn.execute(text(f"PRAGMA journal_mode={journal_mode}")) conn.execute(text("PRAGMA busy_timeout=30000")) conn.commit() diff --git a/api/migration.py b/api/migration.py index e0d0c51..9309456 100644 --- a/api/migration.py +++ b/api/migration.py @@ -83,6 +83,7 @@ def migrate_json_to_sqlite( steps=feature_dict.get("steps", []), passes=feature_dict.get("passes", False), in_progress=feature_dict.get("in_progress", False), + dependencies=feature_dict.get("dependencies"), ) session.add(feature) imported_count += 1 diff --git a/mcp_server/feature_mcp.py b/mcp_server/feature_mcp.py index f3f7c8d..d9b4e00 100755 --- a/mcp_server/feature_mcp.py +++ b/mcp_server/feature_mcp.py @@ -20,6 +20,7 @@ Tools: import json import os +import random import sys import threading import time as _time @@ -313,9 +314,11 @@ def _feature_claim_next_internal(attempt: int = 0) -> str: if result.rowcount == 0: # Another process claimed it first - retry with backoff session.close() - # Exponential backoff: 0.1s, 0.2s, 0.4s, ... up to 1.0s + # Exponential backoff with jitter: base 0.1s, 0.2s, 0.4s, ... 
def _kill_process_tree(proc: subprocess.Popen, timeout: float = 5.0) -> None:
    """Kill a process and all its child processes.

    On Windows, subprocess.terminate() only kills the immediate process, leaving
    orphaned child processes (e.g., spawned browser instances). This function
    uses psutil to kill the entire process tree.

    Children are terminated first, then force-killed if still alive after
    ``timeout``; the parent is then terminated the same way. The worst case
    therefore blocks for roughly twice ``timeout``. A failure to signal one
    child (already exited, access denied, zombie) never prevents the remaining
    children or the parent from being stopped.

    Args:
        proc: The subprocess.Popen object to kill
        timeout: Seconds to wait for graceful termination before force-killing
    """
    parent_gone = False
    try:
        # Snapshot all descendants before signalling anything; once the
        # parent dies its children can no longer be discovered through it.
        children = psutil.Process(proc.pid).children(recursive=True)
    except psutil.NoSuchProcess:
        parent_gone = True
        children = []
    except psutil.Error:
        # Parent exists but cannot be inspected (e.g. AccessDenied); still
        # fall through and stop the parent itself.
        children = []

    # Terminate children first (graceful)
    for child in children:
        try:
            child.terminate()
        except psutil.Error:
            pass

    # Wait for children to terminate, then force kill any stragglers
    _, still_alive = psutil.wait_procs(children, timeout=timeout)
    for child in still_alive:
        try:
            child.kill()
        except psutil.Error:
            pass

    if parent_gone:
        # Process already dead, just ensure the Popen object is cleaned up
        try:
            proc.terminate()
            proc.wait(timeout=1)
        except (subprocess.TimeoutExpired, OSError):
            try:
                proc.kill()
            except OSError:
                pass
        return

    # Now terminate the parent
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
b/server/services/process_manager.py @@ -148,16 +148,36 @@ class AgentProcessManager: return self.process.pid if self.process else None def _check_lock(self) -> bool: - """Check if another agent is already running for this project.""" + """Check if another agent is already running for this project. + + Uses PID + process creation time to handle PID reuse on Windows. + """ if not self.lock_file.exists(): return True try: - pid = int(self.lock_file.read_text().strip()) + lock_content = self.lock_file.read_text().strip() + # Support both legacy format (just PID) and new format (PID:CREATE_TIME) + if ":" in lock_content: + pid_str, create_time_str = lock_content.split(":", 1) + pid = int(pid_str) + stored_create_time = float(create_time_str) + else: + # Legacy format - just PID + pid = int(lock_content) + stored_create_time = None + if psutil.pid_exists(pid): # Check if it's actually our agent process try: proc = psutil.Process(pid) + # Verify it's the same process using creation time (handles PID reuse) + if stored_create_time is not None: + # Allow 1 second tolerance for creation time comparison + if abs(proc.create_time() - stored_create_time) > 1.0: + # Different process reused the PID - stale lock + self.lock_file.unlink(missing_ok=True) + return True cmdline = " ".join(proc.cmdline()) if "autonomous_agent_demo.py" in cmdline: return False # Another agent is running @@ -170,11 +190,34 @@ class AgentProcessManager: self.lock_file.unlink(missing_ok=True) return True - def _create_lock(self) -> None: - """Create lock file with current process PID.""" + def _create_lock(self) -> bool: + """Atomically create lock file with current process PID and creation time. + + Returns: + True if lock was created successfully, False if lock already exists. 
+ """ self.lock_file.parent.mkdir(parents=True, exist_ok=True) - if self.process: - self.lock_file.write_text(str(self.process.pid)) + if not self.process: + return False + + try: + # Get process creation time for PID reuse detection + create_time = psutil.Process(self.process.pid).create_time() + lock_content = f"{self.process.pid}:{create_time}" + + # Atomic lock creation using O_CREAT | O_EXCL + # This prevents TOCTOU race conditions + import os + fd = os.open(str(self.lock_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY) + os.write(fd, lock_content.encode()) + os.close(fd) + return True + except FileExistsError: + # Another process beat us to it + return False + except (psutil.NoSuchProcess, OSError) as e: + logger.warning(f"Failed to create lock file: {e}") + return False def _remove_lock(self) -> None: """Remove lock file.""" @@ -305,7 +348,17 @@ class AgentProcessManager: cwd=str(self.project_dir), ) - self._create_lock() + # Atomic lock creation - if it fails, another process beat us + if not self._create_lock(): + # Kill the process we just started since we couldn't get the lock + self.process.terminate() + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + self.process = None + return False, "Another agent instance is already running for this project" + self.started_at = datetime.now() self.status = "running" @@ -511,13 +564,29 @@ def cleanup_orphaned_locks() -> int: continue try: - pid_str = lock_file.read_text().strip() - pid = int(pid_str) + lock_content = lock_file.read_text().strip() + # Support both legacy format (just PID) and new format (PID:CREATE_TIME) + if ":" in lock_content: + pid_str, create_time_str = lock_content.split(":", 1) + pid = int(pid_str) + stored_create_time = float(create_time_str) + else: + # Legacy format - just PID + pid = int(lock_content) + stored_create_time = None # Check if process is still running if psutil.pid_exists(pid): try: proc = psutil.Process(pid) + # Verify it's the same 
process using creation time (handles PID reuse) + if stored_create_time is not None: + if abs(proc.create_time() - stored_create_time) > 1.0: + # Different process reused the PID - stale lock + lock_file.unlink(missing_ok=True) + cleaned += 1 + logger.info("Removed orphaned lock file for project '%s' (PID reused)", name) + continue cmdline = " ".join(proc.cmdline()) if "autonomous_agent_demo.py" in cmdline: # Process is still running, don't remove