autocoder/mcp_server/feature_mcp.py
Commit bf3a6b0b73: feat: add per-agent logging UI and fix stuck agent issues
Changes:
- Add per-agent log viewer with copy-to-clipboard functionality
  - New AgentLogEntry type for structured log entries
  - Logs stored per-agent in WebSocket state (up to 500 entries)
  - Log modal rendered via React Portal to avoid overflow issues
  - Click log icon on agent card to view full activity history

- Fix agents getting stuck in "failed" state
  - Wrap client context manager in try/except (agent.py)
  - Remove failed agents from UI on error state (useWebSocket.ts)
  - Handle permanently failed features in get_all_complete()

- Add friendlier agent state labels
  - "Hit an issue" → "Trying plan B..."
  - "Retrying..." → "Being persistent..."
  - Softer colors (yellow/orange instead of red)

- Add scheduling scores for smarter feature ordering
  - compute_scheduling_scores() in dependency_resolver.py
  - Features that unblock others get higher priority

- Update CLAUDE.md with parallel mode documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 14:11:24 +02:00


#!/usr/bin/env python3
"""
MCP Server for Feature Management
=================================

Provides tools to manage features in the autonomous coding system,
replacing the previous FastAPI-based REST API.

Tools:
- feature_get_stats: Get progress statistics
- feature_get_next: Get next feature to implement
- feature_claim_next: Atomically claim the next available feature
- feature_get_for_regression: Get random passing features for testing
- feature_mark_passing: Mark a feature as passing
- feature_skip: Skip a feature (move to end of queue)
- feature_mark_in_progress: Mark a feature as in-progress
- feature_clear_in_progress: Clear in-progress status
- feature_create_bulk: Create multiple features at once
- feature_create: Create a single feature
- feature_add_dependency: Add a dependency between two features
- feature_remove_dependency: Remove a dependency from a feature
- feature_set_dependencies: Replace all dependencies for a feature
- feature_get_ready: List features whose dependencies are satisfied
- feature_get_blocked: List features blocked by unmet dependencies
- feature_get_graph: Get dependency graph data for visualization
"""
import json
import os
import sys
import threading
import time as _time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
from sqlalchemy import text
from sqlalchemy.sql.expression import func

# Add parent directory to path so we can import from the api module
sys.path.insert(0, str(Path(__file__).parent.parent))

from api.database import Feature, create_database
from api.migration import migrate_json_to_sqlite
from api.dependency_resolver import (
    would_create_circular_dependency,
    are_dependencies_satisfied,
    get_blocking_dependencies,
    compute_scheduling_scores,
    MAX_DEPENDENCIES_PER_FEATURE,
)

# Configuration from environment
PROJECT_DIR = Path(os.environ.get("PROJECT_DIR", ".")).resolve()
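
# Example launch (path is illustrative): the project directory comes from the
# environment, defaulting to the current working directory:
#   PROJECT_DIR=/srv/my-project python feature_mcp.py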

# Pydantic models for input validation
class MarkPassingInput(BaseModel):
    """Input for marking a feature as passing."""
    feature_id: int = Field(..., description="The ID of the feature to mark as passing", ge=1)


class SkipFeatureInput(BaseModel):
    """Input for skipping a feature."""
    feature_id: int = Field(..., description="The ID of the feature to skip", ge=1)


class MarkInProgressInput(BaseModel):
    """Input for marking a feature as in-progress."""
    feature_id: int = Field(..., description="The ID of the feature to mark as in-progress", ge=1)


class ClearInProgressInput(BaseModel):
    """Input for clearing in-progress status."""
    feature_id: int = Field(..., description="The ID of the feature to clear in-progress status", ge=1)


class RegressionInput(BaseModel):
    """Input for getting regression features."""
    limit: int = Field(default=3, ge=1, le=10, description="Maximum number of passing features to return")


class FeatureCreateItem(BaseModel):
    """Schema for creating a single feature."""
    category: str = Field(..., min_length=1, max_length=100, description="Feature category")
    name: str = Field(..., min_length=1, max_length=255, description="Feature name")
    description: str = Field(..., min_length=1, description="Detailed description")
    steps: list[str] = Field(..., min_length=1, description="Implementation/test steps")


class BulkCreateInput(BaseModel):
    """Input for bulk creating features."""
    features: list[FeatureCreateItem] = Field(..., min_length=1, description="List of features to create")

# Global database session maker (initialized on startup)
_session_maker = None
_engine = None

# Lock for priority assignment to prevent race conditions
_priority_lock = threading.Lock()
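
# Note: this lock only serializes claims and priority writes *within this
# process*. Cross-process safety for claiming relies on the atomic UPDATE in
# _feature_claim_next_internal(), which only flips in_progress when the row
# is still unclaimed.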

@asynccontextmanager
async def server_lifespan(server: FastMCP):
    """Initialize database on startup, cleanup on shutdown."""
    global _session_maker, _engine
    # Create project directory if it doesn't exist
    PROJECT_DIR.mkdir(parents=True, exist_ok=True)
    # Initialize database
    _engine, _session_maker = create_database(PROJECT_DIR)
    # Run migration if needed (converts legacy JSON to SQLite)
    migrate_json_to_sqlite(PROJECT_DIR, _session_maker)
    yield
    # Cleanup
    if _engine:
        _engine.dispose()


# Initialize the MCP server
mcp = FastMCP("features", lifespan=server_lifespan)

def get_session():
    """Get a new database session."""
    if _session_maker is None:
        raise RuntimeError("Database not initialized")
    return _session_maker()
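
# Every tool below opens its own short-lived session and closes it in a
# finally block, so no session outlives a single MCP tool call.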

@mcp.tool()
def feature_get_stats() -> str:
    """Get statistics about feature completion progress.

    Returns the number of passing features, in-progress features, total features,
    and completion percentage. Use this to track overall progress of the implementation.

    Returns:
        JSON with: passing (int), in_progress (int), total (int), percentage (float)
    """
    session = get_session()
    try:
        total = session.query(Feature).count()
        passing = session.query(Feature).filter(Feature.passes == True).count()
        in_progress = session.query(Feature).filter(Feature.in_progress == True).count()
        percentage = round((passing / total) * 100, 1) if total > 0 else 0.0
        return json.dumps({
            "passing": passing,
            "in_progress": in_progress,
            "total": total,
            "percentage": percentage
        }, indent=2)
    finally:
        session.close()
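
# Illustrative response shape (values invented for the example):
#   {"passing": 12, "in_progress": 2, "total": 40, "percentage": 30.0}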

@mcp.tool()
def feature_get_next() -> str:
    """Get the next pending feature whose dependencies are all satisfied.

    Candidates are ordered by scheduling score (higher first), then priority
    (lower first), then id. The returned feature:
    1. Has passes=false and in_progress=false
    2. Has all dependency features already passing (or no dependencies)
    3. Only counts dependency IDs that actually exist (orphaned dependencies are ignored)

    If every pending feature is blocked by unmet dependencies, an error is
    returned that describes the first few blockers instead of a feature.

    Returns:
        JSON with feature details (id, priority, category, name, description, steps, passes,
        in_progress, dependencies) or an error message if all features are passing.
    """
    session = get_session()
    try:
        all_features = session.query(Feature).all()
        all_feature_ids = {f.id for f in all_features}
        passing_ids = {f.id for f in all_features if f.passes}
        # Get pending, non-in-progress features
        pending = [f for f in all_features if not f.passes and not f.in_progress]
        # Sort by scheduling score (higher = first), then priority, then id
        all_dicts = [f.to_dict() for f in all_features]
        scores = compute_scheduling_scores(all_dicts)
        pending.sort(key=lambda f: (-scores.get(f.id, 0), f.priority, f.id))
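        # Sort-key note for the line above: a higher scheduling score outranks
        # priority, e.g. score 3 / priority 10 is picked before score 0 /
        # priority 1; ties fall back to the lower priority number, then lower id.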
        if not pending:
            if any(f.in_progress for f in all_features if not f.passes):
                return json.dumps({"error": "All pending features are in progress by other agents"})
            return json.dumps({"error": "All features are passing! No more work to do."})
        # Find first feature with satisfied dependencies
        for feature in pending:
            deps = feature.dependencies or []
            # Filter out orphaned dependencies (IDs that no longer exist)
            valid_deps = [d for d in deps if d in all_feature_ids]
            if all(dep_id in passing_ids for dep_id in valid_deps):
                return json.dumps(feature.to_dict(), indent=2)
        # All pending features are blocked by unmet dependencies.
        # Return an error with details about what's blocking progress.
        blocking_info = []
        for feature in pending[:3]:  # Show first 3 blocked features
            deps = feature.dependencies or []
            valid_deps = [d for d in deps if d in all_feature_ids]
            orphaned = [d for d in deps if d not in all_feature_ids]
            unmet = [d for d in valid_deps if d not in passing_ids]
            info = f"#{feature.id} '{feature.name}'"
            if unmet:
                info += f" blocked by: {unmet}"
            if orphaned:
                info += f" (orphaned deps ignored: {orphaned})"
            blocking_info.append(info)
        return json.dumps({
            "error": "All pending features are blocked by unmet dependencies",
            "blocked_features": len(pending),
            "examples": blocking_info,
            "hint": "Complete the blocking dependencies first, or remove invalid dependencies"
        }, indent=2)
    finally:
        session.close()
# Maximum retry attempts for feature claiming under contention
MAX_CLAIM_RETRIES = 10
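
# With the exponential backoff used below (0.1 s doubling, capped at 1.0 s),
# a fully contended claim sleeps roughly 0.1 + 0.2 + 0.4 + 0.8 + 1.0 * 6 =
# 7.5 s in total before giving up after MAX_CLAIM_RETRIES attempts.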

def _feature_claim_next_internal(attempt: int = 0) -> str:
    """Internal implementation of feature claiming with retry tracking.

    Args:
        attempt: Current retry attempt (0-indexed)

    Returns:
        JSON with claimed feature details, or error message if no feature available.
    """
    if attempt >= MAX_CLAIM_RETRIES:
        return json.dumps({
            "error": "Failed to claim feature after maximum retries",
            "hint": "High contention detected - try again or reduce parallel agents"
        })
    session = get_session()
    try:
        # Use a lock to prevent concurrent claims within this process
        with _priority_lock:
            all_features = session.query(Feature).all()
            all_feature_ids = {f.id for f in all_features}
            passing_ids = {f.id for f in all_features if f.passes}
            # Get pending, non-in-progress features
            pending = [f for f in all_features if not f.passes and not f.in_progress]
            # Sort by scheduling score (higher = first), then priority, then id
            all_dicts = [f.to_dict() for f in all_features]
            scores = compute_scheduling_scores(all_dicts)
            pending.sort(key=lambda f: (-scores.get(f.id, 0), f.priority, f.id))
            if not pending:
                if any(f.in_progress for f in all_features if not f.passes):
                    return json.dumps({"error": "All pending features are in progress by other agents"})
                return json.dumps({"error": "All features are passing! No more work to do."})
            # Find first feature with satisfied dependencies
            candidate_id = None
            for feature in pending:
                deps = feature.dependencies or []
                # Filter out orphaned dependencies (IDs that no longer exist)
                valid_deps = [d for d in deps if d in all_feature_ids]
                if all(dep_id in passing_ids for dep_id in valid_deps):
                    candidate_id = feature.id
                    break
            if candidate_id is None:
                # All pending features are blocked by unmet dependencies
                blocking_info = []
                for feature in pending[:3]:
                    deps = feature.dependencies or []
                    valid_deps = [d for d in deps if d in all_feature_ids]
                    orphaned = [d for d in deps if d not in all_feature_ids]
                    unmet = [d for d in valid_deps if d not in passing_ids]
                    info = f"#{feature.id} '{feature.name}'"
                    if unmet:
                        info += f" blocked by: {unmet}"
                    if orphaned:
                        info += f" (orphaned deps ignored: {orphaned})"
                    blocking_info.append(info)
                return json.dumps({
                    "error": "All pending features are blocked by unmet dependencies",
                    "blocked_features": len(pending),
                    "examples": blocking_info,
                    "hint": "Complete the blocking dependencies first, or remove invalid dependencies"
                }, indent=2)
            # Atomic claim: UPDATE only if still claimable.
            # This prevents race conditions even across processes.
            result = session.execute(
                text("""
                    UPDATE features
                    SET in_progress = 1
                    WHERE id = :feature_id
                      AND in_progress = 0
                      AND passes = 0
                """),
                {"feature_id": candidate_id}
            )
            session.commit()
        # Check if we actually claimed it. This runs after the lock is
        # released: _priority_lock is a non-reentrant threading.Lock, so the
        # retry recursion below must not happen while it is still held, and
        # the backoff sleep should not block other threads either.
        if result.rowcount == 0:
            # Another process claimed it first - retry with backoff
            session.close()
            # Exponential backoff: 0.1s, 0.2s, 0.4s, ... up to 1.0s
            backoff = min(0.1 * (2 ** attempt), 1.0)
            _time.sleep(backoff)
            return _feature_claim_next_internal(attempt + 1)
        # Fetch the claimed feature
        session.expire_all()  # Clear cache to get fresh data
        claimed_feature = session.query(Feature).filter(Feature.id == candidate_id).first()
        return json.dumps(claimed_feature.to_dict(), indent=2)
    except Exception as e:
        session.rollback()
        return json.dumps({"error": f"Failed to claim feature: {str(e)}"})
    finally:
        session.close()

@mcp.tool()
def feature_claim_next() -> str:
    """Atomically get and claim the next available feature.

    This combines feature_get_next() and feature_mark_in_progress() in a single
    atomic operation, preventing race conditions where two agents could claim
    the same feature.

    Candidates are ordered by scheduling score (higher first), then priority
    (lower first), then id. The claimed feature:
    1. Has passes=false and in_progress=false
    2. Has all dependency features already passing (or no dependencies)
    3. Only counts dependency IDs that actually exist (orphaned dependencies are ignored)

    On success, the feature's in_progress flag is set to True.

    Returns:
        JSON with claimed feature details, or error message if no feature available.
    """
    return _feature_claim_next_internal(attempt=0)
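
# Sketch of the intended agent loop (caller side, illustrative):
#   claimed = feature_claim_next()
#   ... implement and verify the feature ...
#   feature_mark_passing(id) on success; feature_skip(id) or
#   feature_clear_in_progress(id) to put it back in the queue on failure.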

@mcp.tool()
def feature_get_for_regression(
    limit: Annotated[int, Field(default=3, ge=1, le=10, description="Maximum number of passing features to return")] = 3
) -> str:
    """Get random passing features for regression testing.

    Returns a random selection of features that are currently passing.
    Use this to verify that previously implemented features still work
    after making changes.

    Args:
        limit: Maximum number of features to return (1-10, default 3)

    Returns:
        JSON with: features (list of feature objects), count (int)
    """
    session = get_session()
    try:
        features = (
            session.query(Feature)
            .filter(Feature.passes == True)
            .order_by(func.random())
            .limit(limit)
            .all()
        )
        return json.dumps({
            "features": [f.to_dict() for f in features],
            "count": len(features)
        }, indent=2)
    finally:
        session.close()

@mcp.tool()
def feature_mark_passing(
    feature_id: Annotated[int, Field(description="The ID of the feature to mark as passing", ge=1)]
) -> str:
    """Mark a feature as passing after successful implementation.

    Updates the feature's passes field to true and clears the in_progress flag.
    Use this after you have implemented the feature and verified it works correctly.

    Args:
        feature_id: The ID of the feature to mark as passing

    Returns:
        JSON with the updated feature details, or error if not found.
    """
    session = get_session()
    try:
        feature = session.query(Feature).filter(Feature.id == feature_id).first()
        if feature is None:
            return json.dumps({"error": f"Feature with ID {feature_id} not found"})
        feature.passes = True
        feature.in_progress = False
        session.commit()
        session.refresh(feature)
        return json.dumps(feature.to_dict(), indent=2)
    finally:
        session.close()

@mcp.tool()
def feature_skip(
    feature_id: Annotated[int, Field(description="The ID of the feature to skip", ge=1)]
) -> str:
    """Skip a feature by moving it to the end of the priority queue.

    Use this when a feature cannot be implemented yet due to:
    - Dependencies on other features that aren't implemented yet
    - External blockers (missing assets, unclear requirements)
    - Technical prerequisites that need to be addressed first

    The feature's priority is set to max_priority + 1, so it will be
    worked on after all other pending features. Also clears the in_progress
    flag so the feature returns to "pending" status.

    Args:
        feature_id: The ID of the feature to skip

    Returns:
        JSON with skip details: id, name, old_priority, new_priority, message
    """
    session = get_session()
    try:
        feature = session.query(Feature).filter(Feature.id == feature_id).first()
        if feature is None:
            return json.dumps({"error": f"Feature with ID {feature_id} not found"})
        if feature.passes:
            return json.dumps({"error": "Cannot skip a feature that is already passing"})
        old_priority = feature.priority
        # Use lock to prevent race condition in priority assignment
        with _priority_lock:
            # Get max priority and set this feature to max + 1
            max_priority_result = session.query(Feature.priority).order_by(Feature.priority.desc()).first()
            new_priority = (max_priority_result[0] + 1) if max_priority_result else 1
            feature.priority = new_priority
            feature.in_progress = False
            session.commit()
        session.refresh(feature)
        return json.dumps({
            "id": feature.id,
            "name": feature.name,
            "old_priority": old_priority,
            "new_priority": new_priority,
            "message": f"Feature '{feature.name}' moved to end of queue"
        }, indent=2)
    finally:
        session.close()
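
# Worked example: with priorities 1..10 in use, skipping the feature at
# priority 4 reassigns it priority 11, so every other pending feature is
# attempted before it comes up again.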

@mcp.tool()
def feature_mark_in_progress(
    feature_id: Annotated[int, Field(description="The ID of the feature to mark as in-progress", ge=1)]
) -> str:
    """Mark a feature as in-progress. Call immediately after feature_get_next().

    This prevents other agent sessions from working on the same feature.
    Use this as soon as you retrieve a feature to work on.

    Args:
        feature_id: The ID of the feature to mark as in-progress

    Returns:
        JSON with the updated feature details, or error if not found or already in-progress.
    """
    session = get_session()
    try:
        feature = session.query(Feature).filter(Feature.id == feature_id).first()
        if feature is None:
            return json.dumps({"error": f"Feature with ID {feature_id} not found"})
        if feature.passes:
            return json.dumps({"error": f"Feature with ID {feature_id} is already passing"})
        if feature.in_progress:
            return json.dumps({"error": f"Feature with ID {feature_id} is already in-progress"})
        feature.in_progress = True
        session.commit()
        session.refresh(feature)
        return json.dumps(feature.to_dict(), indent=2)
    finally:
        session.close()

@mcp.tool()
def feature_clear_in_progress(
    feature_id: Annotated[int, Field(description="The ID of the feature to clear in-progress status", ge=1)]
) -> str:
    """Clear in-progress status from a feature.

    Use this when abandoning a feature or manually unsticking a stuck feature.
    The feature will return to the pending queue.

    Args:
        feature_id: The ID of the feature to clear in-progress status

    Returns:
        JSON with the updated feature details, or error if not found.
    """
    session = get_session()
    try:
        feature = session.query(Feature).filter(Feature.id == feature_id).first()
        if feature is None:
            return json.dumps({"error": f"Feature with ID {feature_id} not found"})
        feature.in_progress = False
        session.commit()
        session.refresh(feature)
        return json.dumps(feature.to_dict(), indent=2)
    finally:
        session.close()

@mcp.tool()
def feature_create_bulk(
    features: Annotated[list[dict], Field(description="List of features to create, each with category, name, description, and steps")]
) -> str:
    """Create multiple features in a single operation.

    Features are assigned sequential priorities based on their order.
    All features start with passes=false.

    This is typically used by the initializer agent to set up the initial
    feature list from the app specification.

    Args:
        features: List of features to create, each with:
            - category (str): Feature category
            - name (str): Feature name
            - description (str): Detailed description
            - steps (list[str]): Implementation/test steps
            - depends_on_indices (list[int], optional): Array indices (0-based) of
              features in THIS batch that this feature depends on. Use this instead
              of 'dependencies' since IDs aren't known until after creation.
              Example: [0, 2] means this feature depends on features at index 0 and 2.

    Returns:
        JSON with: created (int) - number of features created, with_dependencies (int)
    """
    session = get_session()
    try:
        # Use lock to prevent race condition in priority assignment
        with _priority_lock:
            # Get the starting priority
            max_priority_result = session.query(Feature.priority).order_by(Feature.priority.desc()).first()
            start_priority = (max_priority_result[0] + 1) if max_priority_result else 1
            # First pass: validate all features and their index-based dependencies
            for i, feature_data in enumerate(features):
                # Validate required fields
                if not all(key in feature_data for key in ["category", "name", "description", "steps"]):
                    return json.dumps({
                        "error": f"Feature at index {i} missing required fields (category, name, description, steps)"
                    })
                # Validate depends_on_indices
                indices = feature_data.get("depends_on_indices", [])
                if indices:
                    # Check max dependencies
                    if len(indices) > MAX_DEPENDENCIES_PER_FEATURE:
                        return json.dumps({
                            "error": f"Feature at index {i} has {len(indices)} dependencies, max is {MAX_DEPENDENCIES_PER_FEATURE}"
                        })
                    # Check for duplicates
                    if len(indices) != len(set(indices)):
                        return json.dumps({
                            "error": f"Feature at index {i} has duplicate dependencies"
                        })
                    # Check for forward references (can only depend on earlier features)
                    for idx in indices:
                        if not isinstance(idx, int) or idx < 0:
                            return json.dumps({
                                "error": f"Feature at index {i} has invalid dependency index: {idx}"
                            })
                        if idx >= i:
                            return json.dumps({
                                "error": f"Feature at index {i} cannot depend on feature at index {idx} (forward reference not allowed)"
                            })
            # Second pass: create all features
            created_features: list[Feature] = []
            for i, feature_data in enumerate(features):
                db_feature = Feature(
                    priority=start_priority + i,
                    category=feature_data["category"],
                    name=feature_data["name"],
                    description=feature_data["description"],
                    steps=feature_data["steps"],
                    passes=False,
                    in_progress=False,
                )
                session.add(db_feature)
                created_features.append(db_feature)
            # Flush to get IDs assigned
            session.flush()
            # Third pass: resolve index-based dependencies to actual IDs
            deps_count = 0
            for i, feature_data in enumerate(features):
                indices = feature_data.get("depends_on_indices", [])
                if indices:
                    # Convert indices to actual feature IDs
                    dep_ids = [created_features[idx].id for idx in indices]
                    created_features[i].dependencies = sorted(dep_ids)
                    deps_count += 1
            session.commit()
        return json.dumps({
            "created": len(created_features),
            "with_dependencies": deps_count
        }, indent=2)
    except Exception as e:
        session.rollback()
        return json.dumps({"error": str(e)})
    finally:
        session.close()
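
# Illustrative bulk payload (names invented) showing index-based dependencies:
#   [
#     {"category": "API", "name": "Create user endpoint", "description": "...",
#      "steps": ["..."]},
#     {"category": "API", "name": "Login endpoint", "description": "...",
#      "steps": ["..."], "depends_on_indices": [0]},
#   ]
# The second feature depends on the first; real IDs are resolved after insertion.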

@mcp.tool()
def feature_create(
    category: Annotated[str, Field(min_length=1, max_length=100, description="Feature category (e.g., 'Authentication', 'API', 'UI')")],
    name: Annotated[str, Field(min_length=1, max_length=255, description="Feature name")],
    description: Annotated[str, Field(min_length=1, description="Detailed description of the feature")],
    steps: Annotated[list[str], Field(min_length=1, description="List of implementation/verification steps")]
) -> str:
    """Create a single feature in the project backlog.

    Use this when the user asks to add a new feature, capability, or test case.
    The feature will be added with the next available priority number.

    Args:
        category: Feature category for grouping (e.g., 'Authentication', 'API', 'UI')
        name: Descriptive name for the feature
        description: Detailed description of what this feature should do
        steps: List of steps to implement or verify the feature

    Returns:
        JSON with the created feature details including its ID
    """
    session = get_session()
    try:
        # Use lock to prevent race condition in priority assignment
        with _priority_lock:
            # Get the next priority
            max_priority_result = session.query(Feature.priority).order_by(Feature.priority.desc()).first()
            next_priority = (max_priority_result[0] + 1) if max_priority_result else 1
            db_feature = Feature(
                priority=next_priority,
                category=category,
                name=name,
                description=description,
                steps=steps,
                passes=False,
                in_progress=False,
            )
            session.add(db_feature)
            session.commit()
        session.refresh(db_feature)
        return json.dumps({
            "success": True,
            "message": f"Created feature: {name}",
            "feature": db_feature.to_dict()
        }, indent=2)
    except Exception as e:
        session.rollback()
        return json.dumps({"error": str(e)})
    finally:
        session.close()

@mcp.tool()
def feature_add_dependency(
    feature_id: Annotated[int, Field(ge=1, description="Feature to add dependency to")],
    dependency_id: Annotated[int, Field(ge=1, description="ID of the dependency feature")]
) -> str:
    """Add a dependency relationship between features.

    The dependency_id feature must be completed before feature_id can be started.
    Validates: self-reference, existence, circular dependencies, max limit.

    Args:
        feature_id: The ID of the feature that will depend on another feature
        dependency_id: The ID of the feature that must be completed first

    Returns:
        JSON with success status and updated dependencies list, or error message
    """
    session = get_session()
    try:
        # Security: Self-reference check
        if feature_id == dependency_id:
            return json.dumps({"error": "A feature cannot depend on itself"})
        feature = session.query(Feature).filter(Feature.id == feature_id).first()
        dependency = session.query(Feature).filter(Feature.id == dependency_id).first()
        if not feature:
            return json.dumps({"error": f"Feature {feature_id} not found"})
        if not dependency:
            return json.dumps({"error": f"Dependency feature {dependency_id} not found"})
        current_deps = feature.dependencies or []
        # Security: Max dependencies limit
        if len(current_deps) >= MAX_DEPENDENCIES_PER_FEATURE:
            return json.dumps({"error": f"Maximum {MAX_DEPENDENCIES_PER_FEATURE} dependencies allowed per feature"})
        # Check if already exists
        if dependency_id in current_deps:
            return json.dumps({"error": "Dependency already exists"})
        # Security: Circular dependency check.
        # would_create_circular_dependency(features, source_id, target_id):
        # source_id = feature gaining the dependency, target_id = feature being depended upon
        all_features = [f.to_dict() for f in session.query(Feature).all()]
        if would_create_circular_dependency(all_features, feature_id, dependency_id):
            return json.dumps({"error": "Cannot add: would create circular dependency"})
        # Add dependency
        current_deps.append(dependency_id)
        feature.dependencies = sorted(current_deps)
        session.commit()
        return json.dumps({
            "success": True,
            "feature_id": feature_id,
            "dependencies": feature.dependencies
        })
    finally:
        session.close()
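
# Dependencies are stored sorted, so repeated adds produce a deterministic
# list; feature_get_next()/feature_claim_next() then hold this feature back
# until every listed dependency passes.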

@mcp.tool()
def feature_remove_dependency(
    feature_id: Annotated[int, Field(ge=1, description="Feature to remove dependency from")],
    dependency_id: Annotated[int, Field(ge=1, description="ID of dependency to remove")]
) -> str:
    """Remove a dependency from a feature.

    Args:
        feature_id: The ID of the feature to remove a dependency from
        dependency_id: The ID of the dependency to remove

    Returns:
        JSON with success status and updated dependencies list, or error message
    """
    session = get_session()
    try:
        feature = session.query(Feature).filter(Feature.id == feature_id).first()
        if not feature:
            return json.dumps({"error": f"Feature {feature_id} not found"})
        # Copy before mutating: in-place changes to a JSON column value are not
        # reliably detected by SQLAlchemy, so build a new list and reassign.
        current_deps = list(feature.dependencies or [])
        if dependency_id not in current_deps:
            return json.dumps({"error": "Dependency does not exist"})
        current_deps.remove(dependency_id)
        feature.dependencies = current_deps if current_deps else None
        session.commit()
        return json.dumps({
            "success": True,
            "feature_id": feature_id,
            "dependencies": feature.dependencies or []
        })
    finally:
        session.close()

@mcp.tool()
def feature_get_ready(
    limit: Annotated[int, Field(default=10, ge=1, le=50, description="Max features to return")] = 10
) -> str:
    """Get all features ready to start (dependencies satisfied, not in progress).

    Useful for parallel execution - returns multiple features that can run simultaneously.
    A feature is ready if it is not passing, not in progress, and all dependencies are passing.

    Args:
        limit: Maximum number of features to return (1-50, default 10)

    Returns:
        JSON with: features (list), count (int), total_ready (int)
    """
    session = get_session()
    try:
        all_features = session.query(Feature).all()
        passing_ids = {f.id for f in all_features if f.passes}
        ready = []
        all_dicts = [f.to_dict() for f in all_features]
        for f in all_features:
            if f.passes or f.in_progress:
                continue
            deps = f.dependencies or []
            if all(dep_id in passing_ids for dep_id in deps):
                ready.append(f.to_dict())
        # Sort by scheduling score (higher = first), then priority, then id
        scores = compute_scheduling_scores(all_dicts)
        ready.sort(key=lambda f: (-scores.get(f["id"], 0), f["priority"], f["id"]))
        return json.dumps({
            "features": ready[:limit],
            "count": len(ready[:limit]),
            "total_ready": len(ready)
        }, indent=2)
    finally:
        session.close()
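
# For parallel runs: feature_get_ready() is a read-only preview of what N
# agents could work on at once; the race-safe way to actually take one is
# still feature_claim_next().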

@mcp.tool()
def feature_get_blocked() -> str:
    """Get all features that are blocked by unmet dependencies.

    Returns features that have dependencies which are not yet passing.
    Each feature includes a 'blocked_by' field listing the blocking feature IDs.

    Returns:
        JSON with: features (list with blocked_by field), count (int)
    """
    session = get_session()
    try:
        all_features = session.query(Feature).all()
        passing_ids = {f.id for f in all_features if f.passes}
        blocked = []
        for f in all_features:
            if f.passes:
                continue
            deps = f.dependencies or []
            blocking = [d for d in deps if d not in passing_ids]
            if blocking:
                blocked.append({
                    **f.to_dict(),
                    "blocked_by": blocking
                })
        return json.dumps({
            "features": blocked,
            "count": len(blocked)
        }, indent=2)
    finally:
        session.close()

@mcp.tool()
def feature_get_graph() -> str:
    """Get dependency graph data for visualization.

    Returns nodes (features) and edges (dependencies) for rendering a graph.
    Each node includes status: 'pending', 'in_progress', 'done', or 'blocked'.

    Returns:
        JSON with: nodes (list), edges (list of {source, target})
    """
    session = get_session()
    try:
        all_features = session.query(Feature).all()
        passing_ids = {f.id for f in all_features if f.passes}
        nodes = []
        edges = []
        for f in all_features:
            deps = f.dependencies or []
            blocking = [d for d in deps if d not in passing_ids]
            if f.passes:
                status = "done"
            elif blocking:
                status = "blocked"
            elif f.in_progress:
                status = "in_progress"
            else:
                status = "pending"
            nodes.append({
                "id": f.id,
                "name": f.name,
                "category": f.category,
                "status": status,
                "priority": f.priority,
                "dependencies": deps
            })
            for dep_id in deps:
                edges.append({"source": dep_id, "target": f.id})
        return json.dumps({
            "nodes": nodes,
            "edges": edges
        }, indent=2)
    finally:
        session.close()
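
# Illustrative graph output (ids and names invented):
#   {"nodes": [{"id": 2, "name": "Login", "category": "Auth",
#               "status": "blocked", "priority": 2, "dependencies": [1]}],
#    "edges": [{"source": 1, "target": 2}]}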

@mcp.tool()
def feature_set_dependencies(
    feature_id: Annotated[int, Field(ge=1, description="Feature to set dependencies for")],
    dependency_ids: Annotated[list[int], Field(description="List of dependency feature IDs")]
) -> str:
    """Set all dependencies for a feature at once, replacing any existing dependencies.

    Validates: self-reference, existence of all dependencies, circular dependencies, max limit.

    Args:
        feature_id: The ID of the feature to set dependencies for
        dependency_ids: List of feature IDs that must be completed first

    Returns:
        JSON with success status and updated dependencies list, or error message
    """
    session = get_session()
    try:
        # Security: Self-reference check
        if feature_id in dependency_ids:
            return json.dumps({"error": "A feature cannot depend on itself"})
        # Security: Max dependencies limit
        if len(dependency_ids) > MAX_DEPENDENCIES_PER_FEATURE:
            return json.dumps({"error": f"Maximum {MAX_DEPENDENCIES_PER_FEATURE} dependencies allowed"})
        # Check for duplicates
        if len(dependency_ids) != len(set(dependency_ids)):
            return json.dumps({"error": "Duplicate dependencies not allowed"})
        feature = session.query(Feature).filter(Feature.id == feature_id).first()
        if not feature:
            return json.dumps({"error": f"Feature {feature_id} not found"})
        # Validate all dependencies exist
        all_feature_ids = {f.id for f in session.query(Feature).all()}
        missing = [d for d in dependency_ids if d not in all_feature_ids]
        if missing:
            return json.dumps({"error": f"Dependencies not found: {missing}"})
        # Check for circular dependencies
        all_features = [f.to_dict() for f in session.query(Feature).all()]
        # Temporarily update the feature's dependencies for cycle check
        test_features = []
        for f in all_features:
            if f["id"] == feature_id:
                test_features.append({**f, "dependencies": dependency_ids})
            else:
                test_features.append(f)
        for dep_id in dependency_ids:
            # source_id = feature_id (gaining dep), target_id = dep_id (being depended upon)
            if would_create_circular_dependency(test_features, feature_id, dep_id):
                return json.dumps({"error": f"Cannot add dependency {dep_id}: would create circular dependency"})
        # Set dependencies
        feature.dependencies = sorted(dependency_ids) if dependency_ids else None
        session.commit()
        return json.dumps({
            "success": True,
            "feature_id": feature_id,
            "dependencies": feature.dependencies or []
        })
    finally:
        session.close()

if __name__ == "__main__":
    mcp.run()
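
# Run directly, e.g. `PROJECT_DIR=/srv/my-project python feature_mcp.py`
# (path illustrative); FastMCP's run() serves over stdio by default, which is
# how MCP clients typically spawn tool servers.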