Merge branch 'master' into master

This commit is contained in:
syphonetic
2026-02-04 05:50:35 +08:00
committed by GitHub
98 changed files with 7360 additions and 4816 deletions

View File

@@ -7,6 +7,7 @@ Provides REST API, WebSocket, and static file serving.
"""
import asyncio
import logging
import os
import shutil
import sys
@@ -42,6 +43,7 @@ from .routers import (
)
from .schemas import SetupStatus
from .services.assistant_chat_session import cleanup_all_sessions as cleanup_assistant_sessions
from .services.chat_constants import ROOT_DIR
from .services.dev_server_manager import (
cleanup_all_devservers,
cleanup_orphaned_devserver_locks,
@@ -53,7 +55,6 @@ from .services.terminal_manager import cleanup_all_terminals
from .websocket import project_websocket
# Paths
ROOT_DIR = Path(__file__).parent.parent
UI_DIST_DIR = ROOT_DIR / "ui" / "dist"
@@ -88,10 +89,19 @@ app = FastAPI(
lifespan=lifespan,
)
# Module logger
logger = logging.getLogger(__name__)
# Check if remote access is enabled via environment variable
# Set by start_ui.py when --host is not 127.0.0.1
ALLOW_REMOTE = os.environ.get("AUTOCODER_ALLOW_REMOTE", "").lower() in ("1", "true", "yes")
if ALLOW_REMOTE:
logger.warning(
"ALLOW_REMOTE is enabled. Terminal WebSocket is exposed without sandboxing. "
"Only use this in trusted network environments."
)
# CORS - allow all origins when remote access is enabled, otherwise localhost only
if ALLOW_REMOTE:
app.add_middleware(
@@ -222,7 +232,14 @@ if UI_DIST_DIR.exists():
raise HTTPException(status_code=404)
# Try to serve the file directly
file_path = UI_DIST_DIR / path
file_path = (UI_DIST_DIR / path).resolve()
# Ensure resolved path is within UI_DIST_DIR (prevent path traversal)
try:
file_path.relative_to(UI_DIST_DIR.resolve())
except ValueError:
raise HTTPException(status_code=404)
if file_path.exists() and file_path.is_file():
return FileResponse(file_path)

View File

@@ -6,31 +6,22 @@ API endpoints for agent control (start/stop/pause/resume).
Uses project registry for path lookups.
"""
import re
from pathlib import Path
from fastapi import APIRouter, HTTPException
from ..schemas import AgentActionResponse, AgentStartRequest, AgentStatus
from ..services.chat_constants import ROOT_DIR
from ..services.process_manager import get_manager
from ..utils.project_helpers import get_project_path as _get_project_path
from ..utils.validation import validate_project_name
def _get_project_path(project_name: str) -> Path:
"""Get project path from registry."""
import sys
root = Path(__file__).parent.parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from registry import get_project_path
return get_project_path(project_name)
def _get_settings_defaults() -> tuple[bool, str, int]:
def _get_settings_defaults() -> tuple[bool, str, int, bool, int]:
"""Get defaults from global settings.
Returns:
Tuple of (yolo_mode, model, testing_agent_ratio)
Tuple of (yolo_mode, model, testing_agent_ratio, playwright_headless, batch_size)
"""
import sys
root = Path(__file__).parent.parent.parent
@@ -49,24 +40,18 @@ def _get_settings_defaults() -> tuple[bool, str, int]:
except (ValueError, TypeError):
testing_agent_ratio = 1
return yolo_mode, model, testing_agent_ratio
playwright_headless = (settings.get("playwright_headless") or "true").lower() == "true"
try:
batch_size = int(settings.get("batch_size", "3"))
except (ValueError, TypeError):
batch_size = 3
return yolo_mode, model, testing_agent_ratio, playwright_headless, batch_size
router = APIRouter(prefix="/api/projects/{project_name}/agent", tags=["agent"])
# Root directory for process manager
ROOT_DIR = Path(__file__).parent.parent.parent
def validate_project_name(name: str) -> str:
"""Validate and sanitize project name to prevent path traversal."""
if not re.match(r'^[a-zA-Z0-9_-]{1,50}$', name):
raise HTTPException(
status_code=400,
detail="Invalid project name"
)
return name
def get_project_manager(project_name: str):
"""Get the process manager for a project."""
@@ -111,18 +96,22 @@ async def start_agent(
manager = get_project_manager(project_name)
# Get defaults from global settings if not provided in request
default_yolo, default_model, default_testing_ratio = _get_settings_defaults()
default_yolo, default_model, default_testing_ratio, playwright_headless, default_batch_size = _get_settings_defaults()
yolo_mode = request.yolo_mode if request.yolo_mode is not None else default_yolo
model = request.model if request.model else default_model
max_concurrency = request.max_concurrency or 1
testing_agent_ratio = request.testing_agent_ratio if request.testing_agent_ratio is not None else default_testing_ratio
batch_size = default_batch_size
success, message = await manager.start(
yolo_mode=yolo_mode,
model=model,
max_concurrency=max_concurrency,
testing_agent_ratio=testing_agent_ratio,
playwright_headless=playwright_headless,
batch_size=batch_size,
)
# Notify scheduler of manual start (to prevent auto-stop during scheduled window)

View File

@@ -7,8 +7,6 @@ WebSocket and REST endpoints for the read-only project assistant.
import json
import logging
import re
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
@@ -27,30 +25,13 @@ from ..services.assistant_database import (
get_conversation,
get_conversations,
)
from ..utils.project_helpers import get_project_path as _get_project_path
from ..utils.validation import is_valid_project_name as validate_project_name
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/assistant", tags=["assistant-chat"])
# Root directory
ROOT_DIR = Path(__file__).parent.parent.parent
def _get_project_path(project_name: str) -> Optional[Path]:
"""Get project path from registry."""
import sys
root = Path(__file__).parent.parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from registry import get_project_path
return get_project_path(project_name)
def validate_project_name(name: str) -> bool:
"""Validate project name to prevent path traversal."""
return bool(re.match(r'^[a-zA-Z0-9_-]{1,50}$', name))
# ============================================================================
# Pydantic Models
@@ -145,9 +126,9 @@ async def create_project_conversation(project_name: str):
conversation = create_conversation(project_dir, project_name)
return ConversationSummary(
id=conversation.id,
project_name=conversation.project_name,
title=conversation.title,
id=int(conversation.id),
project_name=str(conversation.project_name),
title=str(conversation.title) if conversation.title else None,
created_at=conversation.created_at.isoformat() if conversation.created_at else None,
updated_at=conversation.updated_at.isoformat() if conversation.updated_at else None,
message_count=0,

View File

@@ -8,7 +8,6 @@ Allows adding multiple features to existing projects via natural language.
import json
import logging
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
@@ -22,27 +21,13 @@ from ..services.expand_chat_session import (
list_expand_sessions,
remove_expand_session,
)
from ..utils.project_helpers import get_project_path as _get_project_path
from ..utils.validation import validate_project_name
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/expand", tags=["expand-project"])
# Root directory
ROOT_DIR = Path(__file__).parent.parent.parent
def _get_project_path(project_name: str) -> Path:
"""Get project path from registry."""
import sys
root = Path(__file__).parent.parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from registry import get_project_path
return get_project_path(project_name)
# ============================================================================
@@ -136,7 +121,8 @@ async def expand_project_websocket(websocket: WebSocket, project_name: str):
return
# Verify project has app_spec.txt
spec_path = project_dir / "prompts" / "app_spec.txt"
from autocoder_paths import get_prompts_dir
spec_path = get_prompts_dir(project_dir) / "app_spec.txt"
if not spec_path.exists():
await websocket.close(code=4004, reason="Project has no spec. Create spec first.")
return

View File

@@ -8,10 +8,12 @@ API endpoints for feature/test case management.
import logging
from contextlib import contextmanager
from pathlib import Path
from typing import Literal
from fastapi import APIRouter, HTTPException
from ..schemas import (
DependencyGraphEdge,
DependencyGraphNode,
DependencyGraphResponse,
DependencyUpdate,
@@ -22,6 +24,7 @@ from ..schemas import (
FeatureResponse,
FeatureUpdate,
)
from ..utils.project_helpers import get_project_path as _get_project_path
from ..utils.validation import validate_project_name
# Lazy imports to avoid circular dependencies
@@ -31,17 +34,6 @@ _Feature = None
logger = logging.getLogger(__name__)
def _get_project_path(project_name: str) -> Path:
"""Get project path from registry."""
import sys
root = Path(__file__).parent.parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from registry import get_project_path
return get_project_path(project_name)
def _get_db_classes():
"""Lazy import of database classes."""
global _create_database, _Feature
@@ -71,6 +63,9 @@ def get_db_session(project_dir: Path):
session = SessionLocal()
try:
yield session
except Exception:
session.rollback()
raise
finally:
session.close()
@@ -131,7 +126,8 @@ async def list_features(project_name: str):
if not project_dir.exists():
raise HTTPException(status_code=404, detail="Project directory not found")
db_file = project_dir / "features.db"
from autocoder_paths import get_features_db_path
db_file = get_features_db_path(project_dir)
if not db_file.exists():
return FeatureListResponse(pending=[], in_progress=[], done=[])
@@ -326,7 +322,8 @@ async def get_dependency_graph(project_name: str):
if not project_dir.exists():
raise HTTPException(status_code=404, detail="Project directory not found")
db_file = project_dir / "features.db"
from autocoder_paths import get_features_db_path
db_file = get_features_db_path(project_dir)
if not db_file.exists():
return DependencyGraphResponse(nodes=[], edges=[])
@@ -344,6 +341,7 @@ async def get_dependency_graph(project_name: str):
deps = f.dependencies or []
blocking = [d for d in deps if d not in passing_ids]
status: Literal["pending", "in_progress", "done", "blocked"]
if f.passes:
status = "done"
elif blocking:
@@ -363,7 +361,7 @@ async def get_dependency_graph(project_name: str):
))
for dep_id in deps:
edges.append({"source": dep_id, "target": f.id})
edges.append(DependencyGraphEdge(source=dep_id, target=f.id))
return DependencyGraphResponse(nodes=nodes, edges=edges)
except HTTPException:
@@ -390,7 +388,8 @@ async def get_feature(project_name: str, feature_id: int):
if not project_dir.exists():
raise HTTPException(status_code=404, detail="Project directory not found")
db_file = project_dir / "features.db"
from autocoder_paths import get_features_db_path
db_file = get_features_db_path(project_dir)
if not db_file.exists():
raise HTTPException(status_code=404, detail="No features database found")

View File

@@ -6,6 +6,7 @@ API endpoints for browsing the filesystem for project folder selection.
Provides cross-platform support for Windows, macOS, and Linux.
"""
import functools
import logging
import os
import re
@@ -14,6 +15,8 @@ from pathlib import Path
from fastapi import APIRouter, HTTPException, Query
from security import SENSITIVE_DIRECTORIES
# Module logger
logger = logging.getLogger(__name__)
@@ -77,17 +80,10 @@ LINUX_BLOCKED = {
"/opt",
}
# Universal blocked paths (relative to home directory)
UNIVERSAL_BLOCKED_RELATIVE = {
".ssh",
".aws",
".gnupg",
".config/gh",
".netrc",
".docker",
".kube",
".terraform",
}
# Universal blocked paths (relative to home directory).
# Delegates to the canonical SENSITIVE_DIRECTORIES set in security.py so that
# the filesystem browser and the EXTRA_READ_PATHS validator share one source of truth.
UNIVERSAL_BLOCKED_RELATIVE = SENSITIVE_DIRECTORIES
# Patterns for files that should not be shown
HIDDEN_PATTERNS = [
@@ -99,8 +95,14 @@ HIDDEN_PATTERNS = [
]
def get_blocked_paths() -> set[Path]:
"""Get the set of blocked paths for the current platform."""
@functools.lru_cache(maxsize=1)
def get_blocked_paths() -> frozenset[Path]:
"""
Get the set of blocked paths for the current platform.
Cached because the platform and home directory do not change at runtime,
and this function is called once per directory entry in list_directory().
"""
home = Path.home()
blocked = set()
@@ -119,7 +121,7 @@ def get_blocked_paths() -> set[Path]:
for rel in UNIVERSAL_BLOCKED_RELATIVE:
blocked.add((home / rel).resolve())
return blocked
return frozenset(blocked)
def is_path_blocked(path: Path) -> bool:

View File

@@ -10,6 +10,7 @@ import re
import shutil
import sys
from pathlib import Path
from typing import Any, Callable
from fastapi import APIRouter, HTTPException
@@ -24,11 +25,12 @@ from ..schemas import (
)
# Lazy imports to avoid circular dependencies
# These are initialized by _init_imports() before first use.
_imports_initialized = False
_check_spec_exists = None
_scaffold_project_prompts = None
_get_project_prompts_dir = None
_count_passing_tests = None
_check_spec_exists: Callable[..., Any] | None = None
_scaffold_project_prompts: Callable[..., Any] | None = None
_get_project_prompts_dir: Callable[..., Any] | None = None
_count_passing_tests: Callable[..., Any] | None = None
def _init_imports():
@@ -99,6 +101,7 @@ def validate_project_name(name: str) -> str:
def get_project_stats(project_dir: Path) -> ProjectStats:
"""Get statistics for a project."""
_init_imports()
assert _count_passing_tests is not None # guaranteed by _init_imports()
passing, in_progress, total = _count_passing_tests(project_dir)
percentage = (passing / total * 100) if total > 0 else 0.0
return ProjectStats(
@@ -113,6 +116,7 @@ def get_project_stats(project_dir: Path) -> ProjectStats:
async def list_projects():
"""List all registered projects."""
_init_imports()
assert _check_spec_exists is not None # guaranteed by _init_imports()
(_, _, _, list_registered_projects, validate_project_path,
get_project_concurrency, _) = _get_registry_functions()
@@ -145,6 +149,7 @@ async def list_projects():
async def create_project(project: ProjectCreate):
"""Create a new project at the specified path."""
_init_imports()
assert _scaffold_project_prompts is not None # guaranteed by _init_imports()
(register_project, _, get_project_path, list_registered_projects,
_, _, _) = _get_registry_functions()
@@ -225,6 +230,8 @@ async def create_project(project: ProjectCreate):
async def get_project(name: str):
"""Get detailed information about a project."""
_init_imports()
assert _check_spec_exists is not None # guaranteed by _init_imports()
assert _get_project_prompts_dir is not None # guaranteed by _init_imports()
(_, _, get_project_path, _, _, get_project_concurrency, _) = _get_registry_functions()
name = validate_project_name(name)
@@ -269,8 +276,8 @@ async def delete_project(name: str, delete_files: bool = False):
raise HTTPException(status_code=404, detail=f"Project '{name}' not found")
# Check if agent is running
lock_file = project_dir / ".agent.lock"
if lock_file.exists():
from autocoder_paths import has_agent_running
if has_agent_running(project_dir):
raise HTTPException(
status_code=409,
detail="Cannot delete project while agent is running. Stop the agent first."
@@ -296,6 +303,7 @@ async def delete_project(name: str, delete_files: bool = False):
async def get_project_prompts(name: str):
"""Get the content of project prompt files."""
_init_imports()
assert _get_project_prompts_dir is not None # guaranteed by _init_imports()
(_, _, get_project_path, _, _, _, _) = _get_registry_functions()
name = validate_project_name(name)
@@ -307,7 +315,7 @@ async def get_project_prompts(name: str):
if not project_dir.exists():
raise HTTPException(status_code=404, detail="Project directory not found")
prompts_dir = _get_project_prompts_dir(project_dir)
prompts_dir: Path = _get_project_prompts_dir(project_dir)
def read_file(filename: str) -> str:
filepath = prompts_dir / filename
@@ -329,6 +337,7 @@ async def get_project_prompts(name: str):
async def update_project_prompts(name: str, prompts: ProjectPromptsUpdate):
"""Update project prompt files."""
_init_imports()
assert _get_project_prompts_dir is not None # guaranteed by _init_imports()
(_, _, get_project_path, _, _, _, _) = _get_registry_functions()
name = validate_project_name(name)
@@ -398,8 +407,8 @@ async def reset_project(name: str, full_reset: bool = False):
raise HTTPException(status_code=404, detail="Project directory not found")
# Check if agent is running
lock_file = project_dir / ".agent.lock"
if lock_file.exists():
from autocoder_paths import has_agent_running
if has_agent_running(project_dir):
raise HTTPException(
status_code=409,
detail="Cannot reset project while agent is running. Stop the agent first."
@@ -415,36 +424,58 @@ async def reset_project(name: str, full_reset: bool = False):
deleted_files: list[str] = []
# Files to delete in quick reset
quick_reset_files = [
"features.db",
"features.db-wal", # WAL mode journal file
"features.db-shm", # WAL mode shared memory file
"assistant.db",
"assistant.db-wal",
"assistant.db-shm",
".claude_settings.json",
".claude_assistant_settings.json",
from autocoder_paths import (
get_assistant_db_path,
get_claude_assistant_settings_path,
get_claude_settings_path,
get_features_db_path,
)
# Build list of files to delete using path helpers (finds files at current location)
# Plus explicit old-location fallbacks for backward compatibility
db_path = get_features_db_path(project_dir)
asst_path = get_assistant_db_path(project_dir)
reset_files: list[Path] = [
db_path,
db_path.with_suffix(".db-wal"),
db_path.with_suffix(".db-shm"),
asst_path,
asst_path.with_suffix(".db-wal"),
asst_path.with_suffix(".db-shm"),
get_claude_settings_path(project_dir),
get_claude_assistant_settings_path(project_dir),
# Also clean old root-level locations if they exist
project_dir / "features.db",
project_dir / "features.db-wal",
project_dir / "features.db-shm",
project_dir / "assistant.db",
project_dir / "assistant.db-wal",
project_dir / "assistant.db-shm",
project_dir / ".claude_settings.json",
project_dir / ".claude_assistant_settings.json",
]
for filename in quick_reset_files:
file_path = project_dir / filename
for file_path in reset_files:
if file_path.exists():
try:
relative = file_path.relative_to(project_dir)
file_path.unlink()
deleted_files.append(filename)
deleted_files.append(str(relative))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete {filename}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to delete {file_path.name}: {e}")
# Full reset: also delete prompts directory
if full_reset:
prompts_dir = project_dir / "prompts"
if prompts_dir.exists():
try:
shutil.rmtree(prompts_dir)
deleted_files.append("prompts/")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete prompts/: {e}")
from autocoder_paths import get_prompts_dir
# Delete prompts from both possible locations
for prompts_dir in [get_prompts_dir(project_dir), project_dir / "prompts"]:
if prompts_dir.exists():
try:
relative = prompts_dir.relative_to(project_dir)
shutil.rmtree(prompts_dir)
deleted_files.append(f"{relative}/")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete prompts: {e}")
return {
"success": True,
@@ -458,6 +489,8 @@ async def reset_project(name: str, full_reset: bool = False):
async def update_project_settings(name: str, settings: ProjectSettingsUpdate):
"""Update project-level settings (concurrency, etc.)."""
_init_imports()
assert _check_spec_exists is not None # guaranteed by _init_imports()
assert _get_project_prompts_dir is not None # guaranteed by _init_imports()
(_, _, get_project_path, _, _, get_project_concurrency,
set_project_concurrency) = _get_registry_functions()

View File

@@ -6,12 +6,10 @@ API endpoints for managing agent schedules.
Provides CRUD operations for time-based schedule configuration.
"""
import re
import sys
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Generator, Tuple
from typing import TYPE_CHECKING, Generator, Tuple
from fastapi import APIRouter, HTTPException
from sqlalchemy.orm import Session
@@ -26,17 +24,21 @@ from ..schemas import (
ScheduleResponse,
ScheduleUpdate,
)
from ..utils.project_helpers import get_project_path as _get_project_path
from ..utils.validation import validate_project_name
if TYPE_CHECKING:
from api.database import Schedule as ScheduleModel
def _get_project_path(project_name: str) -> Path:
"""Get project path from registry."""
root = Path(__file__).parent.parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from registry import get_project_path
return get_project_path(project_name)
def _schedule_to_response(schedule: "ScheduleModel") -> ScheduleResponse:
"""Convert a Schedule ORM object to a ScheduleResponse Pydantic model.
SQLAlchemy Column descriptors resolve to Python types at instance access time,
but mypy sees the Column[T] descriptor type. Using model_validate with
from_attributes handles this conversion correctly.
"""
return ScheduleResponse.model_validate(schedule, from_attributes=True)
router = APIRouter(
prefix="/api/projects/{project_name}/schedules",
@@ -44,16 +46,6 @@ router = APIRouter(
)
def validate_project_name(name: str) -> str:
"""Validate and sanitize project name to prevent path traversal."""
if not re.match(r'^[a-zA-Z0-9_-]{1,50}$', name):
raise HTTPException(
status_code=400,
detail="Invalid project name"
)
return name
@contextmanager
def _get_db_session(project_name: str) -> Generator[Tuple[Session, Path], None, None]:
"""Get database session for a project as a context manager.
@@ -84,6 +76,9 @@ def _get_db_session(project_name: str) -> Generator[Tuple[Session, Path], None,
db = SessionLocal()
try:
yield db, project_path
except Exception:
db.rollback()
raise
finally:
db.close()
@@ -99,21 +94,7 @@ async def list_schedules(project_name: str):
).order_by(Schedule.start_time).all()
return ScheduleListResponse(
schedules=[
ScheduleResponse(
id=s.id,
project_name=s.project_name,
start_time=s.start_time,
duration_minutes=s.duration_minutes,
days_of_week=s.days_of_week,
enabled=s.enabled,
yolo_mode=s.yolo_mode,
model=s.model,
crash_count=s.crash_count,
created_at=s.created_at,
)
for s in schedules
]
schedules=[_schedule_to_response(s) for s in schedules]
)
@@ -187,18 +168,7 @@ async def create_schedule(project_name: str, data: ScheduleCreate):
except Exception as e:
logger.error(f"Failed to start agent for schedule {schedule.id}: {e}", exc_info=True)
return ScheduleResponse(
id=schedule.id,
project_name=schedule.project_name,
start_time=schedule.start_time,
duration_minutes=schedule.duration_minutes,
days_of_week=schedule.days_of_week,
enabled=schedule.enabled,
yolo_mode=schedule.yolo_mode,
model=schedule.model,
crash_count=schedule.crash_count,
created_at=schedule.created_at,
)
return _schedule_to_response(schedule)
@router.get("/next", response_model=NextRunResponse)
@@ -256,8 +226,8 @@ async def get_next_scheduled_run(project_name: str):
return NextRunResponse(
has_schedules=True,
next_start=next_start.isoformat() if (active_count == 0 and next_start) else None,
next_end=latest_end.isoformat() if latest_end else None,
next_start=next_start if active_count == 0 else None,
next_end=latest_end,
is_currently_running=active_count > 0,
active_schedule_count=active_count,
)
@@ -277,18 +247,7 @@ async def get_schedule(project_name: str, schedule_id: int):
if not schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
return ScheduleResponse(
id=schedule.id,
project_name=schedule.project_name,
start_time=schedule.start_time,
duration_minutes=schedule.duration_minutes,
days_of_week=schedule.days_of_week,
enabled=schedule.enabled,
yolo_mode=schedule.yolo_mode,
model=schedule.model,
crash_count=schedule.crash_count,
created_at=schedule.created_at,
)
return _schedule_to_response(schedule)
@router.patch("/{schedule_id}", response_model=ScheduleResponse)
@@ -331,18 +290,7 @@ async def update_schedule(
# Was enabled, now disabled - remove jobs
scheduler.remove_schedule(schedule_id)
return ScheduleResponse(
id=schedule.id,
project_name=schedule.project_name,
start_time=schedule.start_time,
duration_minutes=schedule.duration_minutes,
days_of_week=schedule.days_of_week,
enabled=schedule.enabled,
yolo_mode=schedule.yolo_mode,
model=schedule.model,
crash_count=schedule.crash_count,
created_at=schedule.created_at,
)
return _schedule_to_response(schedule)
@router.delete("/{schedule_id}", status_code=204)

View File

@@ -9,17 +9,16 @@ Settings are stored in the registry database and shared across all projects.
import mimetypes
import os
import sys
from pathlib import Path
from fastapi import APIRouter
from ..schemas import ModelInfo, ModelsResponse, SettingsResponse, SettingsUpdate
from ..services.chat_constants import ROOT_DIR
# Mimetype fix for Windows - must run before StaticFiles is mounted
mimetypes.add_type("text/javascript", ".js", True)
# Add root to path for registry import
ROOT_DIR = Path(__file__).parent.parent.parent
# Ensure root is on sys.path for registry import
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
@@ -92,6 +91,8 @@ async def get_settings():
glm_mode=_is_glm_mode(),
ollama_mode=_is_ollama_mode(),
testing_agent_ratio=_parse_int(all_settings.get("testing_agent_ratio"), 1),
playwright_headless=_parse_bool(all_settings.get("playwright_headless"), default=True),
batch_size=_parse_int(all_settings.get("batch_size"), 3),
)
@@ -107,6 +108,12 @@ async def update_settings(update: SettingsUpdate):
if update.testing_agent_ratio is not None:
set_setting("testing_agent_ratio", str(update.testing_agent_ratio))
if update.playwright_headless is not None:
set_setting("playwright_headless", "true" if update.playwright_headless else "false")
if update.batch_size is not None:
set_setting("batch_size", str(update.batch_size))
# Return updated settings
all_settings = get_all_settings()
return SettingsResponse(
@@ -115,4 +122,6 @@ async def update_settings(update: SettingsUpdate):
glm_mode=_is_glm_mode(),
ollama_mode=_is_ollama_mode(),
testing_agent_ratio=_parse_int(all_settings.get("testing_agent_ratio"), 1),
playwright_headless=_parse_bool(all_settings.get("playwright_headless"), default=True),
batch_size=_parse_int(all_settings.get("batch_size"), 3),
)

View File

@@ -7,8 +7,6 @@ WebSocket and REST endpoints for interactive spec creation with Claude.
import json
import logging
import re
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
@@ -22,30 +20,13 @@ from ..services.spec_chat_session import (
list_sessions,
remove_session,
)
from ..utils.project_helpers import get_project_path as _get_project_path
from ..utils.validation import is_valid_project_name as validate_project_name
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/spec", tags=["spec-creation"])
# Root directory
ROOT_DIR = Path(__file__).parent.parent.parent
def _get_project_path(project_name: str) -> Path:
"""Get project path from registry."""
import sys
root = Path(__file__).parent.parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from registry import get_project_path
return get_project_path(project_name)
def validate_project_name(name: str) -> bool:
"""Validate project name to prevent path traversal."""
return bool(re.match(r'^[a-zA-Z0-9_-]{1,50}$', name))
# ============================================================================
# REST Endpoints
@@ -124,7 +105,8 @@ async def get_spec_file_status(project_name: str):
if not project_dir.exists():
raise HTTPException(status_code=404, detail="Project directory not found")
status_file = project_dir / "prompts" / ".spec_status.json"
from autocoder_paths import get_prompts_dir
status_file = get_prompts_dir(project_dir) / ".spec_status.json"
if not status_file.exists():
return SpecFileStatus(

View File

@@ -12,8 +12,6 @@ import base64
import json
import logging
import re
import sys
from pathlib import Path
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
@@ -27,13 +25,8 @@ from ..services.terminal_manager import (
rename_terminal,
stop_terminal_session,
)
# Add project root to path for registry import
_root = Path(__file__).parent.parent.parent
if str(_root) not in sys.path:
sys.path.insert(0, str(_root))
from registry import get_project_path as registry_get_project_path
from ..utils.project_helpers import get_project_path as _get_project_path
from ..utils.validation import is_valid_project_name as validate_project_name
logger = logging.getLogger(__name__)
@@ -48,27 +41,6 @@ class TerminalCloseCode:
FAILED_TO_START = 4500
def _get_project_path(project_name: str) -> Path | None:
"""Get project path from registry."""
return registry_get_project_path(project_name)
def validate_project_name(name: str) -> bool:
"""
Validate project name to prevent path traversal attacks.
Allows only alphanumeric characters, underscores, and hyphens.
Maximum length of 50 characters.
Args:
name: The project name to validate
Returns:
True if valid, False otherwise
"""
return bool(re.match(r"^[a-zA-Z0-9_-]{1,50}$", name))
def validate_terminal_id(terminal_id: str) -> bool:
"""
Validate terminal ID format.

View File

@@ -398,6 +398,8 @@ class SettingsResponse(BaseModel):
glm_mode: bool = False # True if GLM API is configured via .env
ollama_mode: bool = False # True if Ollama API is configured via .env
testing_agent_ratio: int = 1 # Regression testing agents (0-3)
playwright_headless: bool = True
batch_size: int = 3 # Features per coding agent batch (1-3)
class ModelsResponse(BaseModel):
@@ -411,6 +413,8 @@ class SettingsUpdate(BaseModel):
yolo_mode: bool | None = None
model: str | None = None
testing_agent_ratio: int | None = None # 0-3
playwright_headless: bool | None = None
batch_size: int | None = None # Features per agent batch (1-3)
@field_validator('model')
@classmethod
@@ -426,6 +430,13 @@ class SettingsUpdate(BaseModel):
raise ValueError("testing_agent_ratio must be between 0 and 3")
return v
@field_validator('batch_size')
@classmethod
def validate_batch_size(cls, v: int | None) -> int | None:
if v is not None and (v < 1 or v > 3):
raise ValueError("batch_size must be between 1 and 3")
return v
# ============================================================================
# Dev Server Schemas

View File

@@ -25,25 +25,13 @@ from .assistant_database import (
create_conversation,
get_messages,
)
from .chat_constants import API_ENV_VARS, ROOT_DIR
# Load environment variables from .env file if present
load_dotenv()
logger = logging.getLogger(__name__)
# Root directory of the project
ROOT_DIR = Path(__file__).parent.parent.parent
# Environment variables to pass through to Claude CLI for API configuration
API_ENV_VARS = [
"ANTHROPIC_BASE_URL",
"ANTHROPIC_AUTH_TOKEN",
"API_TIMEOUT_MS",
"ANTHROPIC_DEFAULT_SONNET_MODEL",
"ANTHROPIC_DEFAULT_OPUS_MODEL",
"ANTHROPIC_DEFAULT_HAIKU_MODEL",
]
# Read-only feature MCP tools
READONLY_FEATURE_MCP_TOOLS = [
"mcp__features__feature_get_stats",
@@ -76,7 +64,8 @@ def get_system_prompt(project_name: str, project_dir: Path) -> str:
"""Generate the system prompt for the assistant with project context."""
# Try to load app_spec.txt for context
app_spec_content = ""
app_spec_path = project_dir / "prompts" / "app_spec.txt"
from autocoder_paths import get_prompts_dir
app_spec_path = get_prompts_dir(project_dir) / "app_spec.txt"
if app_spec_path.exists():
try:
app_spec_content = app_spec_path.read_text(encoding="utf-8")
@@ -90,6 +79,8 @@ def get_system_prompt(project_name: str, project_dir: Path) -> str:
Your role is to help users understand the codebase, answer questions about features, and manage the project backlog. You can READ files and CREATE/MANAGE features, but you cannot modify source code.
You have MCP tools available for feature management. Use them directly by calling the tool -- do not suggest CLI commands, bash commands, or curl commands to the user. You can create features yourself using the feature_create and feature_create_bulk tools.
## What You CAN Do
**Codebase Analysis (Read-Only):**
@@ -134,17 +125,21 @@ If the user asks you to modify code, explain that you're a project assistant and
## Creating Features
When a user asks to add a feature, gather the following information:
1. **Category**: A grouping like "Authentication", "API", "UI", "Database"
2. **Name**: A concise, descriptive name
3. **Description**: What the feature should do
4. **Steps**: How to verify/implement the feature (as a list)
When a user asks to add a feature, use the `feature_create` or `feature_create_bulk` MCP tools directly:
For a **single feature**, call `feature_create` with:
- category: A grouping like "Authentication", "API", "UI", "Database"
- name: A concise, descriptive name
- description: What the feature should do
- steps: List of verification/implementation steps
For **multiple features**, call `feature_create_bulk` with an array of feature objects.
You can ask clarifying questions if the user's request is vague, or make reasonable assumptions for simple requests.
**Example interaction:**
User: "Add a feature for S3 sync"
You: I'll create that feature. Let me add it to the backlog...
You: I'll create that feature now.
[calls feature_create with appropriate parameters]
You: Done! I've added "S3 Sync Integration" to your backlog. It's now visible on the kanban board.
@@ -208,7 +203,7 @@ class AssistantChatSession:
# Create a new conversation if we don't have one
if is_new_conversation:
conv = create_conversation(self.project_dir, self.project_name)
self.conversation_id = conv.id
self.conversation_id = int(conv.id) # type coercion: Column[int] -> int
yield {"type": "conversation_created", "conversation_id": self.conversation_id}
# Build permissions list for assistant access (read + feature management)
@@ -229,7 +224,9 @@ class AssistantChatSession:
"allow": permissions_list,
},
}
settings_file = self.project_dir / ".claude_assistant_settings.json"
from autocoder_paths import get_claude_assistant_settings_path
settings_file = get_claude_assistant_settings_path(self.project_dir)
settings_file.parent.mkdir(parents=True, exist_ok=True)
with open(settings_file, "w") as f:
json.dump(security_settings, f, indent=2)
@@ -261,7 +258,11 @@ class AssistantChatSession:
system_cli = shutil.which("claude")
# Build environment overrides for API configuration
sdk_env = {var: os.getenv(var) for var in API_ENV_VARS if os.getenv(var)}
sdk_env: dict[str, str] = {}
for var in API_ENV_VARS:
value = os.getenv(var)
if value:
sdk_env[var] = value
# Determine model from environment or use default
# This allows using alternative APIs (e.g., GLM via z.ai) that may not support Claude model names
@@ -277,7 +278,7 @@ class AssistantChatSession:
# This avoids Windows command line length limit (~8191 chars)
setting_sources=["project"],
allowed_tools=[*READONLY_BUILTIN_TOOLS, *ASSISTANT_FEATURE_TOOLS],
mcp_servers=mcp_servers,
mcp_servers=mcp_servers, # type: ignore[arg-type] # SDK accepts dict config at runtime
permission_mode="bypassPermissions",
max_turns=100,
cwd=str(self.project_dir.resolve()),
@@ -303,6 +304,8 @@ class AssistantChatSession:
greeting = f"Hello! I'm your project assistant for **{self.project_name}**. I can help you understand the codebase, explain features, and answer questions about the project. What would you like to know?"
# Store the greeting in the database
# conversation_id is guaranteed non-None here (set on line 206 above)
assert self.conversation_id is not None
add_message(self.project_dir, self.conversation_id, "assistant", greeting)
yield {"type": "text", "content": greeting}

View File

@@ -7,20 +7,28 @@ Each project has its own assistant.db file in the project directory.
"""
import logging
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text, create_engine, func
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from sqlalchemy.engine import Engine
from sqlalchemy.orm import DeclarativeBase, relationship, sessionmaker
logger = logging.getLogger(__name__)
Base = declarative_base()
class Base(DeclarativeBase):
"""SQLAlchemy 2.0 style declarative base."""
pass
# Engine cache to avoid creating new engines for each request
# Key: project directory path (as posix string), Value: SQLAlchemy engine
_engine_cache: dict[str, object] = {}
_engine_cache: dict[str, Engine] = {}
# Lock for thread-safe access to the engine cache
# Prevents race conditions when multiple threads create engines simultaneously
_cache_lock = threading.Lock()
def _utc_now() -> datetime:
@@ -56,7 +64,8 @@ class ConversationMessage(Base):
def get_db_path(project_dir: Path) -> Path:
"""Get the path to the assistant database for a project."""
return project_dir / "assistant.db"
from autocoder_paths import get_assistant_db_path
return get_assistant_db_path(project_dir)
def get_engine(project_dir: Path):
@@ -64,17 +73,33 @@ def get_engine(project_dir: Path):
Uses a cache to avoid creating new engines for each request, which improves
performance by reusing database connections.
Thread-safe: Uses a lock to prevent race conditions when multiple threads
try to create engines simultaneously for the same project.
"""
cache_key = project_dir.as_posix()
if cache_key not in _engine_cache:
db_path = get_db_path(project_dir)
# Use as_posix() for cross-platform compatibility with SQLite connection strings
db_url = f"sqlite:///{db_path.as_posix()}"
engine = create_engine(db_url, echo=False)
Base.metadata.create_all(engine)
_engine_cache[cache_key] = engine
logger.debug(f"Created new database engine for {cache_key}")
# Double-checked locking for thread safety and performance
if cache_key in _engine_cache:
return _engine_cache[cache_key]
with _cache_lock:
# Check again inside the lock in case another thread created it
if cache_key not in _engine_cache:
db_path = get_db_path(project_dir)
# Use as_posix() for cross-platform compatibility with SQLite connection strings
db_url = f"sqlite:///{db_path.as_posix()}"
engine = create_engine(
db_url,
echo=False,
connect_args={
"check_same_thread": False,
"timeout": 30, # Wait up to 30s for locks
}
)
Base.metadata.create_all(engine)
_engine_cache[cache_key] = engine
logger.debug(f"Created new database engine for {cache_key}")
return _engine_cache[cache_key]

View File

@@ -0,0 +1,57 @@
"""
Chat Session Constants
======================
Shared constants for all chat session types (assistant, spec, expand).
The canonical ``API_ENV_VARS`` list lives in ``env_constants.py`` at the
project root and is re-exported here for convenience so that existing
imports (``from .chat_constants import API_ENV_VARS``) continue to work.
"""
import sys
from pathlib import Path
from typing import AsyncGenerator
# -------------------------------------------------------------------
# Root directory of the autocoder project (repository root).
# Used throughout the server package whenever the repo root is needed.
# -------------------------------------------------------------------
ROOT_DIR = Path(__file__).parent.parent.parent
# Ensure the project root is on sys.path so we can import env_constants
# from the root-level module without requiring a package install.
_root_str = str(ROOT_DIR)
if _root_str not in sys.path:
sys.path.insert(0, _root_str)
# -------------------------------------------------------------------
# Environment variables forwarded to Claude CLI subprocesses.
# Single source of truth lives in env_constants.py at the project root.
# Re-exported here so existing ``from .chat_constants import API_ENV_VARS``
# imports continue to work unchanged.
# -------------------------------------------------------------------
from env_constants import API_ENV_VARS # noqa: E402, F401
async def make_multimodal_message(content_blocks: list[dict]) -> AsyncGenerator[dict, None]:
"""Yield a single multimodal user message in Claude Agent SDK format.
The Claude Agent SDK's ``query()`` method accepts either a plain string
or an ``AsyncIterable[dict]`` for custom message formats. This helper
wraps a list of content blocks (text and/or images) in the expected
envelope.
Args:
content_blocks: List of content-block dicts, e.g.
``[{"type": "text", "text": "..."}, {"type": "image", ...}]``.
Yields:
A single dict representing the user message.
"""
yield {
"type": "user",
"message": {"role": "user", "content": content_blocks},
"parent_tool_use_id": None,
"session_id": "default",
}

View File

@@ -16,28 +16,19 @@ import threading
import uuid
from datetime import datetime
from pathlib import Path
from typing import AsyncGenerator, Optional
from typing import Any, AsyncGenerator, Optional
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
from dotenv import load_dotenv
from ..schemas import ImageAttachment
from .chat_constants import API_ENV_VARS, ROOT_DIR, make_multimodal_message
# Load environment variables from .env file if present
load_dotenv()
logger = logging.getLogger(__name__)
# Environment variables to pass through to Claude CLI for API configuration
API_ENV_VARS = [
"ANTHROPIC_BASE_URL",
"ANTHROPIC_AUTH_TOKEN",
"API_TIMEOUT_MS",
"ANTHROPIC_DEFAULT_SONNET_MODEL",
"ANTHROPIC_DEFAULT_OPUS_MODEL",
"ANTHROPIC_DEFAULT_HAIKU_MODEL",
]
# Feature MCP tools needed for expand session
EXPAND_FEATURE_TOOLS = [
"mcp__features__feature_create",
@@ -46,22 +37,6 @@ EXPAND_FEATURE_TOOLS = [
]
async def _make_multimodal_message(content_blocks: list[dict]) -> AsyncGenerator[dict, None]:
"""
Create an async generator that yields a properly formatted multimodal message.
"""
yield {
"type": "user",
"message": {"role": "user", "content": content_blocks},
"parent_tool_use_id": None,
"session_id": "default",
}
# Root directory of the project
ROOT_DIR = Path(__file__).parent.parent.parent
class ExpandChatSession:
"""
Manages a project expansion conversation.
@@ -128,7 +103,8 @@ class ExpandChatSession:
return
# Verify project has existing spec
spec_path = self.project_dir / "prompts" / "app_spec.txt"
from autocoder_paths import get_prompts_dir
spec_path = get_prompts_dir(self.project_dir) / "app_spec.txt"
if not spec_path.exists():
yield {
"type": "error",
@@ -162,10 +138,13 @@ class ExpandChatSession:
"allow": [
"Read(./**)",
"Glob(./**)",
*EXPAND_FEATURE_TOOLS,
],
},
}
settings_file = self.project_dir / f".claude_settings.expand.{uuid.uuid4().hex}.json"
from autocoder_paths import get_expand_settings_path
settings_file = get_expand_settings_path(self.project_dir, uuid.uuid4().hex)
settings_file.parent.mkdir(parents=True, exist_ok=True)
self._settings_file = settings_file
with open(settings_file, "w", encoding="utf-8") as f:
json.dump(security_settings, f, indent=2)
@@ -175,7 +154,12 @@ class ExpandChatSession:
system_prompt = skill_content.replace("$ARGUMENTS", project_path)
# Build environment overrides for API configuration
sdk_env = {var: os.getenv(var) for var in API_ENV_VARS if os.getenv(var)}
# Filter to only include vars that are actually set (non-None)
sdk_env: dict[str, str] = {}
for var in API_ENV_VARS:
value = os.getenv(var)
if value:
sdk_env[var] = value
# Determine model from environment or use default
# This allows using alternative APIs (e.g., GLM via z.ai) that may not support Claude model names
@@ -203,9 +187,12 @@ class ExpandChatSession:
allowed_tools=[
"Read",
"Glob",
"Grep",
"WebFetch",
"WebSearch",
*EXPAND_FEATURE_TOOLS,
],
mcp_servers=mcp_servers,
mcp_servers=mcp_servers, # type: ignore[arg-type] # SDK accepts dict config at runtime
permission_mode="bypassPermissions",
max_turns=100,
cwd=str(self.project_dir.resolve()),
@@ -299,7 +286,7 @@ class ExpandChatSession:
# Build the message content
if attachments and len(attachments) > 0:
content_blocks = []
content_blocks: list[dict[str, Any]] = []
if message:
content_blocks.append({"type": "text", "text": message})
for att in attachments:
@@ -311,7 +298,7 @@ class ExpandChatSession:
"data": att.base64Data,
}
})
await self.client.query(_make_multimodal_message(content_blocks))
await self.client.query(make_multimodal_message(content_blocks))
logger.info(f"Sent multimodal message with {len(attachments)} image(s)")
else:
await self.client.query(message)

View File

@@ -15,7 +15,7 @@ import sys
import threading
from datetime import datetime
from pathlib import Path
from typing import Awaitable, Callable, Literal, Set
from typing import Any, Awaitable, Callable, Literal, Set
import psutil
@@ -92,7 +92,8 @@ class AgentProcessManager:
self._callbacks_lock = threading.Lock()
# Lock file to prevent multiple instances (stored in project directory)
self.lock_file = self.project_dir / ".agent.lock"
from autocoder_paths import get_agent_lock_path
self.lock_file = get_agent_lock_path(self.project_dir)
@property
def status(self) -> Literal["stopped", "running", "paused", "crashed"]:
@@ -296,6 +297,8 @@ class AgentProcessManager:
parallel_mode: bool = False,
max_concurrency: int | None = None,
testing_agent_ratio: int = 1,
playwright_headless: bool = True,
batch_size: int = 3,
) -> tuple[bool, str]:
"""
Start the agent as a subprocess.
@@ -306,6 +309,7 @@ class AgentProcessManager:
parallel_mode: DEPRECATED - ignored, always uses unified orchestrator
max_concurrency: Max concurrent coding agents (1-5, default 1)
testing_agent_ratio: Number of regression testing agents (0-3, default 1)
playwright_headless: If True, run browser in headless mode
Returns:
Tuple of (success, message)
@@ -346,18 +350,21 @@ class AgentProcessManager:
# Add testing agent configuration
cmd.extend(["--testing-ratio", str(testing_agent_ratio)])
# Add --batch-size flag for multi-feature batching
cmd.extend(["--batch-size", str(batch_size)])
try:
# Start subprocess with piped stdout/stderr
# Use project_dir as cwd so Claude SDK sandbox allows access to project files
# stdin=DEVNULL prevents blocking if Claude CLI or child process tries to read stdin
# CREATE_NO_WINDOW on Windows prevents console window pop-ups
# PYTHONUNBUFFERED ensures output isn't delayed
popen_kwargs = {
popen_kwargs: dict[str, Any] = {
"stdin": subprocess.DEVNULL,
"stdout": subprocess.PIPE,
"stderr": subprocess.STDOUT,
"cwd": str(self.project_dir),
"env": {**os.environ, "PYTHONUNBUFFERED": "1"},
"env": {**os.environ, "PYTHONUNBUFFERED": "1", "PLAYWRIGHT_HEADLESS": "true" if playwright_headless else "false"},
}
if sys.platform == "win32":
popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
@@ -579,8 +586,18 @@ def cleanup_orphaned_locks() -> int:
if not project_path.exists():
continue
lock_file = project_path / ".agent.lock"
if not lock_file.exists():
# Check both legacy and new locations for lock files
from autocoder_paths import get_autocoder_dir
lock_locations = [
project_path / ".agent.lock",
get_autocoder_dir(project_path) / ".agent.lock",
]
lock_file = None
for candidate in lock_locations:
if candidate.exists():
lock_file = candidate
break
if lock_file is None:
continue
try:

View File

@@ -92,8 +92,9 @@ class SchedulerService:
async def _load_project_schedules(self, project_name: str, project_dir: Path) -> int:
"""Load schedules for a single project. Returns count of schedules loaded."""
from api.database import Schedule, create_database
from autocoder_paths import get_features_db_path
db_path = project_dir / "features.db"
db_path = get_features_db_path(project_dir)
if not db_path.exists():
return 0
@@ -567,8 +568,9 @@ class SchedulerService:
):
"""Check if a project should be started on server startup."""
from api.database import Schedule, ScheduleOverride, create_database
from autocoder_paths import get_features_db_path
db_path = project_dir / "features.db"
db_path = get_features_db_path(project_dir)
if not db_path.exists():
return

View File

@@ -13,49 +13,19 @@ import shutil
import threading
from datetime import datetime
from pathlib import Path
from typing import AsyncGenerator, Optional
from typing import Any, AsyncGenerator, Optional
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
from dotenv import load_dotenv
from ..schemas import ImageAttachment
from .chat_constants import API_ENV_VARS, ROOT_DIR, make_multimodal_message
# Load environment variables from .env file if present
load_dotenv()
logger = logging.getLogger(__name__)
# Environment variables to pass through to Claude CLI for API configuration
API_ENV_VARS = [
"ANTHROPIC_BASE_URL",
"ANTHROPIC_AUTH_TOKEN",
"API_TIMEOUT_MS",
"ANTHROPIC_DEFAULT_SONNET_MODEL",
"ANTHROPIC_DEFAULT_OPUS_MODEL",
"ANTHROPIC_DEFAULT_HAIKU_MODEL",
]
async def _make_multimodal_message(content_blocks: list[dict]) -> AsyncGenerator[dict, None]:
"""
Create an async generator that yields a properly formatted multimodal message.
The Claude Agent SDK's query() method accepts either:
- A string (simple text)
- An AsyncIterable[dict] (for custom message formats)
This function wraps content blocks in the expected message format.
"""
yield {
"type": "user",
"message": {"role": "user", "content": content_blocks},
"parent_tool_use_id": None,
"session_id": "default",
}
# Root directory of the project
ROOT_DIR = Path(__file__).parent.parent.parent
class SpecChatSession:
"""
@@ -125,7 +95,8 @@ class SpecChatSession:
# Delete app_spec.txt so Claude can create it fresh
# The SDK requires reading existing files before writing, but app_spec.txt is created new
# Note: We keep initializer_prompt.md so Claude can read and update the template
prompts_dir = self.project_dir / "prompts"
from autocoder_paths import get_prompts_dir
prompts_dir = get_prompts_dir(self.project_dir)
app_spec_path = prompts_dir / "app_spec.txt"
if app_spec_path.exists():
app_spec_path.unlink()
@@ -145,7 +116,9 @@ class SpecChatSession:
],
},
}
settings_file = self.project_dir / ".claude_settings.json"
from autocoder_paths import get_claude_settings_path
settings_file = get_claude_settings_path(self.project_dir)
settings_file.parent.mkdir(parents=True, exist_ok=True)
with open(settings_file, "w") as f:
json.dump(security_settings, f, indent=2)
@@ -167,7 +140,12 @@ class SpecChatSession:
system_cli = shutil.which("claude")
# Build environment overrides for API configuration
sdk_env = {var: os.getenv(var) for var in API_ENV_VARS if os.getenv(var)}
# Filter to only include vars that are actually set (non-None)
sdk_env: dict[str, str] = {}
for var in API_ENV_VARS:
value = os.getenv(var)
if value:
sdk_env[var] = value
# Determine model from environment or use default
# This allows using alternative APIs (e.g., GLM via z.ai) that may not support Claude model names
@@ -289,7 +267,7 @@ class SpecChatSession:
# Build the message content
if attachments and len(attachments) > 0:
# Multimodal message: build content blocks array
content_blocks = []
content_blocks: list[dict[str, Any]] = []
# Add text block if there's text
if message:
@@ -308,7 +286,7 @@ class SpecChatSession:
# Send multimodal content to Claude using async generator format
# The SDK's query() accepts AsyncIterable[dict] for custom message formats
await self.client.query(_make_multimodal_message(content_blocks))
await self.client.query(make_multimodal_message(content_blocks))
logger.info(f"Sent multimodal message with {len(attachments)} image(s)")
else:
# Text-only message: use string format
@@ -317,7 +295,7 @@ class SpecChatSession:
current_text = ""
# Track pending writes for BOTH required files
pending_writes = {
pending_writes: dict[str, dict[str, Any] | None] = {
"app_spec": None, # {"tool_id": ..., "path": ...}
"initializer": None, # {"tool_id": ..., "path": ...}
}
@@ -392,7 +370,8 @@ class SpecChatSession:
logger.warning(f"Tool error: {content}")
# Clear any pending writes that failed
for key in pending_writes:
if pending_writes[key] and tool_use_id == pending_writes[key].get("tool_id"):
pending_write = pending_writes[key]
if pending_write is not None and tool_use_id == pending_write.get("tool_id"):
logger.error(f"{key} write failed: {content}")
pending_writes[key] = None
else:

View File

@@ -371,7 +371,7 @@ class TerminalSession:
# Reap zombie if not already reaped
if self._child_pid is not None:
try:
os.waitpid(self._child_pid, os.WNOHANG)
os.waitpid(self._child_pid, os.WNOHANG) # type: ignore[attr-defined] # Unix-only method, guarded by runtime platform selection
except ChildProcessError:
pass
except Exception:
@@ -736,7 +736,7 @@ async def cleanup_all_terminals() -> None:
Called on server shutdown to ensure all PTY processes are terminated.
"""
with _sessions_lock:
all_sessions = []
all_sessions: list[TerminalSession] = []
for project_sessions in _sessions.values():
all_sessions.extend(project_sessions.values())

View File

@@ -0,0 +1,32 @@
"""
Project Helper Utilities
========================
Shared project path lookup used across all server routers and websocket handlers.
Consolidates the previously duplicated _get_project_path() function.
"""
import sys
from pathlib import Path
# Ensure the project root is on sys.path so `registry` can be imported.
# This is necessary because `registry.py` lives at the repository root,
# outside the `server` package.
_root = Path(__file__).parent.parent.parent
if str(_root) not in sys.path:
sys.path.insert(0, str(_root))
from registry import get_project_path as _registry_get_project_path
def get_project_path(project_name: str) -> Path | None:
"""Look up a project's filesystem path from the global registry.
Args:
project_name: The registered name of the project.
Returns:
The resolved ``Path`` to the project directory, or ``None`` if the
project is not found in the registry.
"""
return _registry_get_project_path(project_name)

View File

@@ -1,26 +1,52 @@
"""
Shared validation utilities for the server.
Shared Validation Utilities
============================
Project name validation used across REST endpoints and WebSocket handlers.
Two variants are provided:
* ``is_valid_project_name`` -- returns ``bool``, suitable for WebSocket
handlers where raising an HTTPException is not appropriate.
* ``validate_project_name`` -- raises ``HTTPException(400)`` on failure,
suitable for REST endpoint handlers.
"""
import re
from fastapi import HTTPException
# Compiled once; reused by both variants.
_PROJECT_NAME_RE = re.compile(r'^[a-zA-Z0-9_-]{1,50}$')
def is_valid_project_name(name: str) -> bool:
"""Check whether *name* is a valid project name.
Allows only ASCII letters, digits, hyphens, and underscores (1-50 chars).
Returns ``True`` if valid, ``False`` otherwise.
Use this in WebSocket handlers where you need to close the socket
yourself rather than raise an HTTP error.
"""
return bool(_PROJECT_NAME_RE.match(name))
def validate_project_name(name: str) -> str:
"""
Validate and sanitize project name to prevent path traversal.
"""Validate and return *name*, or raise ``HTTPException(400)``.
Suitable for REST endpoint handlers where FastAPI will convert the
exception into an HTTP 400 response automatically.
Args:
name: Project name to validate
name: Project name to validate.
Returns:
The validated project name
The validated project name (unchanged).
Raises:
HTTPException: If name is invalid
HTTPException: If *name* is invalid.
"""
if not re.match(r'^[a-zA-Z0-9_-]{1,50}$', name):
if not _PROJECT_NAME_RE.match(name):
raise HTTPException(
status_code=400,
detail="Invalid project name. Use only letters, numbers, hyphens, and underscores (1-50 chars)."

View File

@@ -16,8 +16,11 @@ from typing import Set
from fastapi import WebSocket, WebSocketDisconnect
from .schemas import AGENT_MASCOTS
from .services.chat_constants import ROOT_DIR
from .services.dev_server_manager import get_devserver_manager
from .services.process_manager import get_manager
from .utils.project_helpers import get_project_path as _get_project_path
from .utils.validation import is_valid_project_name as validate_project_name
# Lazy imports
_count_passing_tests = None
@@ -36,6 +39,14 @@ TESTING_AGENT_START_PATTERN = re.compile(r'Started testing agent for feature #(\
# Matches: "Feature #123 testing completed" or "Feature #123 testing failed"
TESTING_AGENT_COMPLETE_PATTERN = re.compile(r'Feature #(\d+) testing (completed|failed)')
# Pattern to detect batch coding agent start message
# Matches: "Started coding agent for features #5, #8, #12"
BATCH_CODING_AGENT_START_PATTERN = re.compile(r'Started coding agent for features (#\d+(?:,\s*#\d+)*)')
# Pattern to detect batch completion
# Matches: "Features #5, #8, #12 completed" or "Features #5, #8, #12 failed"
BATCH_FEATURES_COMPLETE_PATTERN = re.compile(r'Features (#\d+(?:,\s*#\d+)*)\s+(completed|failed)')
# Patterns for detecting agent activity and thoughts
THOUGHT_PATTERNS = [
# Claude's tool usage patterns (actual format: [Tool: name])
@@ -61,9 +72,9 @@ ORCHESTRATOR_PATTERNS = {
'capacity_check': re.compile(r'\[DEBUG\] Spawning loop: (\d+) ready, (\d+) slots'),
'at_capacity': re.compile(r'At max capacity|at max testing agents|At max total agents'),
'feature_start': re.compile(r'Starting feature \d+/\d+: #(\d+) - (.+)'),
'coding_spawn': re.compile(r'Started coding agent for feature #(\d+)'),
'coding_spawn': re.compile(r'Started coding agent for features? #(\d+)'),
'testing_spawn': re.compile(r'Started testing agent for feature #(\d+)'),
'coding_complete': re.compile(r'Feature #(\d+) (completed|failed)'),
'coding_complete': re.compile(r'Features? #(\d+)(?:,\s*#\d+)* (completed|failed)'),
'testing_complete': re.compile(r'Feature #(\d+) testing (completed|failed)'),
'all_complete': re.compile(r'All features complete'),
'blocked_features': re.compile(r'(\d+) blocked by dependencies'),
@@ -93,14 +104,26 @@ class AgentTracker:
# Check for orchestrator status messages first
# These don't have [Feature #X] prefix
# Coding agent start: "Started coding agent for feature #X"
if line.startswith("Started coding agent for feature #"):
# Batch coding agent start: "Started coding agent for features #5, #8, #12"
batch_start_match = BATCH_CODING_AGENT_START_PATTERN.match(line)
if batch_start_match:
try:
feature_id = int(re.search(r'#(\d+)', line).group(1))
return await self._handle_agent_start(feature_id, line, agent_type="coding")
except (AttributeError, ValueError):
feature_ids = [int(x.strip().lstrip('#')) for x in batch_start_match.group(1).split(',')]
if feature_ids:
return await self._handle_batch_agent_start(feature_ids, "coding")
except ValueError:
pass
# Single coding agent start: "Started coding agent for feature #X"
if line.startswith("Started coding agent for feature #"):
m = re.search(r'#(\d+)', line)
if m:
try:
feature_id = int(m.group(1))
return await self._handle_agent_start(feature_id, line, agent_type="coding")
except ValueError:
pass
# Testing agent start: "Started testing agent for feature #X (PID xxx)"
testing_start_match = TESTING_AGENT_START_PATTERN.match(line)
if testing_start_match:
@@ -114,14 +137,27 @@ class AgentTracker:
is_success = testing_complete_match.group(2) == "completed"
return await self._handle_agent_complete(feature_id, is_success, agent_type="testing")
# Batch features complete: "Features #5, #8, #12 completed/failed"
batch_complete_match = BATCH_FEATURES_COMPLETE_PATTERN.match(line)
if batch_complete_match:
try:
feature_ids = [int(x.strip().lstrip('#')) for x in batch_complete_match.group(1).split(',')]
is_success = batch_complete_match.group(2) == "completed"
if feature_ids:
return await self._handle_batch_agent_complete(feature_ids, is_success, "coding")
except ValueError:
pass
# Coding agent complete: "Feature #X completed/failed" (without "testing" keyword)
if line.startswith("Feature #") and ("completed" in line or "failed" in line) and "testing" not in line:
try:
feature_id = int(re.search(r'#(\d+)', line).group(1))
is_success = "completed" in line
return await self._handle_agent_complete(feature_id, is_success, agent_type="coding")
except (AttributeError, ValueError):
pass
m = re.search(r'#(\d+)', line)
if m:
try:
feature_id = int(m.group(1))
is_success = "completed" in line
return await self._handle_agent_complete(feature_id, is_success, agent_type="coding")
except ValueError:
pass
# Check for feature-specific output lines: [Feature #X] content
# Both coding and testing agents use this format now
@@ -151,6 +187,7 @@ class AgentTracker:
'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)],
'agent_index': agent_index,
'agent_type': 'coding',
'feature_ids': [feature_id],
'state': 'thinking',
'feature_name': f'Feature #{feature_id}',
'last_thought': None,
@@ -158,6 +195,10 @@ class AgentTracker:
agent = self.active_agents[key]
# Update current_feature_id for batch agents when output comes from a different feature
if 'current_feature_id' in agent and feature_id in agent.get('feature_ids', []):
agent['current_feature_id'] = feature_id
# Detect state and thought from content
state = 'working'
thought = None
@@ -181,6 +222,7 @@ class AgentTracker:
'agentName': agent['name'],
'agentType': agent['agent_type'],
'featureId': feature_id,
'featureIds': agent.get('feature_ids', [feature_id]),
'featureName': agent['feature_name'],
'state': state,
'thought': thought,
@@ -237,6 +279,7 @@ class AgentTracker:
'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)],
'agent_index': agent_index,
'agent_type': agent_type,
'feature_ids': [feature_id],
'state': 'thinking',
'feature_name': feature_name,
'last_thought': 'Starting work...',
@@ -248,12 +291,55 @@ class AgentTracker:
'agentName': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)],
'agentType': agent_type,
'featureId': feature_id,
'featureIds': [feature_id],
'featureName': feature_name,
'state': 'thinking',
'thought': 'Starting work...',
'timestamp': datetime.now().isoformat(),
}
async def _handle_batch_agent_start(self, feature_ids: list[int], agent_type: str = "coding") -> dict | None:
"""Handle batch agent start message from orchestrator."""
if not feature_ids:
return None
primary_id = feature_ids[0]
async with self._lock:
key = (primary_id, agent_type)
agent_index = self._next_agent_index
self._next_agent_index += 1
feature_name = f'Features {", ".join(f"#{fid}" for fid in feature_ids)}'
self.active_agents[key] = {
'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)],
'agent_index': agent_index,
'agent_type': agent_type,
'feature_ids': list(feature_ids),
'current_feature_id': primary_id,
'state': 'thinking',
'feature_name': feature_name,
'last_thought': 'Starting batch work...',
}
# Register all feature IDs so output lines can find this agent
for fid in feature_ids:
secondary_key = (fid, agent_type)
if secondary_key != key:
self.active_agents[secondary_key] = self.active_agents[key]
return {
'type': 'agent_update',
'agentIndex': agent_index,
'agentName': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)],
'agentType': agent_type,
'featureId': primary_id,
'featureIds': list(feature_ids),
'featureName': feature_name,
'state': 'thinking',
'thought': 'Starting batch work...',
'timestamp': datetime.now().isoformat(),
}
async def _handle_agent_complete(self, feature_id: int, is_success: bool, agent_type: str = "coding") -> dict | None:
"""Handle agent completion - ALWAYS emits a message, even if agent wasn't tracked.
@@ -275,6 +361,7 @@ class AgentTracker:
'agentName': agent['name'],
'agentType': agent.get('agent_type', agent_type),
'featureId': feature_id,
'featureIds': agent.get('feature_ids', [feature_id]),
'featureName': agent['feature_name'],
'state': state,
'thought': 'Completed successfully!' if is_success else 'Failed to complete',
@@ -291,6 +378,7 @@ class AgentTracker:
'agentName': 'Unknown',
'agentType': agent_type,
'featureId': feature_id,
'featureIds': [feature_id],
'featureName': f'Feature #{feature_id}',
'state': state,
'thought': 'Completed successfully!' if is_success else 'Failed to complete',
@@ -298,6 +386,49 @@ class AgentTracker:
'synthetic': True,
}
async def _handle_batch_agent_complete(self, feature_ids: list[int], is_success: bool, agent_type: str = "coding") -> dict | None:
"""Handle batch agent completion."""
if not feature_ids:
return None
primary_id = feature_ids[0]
async with self._lock:
state = 'success' if is_success else 'error'
key = (primary_id, agent_type)
if key in self.active_agents:
agent = self.active_agents[key]
result = {
'type': 'agent_update',
'agentIndex': agent['agent_index'],
'agentName': agent['name'],
'agentType': agent.get('agent_type', agent_type),
'featureId': primary_id,
'featureIds': agent.get('feature_ids', list(feature_ids)),
'featureName': agent['feature_name'],
'state': state,
'thought': 'Batch completed successfully!' if is_success else 'Batch failed to complete',
'timestamp': datetime.now().isoformat(),
}
# Clean up all keys for this batch
for fid in feature_ids:
self.active_agents.pop((fid, agent_type), None)
return result
else:
# Synthetic completion
return {
'type': 'agent_update',
'agentIndex': -1,
'agentName': 'Unknown',
'agentType': agent_type,
'featureId': primary_id,
'featureIds': list(feature_ids),
'featureName': f'Features {", ".join(f"#{fid}" for fid in feature_ids)}',
'state': state,
'thought': 'Batch completed successfully!' if is_success else 'Batch failed to complete',
'timestamp': datetime.now().isoformat(),
'synthetic': True,
}
class OrchestratorTracker:
"""Tracks orchestrator state for Mission Control observability.
@@ -444,7 +575,7 @@ class OrchestratorTracker:
timestamp = datetime.now().isoformat()
# Add to recent events (keep last 5)
event = {
event: dict[str, str | int] = {
'eventType': event_type,
'message': message,
'timestamp': timestamp,
@@ -487,17 +618,6 @@ class OrchestratorTracker:
self.recent_events.clear()
def _get_project_path(project_name: str) -> Path:
"""Get project path from registry."""
import sys
root = Path(__file__).parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from registry import get_project_path
return get_project_path(project_name)
def _get_count_passing_tests():
"""Lazy import of count_passing_tests."""
global _count_passing_tests
@@ -564,15 +684,6 @@ class ConnectionManager:
# Global connection manager
manager = ConnectionManager()
# Root directory
ROOT_DIR = Path(__file__).parent.parent
def validate_project_name(name: str) -> bool:
"""Validate project name to prevent path traversal."""
return bool(re.match(r'^[a-zA-Z0-9_-]{1,50}$', name))
async def poll_progress(websocket: WebSocket, project_name: str, project_dir: Path):
"""Poll database for progress changes and send updates."""
count_passing_tests = _get_count_passing_tests()
@@ -652,7 +763,7 @@ async def project_websocket(websocket: WebSocket, project_name: str):
agent_index, _ = await agent_tracker.get_agent_info(feature_id)
# Send the raw log line with optional feature/agent attribution
log_msg = {
log_msg: dict[str, str | int] = {
"type": "log",
"line": line,
"timestamp": datetime.now().isoformat(),