Files
autocoder/registry.py
Auto f6ddffa6e2 feat: persist concurrent agents slider at project level
Add `default_concurrency` column to the projects table in the registry
database, allowing each project to remember its preferred concurrency
setting (1-5 agents). The value persists across page refreshes and
app restarts.

Backend changes:
- Add `default_concurrency` column to Project model in registry.py
- Add database migration for existing databases (ALTER TABLE)
- Add get/set_project_concurrency() CRUD functions
- Add ProjectSettingsUpdate schema with validation
- Add PATCH /{name}/settings endpoint in projects router
- Include default_concurrency in ProjectSummary/ProjectDetail responses

Frontend changes:
- Add default_concurrency to ProjectSummary TypeScript interface
- Add ProjectSettingsUpdate type and updateProjectSettings API function
- Add useUpdateProjectSettings React Query mutation hook
- Update AgentControl to accept defaultConcurrency prop
- Sync local state when project changes via useEffect
- Debounce slider changes (500ms) before saving to backend
- Pass defaultConcurrency from selectedProjectData in App.tsx

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 09:08:17 +02:00

585 lines
16 KiB
Python

"""
Project Registry Module
=======================
Cross-platform project registry for storing project name to path mappings.
Uses SQLite database stored at ~/.autocoder/registry.db.
"""
import logging
import os
import re
import threading
import time
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any
from sqlalchemy import Column, DateTime, Integer, String, create_engine, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Module logger
logger = logging.getLogger(__name__)
# =============================================================================
# Model Configuration (Single Source of Truth)
# =============================================================================
# Available models with display names
# To add a new model: add an entry here with {"id": "model-id", "name": "Display Name"}
AVAILABLE_MODELS = [
{"id": "claude-opus-4-5-20251101", "name": "Claude Opus 4.5"},
{"id": "claude-sonnet-4-5-20250929", "name": "Claude Sonnet 4.5"},
]
# List of valid model IDs (derived from AVAILABLE_MODELS)
VALID_MODELS = [m["id"] for m in AVAILABLE_MODELS]
# Default model and settings
DEFAULT_MODEL = "claude-opus-4-5-20251101"
DEFAULT_YOLO_MODE = False
# SQLite connection settings
SQLITE_TIMEOUT = 30 # seconds to wait for database lock
SQLITE_MAX_RETRIES = 3 # number of retry attempts on busy database
# =============================================================================
# Exceptions
# =============================================================================
class RegistryError(Exception):
"""Base registry exception."""
pass
class RegistryNotFound(RegistryError):
"""Registry file doesn't exist."""
pass
class RegistryCorrupted(RegistryError):
"""Registry database is corrupted."""
pass
class RegistryPermissionDenied(RegistryError):
"""Can't read/write registry file."""
pass
# =============================================================================
# SQLAlchemy Model
# =============================================================================
Base = declarative_base()
class Project(Base):
"""SQLAlchemy model for registered projects."""
__tablename__ = "projects"
name = Column(String(50), primary_key=True, index=True)
path = Column(String, nullable=False) # POSIX format for cross-platform
created_at = Column(DateTime, nullable=False)
default_concurrency = Column(Integer, nullable=False, default=3)
class Settings(Base):
"""SQLAlchemy model for global settings (key-value store)."""
__tablename__ = "settings"
key = Column(String(50), primary_key=True)
value = Column(String(500), nullable=False)
updated_at = Column(DateTime, nullable=False)
# =============================================================================
# Database Connection
# =============================================================================
# Module-level singleton for database engine with thread-safe initialization
_engine = None
_SessionLocal = None
_engine_lock = threading.Lock()
def get_config_dir() -> Path:
"""
Get the config directory: ~/.autocoder/
Returns:
Path to ~/.autocoder/ (created if it doesn't exist)
"""
config_dir = Path.home() / ".autocoder"
config_dir.mkdir(parents=True, exist_ok=True)
return config_dir
def get_registry_path() -> Path:
"""Get the path to the registry database."""
return get_config_dir() / "registry.db"
def _get_engine():
"""
Get or create the database engine (thread-safe singleton pattern).
Returns:
Tuple of (engine, SessionLocal)
"""
global _engine, _SessionLocal
# Double-checked locking for thread safety
if _engine is None:
with _engine_lock:
if _engine is None:
db_path = get_registry_path()
db_url = f"sqlite:///{db_path.as_posix()}"
_engine = create_engine(
db_url,
connect_args={
"check_same_thread": False,
"timeout": SQLITE_TIMEOUT,
}
)
Base.metadata.create_all(bind=_engine)
_migrate_add_default_concurrency(_engine)
_SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=_engine)
logger.debug("Initialized registry database at: %s", db_path)
return _engine, _SessionLocal
def _migrate_add_default_concurrency(engine) -> None:
"""Add default_concurrency column if missing (for existing databases)."""
with engine.connect() as conn:
result = conn.execute(text("PRAGMA table_info(projects)"))
columns = [row[1] for row in result.fetchall()]
if "default_concurrency" not in columns:
conn.execute(text(
"ALTER TABLE projects ADD COLUMN default_concurrency INTEGER DEFAULT 3"
))
conn.commit()
logger.info("Migrated projects table: added default_concurrency column")
@contextmanager
def _get_session():
"""
Context manager for database sessions with automatic commit/rollback.
Includes retry logic for SQLite busy database errors.
Yields:
SQLAlchemy session
"""
_, SessionLocal = _get_engine()
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def _with_retry(func, *args, **kwargs):
"""
Execute a database operation with retry logic for busy database.
Args:
func: Function to execute
*args, **kwargs: Arguments to pass to the function
Returns:
Result of the function
Raises:
Last exception if all retries fail
"""
last_error = None
for attempt in range(SQLITE_MAX_RETRIES):
try:
return func(*args, **kwargs)
except Exception as e:
last_error = e
error_str = str(e).lower()
if "database is locked" in error_str or "sqlite_busy" in error_str:
if attempt < SQLITE_MAX_RETRIES - 1:
wait_time = (2 ** attempt) * 0.1 # Exponential backoff: 0.1s, 0.2s, 0.4s
logger.warning(
"Database busy, retrying in %.1fs (attempt %d/%d)",
wait_time, attempt + 1, SQLITE_MAX_RETRIES
)
time.sleep(wait_time)
continue
raise
raise last_error
# =============================================================================
# Project CRUD Functions
# =============================================================================
def register_project(name: str, path: Path) -> None:
"""
Register a new project in the registry.
Args:
name: The project name (unique identifier).
path: The absolute path to the project directory.
Raises:
ValueError: If project name is invalid or path is not absolute.
RegistryError: If a project with that name already exists.
"""
# Validate name
if not re.match(r'^[a-zA-Z0-9_-]{1,50}$', name):
raise ValueError(
"Invalid project name. Use only letters, numbers, hyphens, "
"and underscores (1-50 chars)."
)
# Ensure path is absolute
path = Path(path).resolve()
with _get_session() as session:
existing = session.query(Project).filter(Project.name == name).first()
if existing:
logger.warning("Attempted to register duplicate project: %s", name)
raise RegistryError(f"Project '{name}' already exists in registry")
project = Project(
name=name,
path=path.as_posix(),
created_at=datetime.now()
)
session.add(project)
logger.info("Registered project '%s' at path: %s", name, path)
def unregister_project(name: str) -> bool:
"""
Remove a project from the registry.
Args:
name: The project name to remove.
Returns:
True if removed, False if project wasn't found.
"""
with _get_session() as session:
project = session.query(Project).filter(Project.name == name).first()
if not project:
logger.debug("Attempted to unregister non-existent project: %s", name)
return False
session.delete(project)
logger.info("Unregistered project: %s", name)
return True
def get_project_path(name: str) -> Path | None:
"""
Look up a project's path by name.
Args:
name: The project name.
Returns:
The project Path, or None if not found.
"""
_, SessionLocal = _get_engine()
session = SessionLocal()
try:
project = session.query(Project).filter(Project.name == name).first()
if project is None:
return None
return Path(project.path)
finally:
session.close()
def list_registered_projects() -> dict[str, dict[str, Any]]:
"""
Get all registered projects.
Returns:
Dictionary mapping project names to their info dictionaries.
"""
_, SessionLocal = _get_engine()
session = SessionLocal()
try:
projects = session.query(Project).all()
return {
p.name: {
"path": p.path,
"created_at": p.created_at.isoformat() if p.created_at else None,
"default_concurrency": getattr(p, 'default_concurrency', 3) or 3
}
for p in projects
}
finally:
session.close()
def get_project_info(name: str) -> dict[str, Any] | None:
"""
Get full info about a project.
Args:
name: The project name.
Returns:
Project info dictionary, or None if not found.
"""
_, SessionLocal = _get_engine()
session = SessionLocal()
try:
project = session.query(Project).filter(Project.name == name).first()
if project is None:
return None
return {
"path": project.path,
"created_at": project.created_at.isoformat() if project.created_at else None,
"default_concurrency": getattr(project, 'default_concurrency', 3) or 3
}
finally:
session.close()
def update_project_path(name: str, new_path: Path) -> bool:
"""
Update a project's path (for relocating projects).
Args:
name: The project name.
new_path: The new absolute path.
Returns:
True if updated, False if project wasn't found.
"""
new_path = Path(new_path).resolve()
with _get_session() as session:
project = session.query(Project).filter(Project.name == name).first()
if not project:
return False
project.path = new_path.as_posix()
return True
def get_project_concurrency(name: str) -> int:
"""
Get project's default concurrency (1-5).
Args:
name: The project name.
Returns:
The default concurrency value (defaults to 3 if not set or project not found).
"""
_, SessionLocal = _get_engine()
session = SessionLocal()
try:
project = session.query(Project).filter(Project.name == name).first()
if project is None:
return 3
return getattr(project, 'default_concurrency', 3) or 3
finally:
session.close()
def set_project_concurrency(name: str, concurrency: int) -> bool:
"""
Set project's default concurrency (1-5).
Args:
name: The project name.
concurrency: The concurrency value (1-5).
Returns:
True if updated, False if project wasn't found.
Raises:
ValueError: If concurrency is not between 1 and 5.
"""
if concurrency < 1 or concurrency > 5:
raise ValueError("concurrency must be between 1 and 5")
with _get_session() as session:
project = session.query(Project).filter(Project.name == name).first()
if not project:
return False
project.default_concurrency = concurrency
logger.info("Set project '%s' default_concurrency to %d", name, concurrency)
return True
# =============================================================================
# Validation Functions
# =============================================================================
def validate_project_path(path: Path) -> tuple[bool, str]:
"""
Validate that a project path is accessible and writable.
Args:
path: The path to validate.
Returns:
Tuple of (is_valid, error_message).
"""
path = Path(path).resolve()
# Check if path exists
if not path.exists():
return False, f"Path does not exist: {path}"
# Check if it's a directory
if not path.is_dir():
return False, f"Path is not a directory: {path}"
# Check read permissions
if not os.access(path, os.R_OK):
return False, f"No read permission: {path}"
# Check write permissions
if not os.access(path, os.W_OK):
return False, f"No write permission: {path}"
return True, ""
def cleanup_stale_projects() -> list[str]:
"""
Remove projects from registry whose paths no longer exist.
Returns:
List of removed project names.
"""
removed = []
with _get_session() as session:
projects = session.query(Project).all()
for project in projects:
path = Path(project.path)
if not path.exists():
session.delete(project)
removed.append(project.name)
if removed:
logger.info("Cleaned up stale projects: %s", removed)
return removed
def list_valid_projects() -> list[dict[str, Any]]:
"""
List all projects that have valid, accessible paths.
Returns:
List of project info dicts with additional 'name' field.
"""
_, SessionLocal = _get_engine()
session = SessionLocal()
try:
projects = session.query(Project).all()
valid = []
for p in projects:
path = Path(p.path)
is_valid, _ = validate_project_path(path)
if is_valid:
valid.append({
"name": p.name,
"path": p.path,
"created_at": p.created_at.isoformat() if p.created_at else None
})
return valid
finally:
session.close()
# =============================================================================
# Settings CRUD Functions
# =============================================================================
def get_setting(key: str, default: str | None = None) -> str | None:
"""
Get a setting value by key.
Args:
key: The setting key.
default: Default value if setting doesn't exist or on DB error.
Returns:
The setting value, or default if not found or on error.
"""
try:
_, SessionLocal = _get_engine()
session = SessionLocal()
try:
setting = session.query(Settings).filter(Settings.key == key).first()
return setting.value if setting else default
finally:
session.close()
except Exception as e:
logger.warning("Failed to read setting '%s': %s", key, e)
return default
def set_setting(key: str, value: str) -> None:
"""
Set a setting value (creates or updates).
Args:
key: The setting key.
value: The setting value.
"""
with _get_session() as session:
setting = session.query(Settings).filter(Settings.key == key).first()
if setting:
setting.value = value
setting.updated_at = datetime.now()
else:
setting = Settings(
key=key,
value=value,
updated_at=datetime.now()
)
session.add(setting)
logger.debug("Set setting '%s' = '%s'", key, value)
def get_all_settings() -> dict[str, str]:
"""
Get all settings as a dictionary.
Returns:
Dictionary mapping setting keys to values.
"""
try:
_, SessionLocal = _get_engine()
session = SessionLocal()
try:
settings = session.query(Settings).all()
return {s.key: s.value for s in settings}
finally:
session.close()
except Exception as e:
logger.warning("Failed to read settings: %s", e)
return {}