Files
autocoder/mcp_server/feature_mcp.py
Connor Tyndall 398c9d492f feat: Enable assistant chat to create and manage features
Allow users to interact with the project assistant to create features
through natural conversation. The assistant can now:

- Create single features via `feature_create` tool
- Create multiple features via `feature_create_bulk`
- Skip features to deprioritize them via `feature_skip`

Changes:
- Add `feature_create` MCP tool for single-feature creation
- Update assistant allowed tools to include feature management
- Update system prompt to explain new capabilities
- Enhance UI tool call display with friendly messages

Security: File system access remains read-only. The assistant cannot
modify source code or mark features as passing (requires actual
implementation by the coding agent).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 06:10:43 -06:00

470 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
MCP Server for Feature Management
==================================
Provides tools to manage features in the autonomous coding system,
replacing the previous FastAPI-based REST API.
Tools:
- feature_get_stats: Get progress statistics
- feature_get_next: Get next feature to implement
- feature_get_for_regression: Get random passing features for testing
- feature_mark_passing: Mark a feature as passing
- feature_skip: Skip a feature (move to end of queue)
- feature_mark_in_progress: Mark a feature as in-progress
- feature_clear_in_progress: Clear in-progress status
- feature_create_bulk: Create multiple features at once
- feature_create: Create a single feature
"""
import json
import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
from sqlalchemy.sql.expression import func
# Add parent directory to path so we can import from api module
sys.path.insert(0, str(Path(__file__).parent.parent))
from api.database import Feature, create_database
from api.migration import migrate_json_to_sqlite
# Configuration from environment
PROJECT_DIR = Path(os.environ.get("PROJECT_DIR", ".")).resolve()
# Pydantic models for input validation
class MarkPassingInput(BaseModel):
"""Input for marking a feature as passing."""
feature_id: int = Field(..., description="The ID of the feature to mark as passing", ge=1)
class SkipFeatureInput(BaseModel):
"""Input for skipping a feature."""
feature_id: int = Field(..., description="The ID of the feature to skip", ge=1)
class MarkInProgressInput(BaseModel):
"""Input for marking a feature as in-progress."""
feature_id: int = Field(..., description="The ID of the feature to mark as in-progress", ge=1)
class ClearInProgressInput(BaseModel):
"""Input for clearing in-progress status."""
feature_id: int = Field(..., description="The ID of the feature to clear in-progress status", ge=1)
class RegressionInput(BaseModel):
"""Input for getting regression features."""
limit: int = Field(default=3, ge=1, le=10, description="Maximum number of passing features to return")
class FeatureCreateItem(BaseModel):
"""Schema for creating a single feature."""
category: str = Field(..., min_length=1, max_length=100, description="Feature category")
name: str = Field(..., min_length=1, max_length=255, description="Feature name")
description: str = Field(..., min_length=1, description="Detailed description")
steps: list[str] = Field(..., min_length=1, description="Implementation/test steps")
class BulkCreateInput(BaseModel):
"""Input for bulk creating features."""
features: list[FeatureCreateItem] = Field(..., min_length=1, description="List of features to create")
# Global database session maker (initialized on startup)
_session_maker = None
_engine = None
@asynccontextmanager
async def server_lifespan(server: FastMCP):
"""Initialize database on startup, cleanup on shutdown."""
global _session_maker, _engine
# Create project directory if it doesn't exist
PROJECT_DIR.mkdir(parents=True, exist_ok=True)
# Initialize database
_engine, _session_maker = create_database(PROJECT_DIR)
# Run migration if needed (converts legacy JSON to SQLite)
migrate_json_to_sqlite(PROJECT_DIR, _session_maker)
yield
# Cleanup
if _engine:
_engine.dispose()
# Initialize the MCP server
mcp = FastMCP("features", lifespan=server_lifespan)
def get_session():
"""Get a new database session."""
if _session_maker is None:
raise RuntimeError("Database not initialized")
return _session_maker()
@mcp.tool()
def feature_get_stats() -> str:
"""Get statistics about feature completion progress.
Returns the number of passing features, in-progress features, total features,
and completion percentage. Use this to track overall progress of the implementation.
Returns:
JSON with: passing (int), in_progress (int), total (int), percentage (float)
"""
session = get_session()
try:
total = session.query(Feature).count()
passing = session.query(Feature).filter(Feature.passes == True).count()
in_progress = session.query(Feature).filter(Feature.in_progress == True).count()
percentage = round((passing / total) * 100, 1) if total > 0 else 0.0
return json.dumps({
"passing": passing,
"in_progress": in_progress,
"total": total,
"percentage": percentage
}, indent=2)
finally:
session.close()
@mcp.tool()
def feature_get_next() -> str:
"""Get the highest-priority pending feature to work on.
Returns the feature with the lowest priority number that has passes=false.
Use this at the start of each coding session to determine what to implement next.
Returns:
JSON with feature details (id, priority, category, name, description, steps, passes, in_progress)
or error message if all features are passing.
"""
session = get_session()
try:
feature = (
session.query(Feature)
.filter(Feature.passes == False)
.order_by(Feature.priority.asc(), Feature.id.asc())
.first()
)
if feature is None:
return json.dumps({"error": "All features are passing! No more work to do."})
return json.dumps(feature.to_dict(), indent=2)
finally:
session.close()
@mcp.tool()
def feature_get_for_regression(
limit: Annotated[int, Field(default=3, ge=1, le=10, description="Maximum number of passing features to return")] = 3
) -> str:
"""Get random passing features for regression testing.
Returns a random selection of features that are currently passing.
Use this to verify that previously implemented features still work
after making changes.
Args:
limit: Maximum number of features to return (1-10, default 3)
Returns:
JSON with: features (list of feature objects), count (int)
"""
session = get_session()
try:
features = (
session.query(Feature)
.filter(Feature.passes == True)
.order_by(func.random())
.limit(limit)
.all()
)
return json.dumps({
"features": [f.to_dict() for f in features],
"count": len(features)
}, indent=2)
finally:
session.close()
@mcp.tool()
def feature_mark_passing(
feature_id: Annotated[int, Field(description="The ID of the feature to mark as passing", ge=1)]
) -> str:
"""Mark a feature as passing after successful implementation.
Updates the feature's passes field to true and clears the in_progress flag.
Use this after you have implemented the feature and verified it works correctly.
Args:
feature_id: The ID of the feature to mark as passing
Returns:
JSON with the updated feature details, or error if not found.
"""
session = get_session()
try:
feature = session.query(Feature).filter(Feature.id == feature_id).first()
if feature is None:
return json.dumps({"error": f"Feature with ID {feature_id} not found"})
feature.passes = True
feature.in_progress = False
session.commit()
session.refresh(feature)
return json.dumps(feature.to_dict(), indent=2)
finally:
session.close()
@mcp.tool()
def feature_skip(
feature_id: Annotated[int, Field(description="The ID of the feature to skip", ge=1)]
) -> str:
"""Skip a feature by moving it to the end of the priority queue.
Use this when a feature cannot be implemented yet due to:
- Dependencies on other features that aren't implemented yet
- External blockers (missing assets, unclear requirements)
- Technical prerequisites that need to be addressed first
The feature's priority is set to max_priority + 1, so it will be
worked on after all other pending features. Also clears the in_progress
flag so the feature returns to "pending" status.
Args:
feature_id: The ID of the feature to skip
Returns:
JSON with skip details: id, name, old_priority, new_priority, message
"""
session = get_session()
try:
feature = session.query(Feature).filter(Feature.id == feature_id).first()
if feature is None:
return json.dumps({"error": f"Feature with ID {feature_id} not found"})
if feature.passes:
return json.dumps({"error": "Cannot skip a feature that is already passing"})
old_priority = feature.priority
# Get max priority and set this feature to max + 1
max_priority_result = session.query(Feature.priority).order_by(Feature.priority.desc()).first()
new_priority = (max_priority_result[0] + 1) if max_priority_result else 1
feature.priority = new_priority
feature.in_progress = False
session.commit()
session.refresh(feature)
return json.dumps({
"id": feature.id,
"name": feature.name,
"old_priority": old_priority,
"new_priority": new_priority,
"message": f"Feature '{feature.name}' moved to end of queue"
}, indent=2)
finally:
session.close()
@mcp.tool()
def feature_mark_in_progress(
feature_id: Annotated[int, Field(description="The ID of the feature to mark as in-progress", ge=1)]
) -> str:
"""Mark a feature as in-progress. Call immediately after feature_get_next().
This prevents other agent sessions from working on the same feature.
Use this as soon as you retrieve a feature to work on.
Args:
feature_id: The ID of the feature to mark as in-progress
Returns:
JSON with the updated feature details, or error if not found or already in-progress.
"""
session = get_session()
try:
feature = session.query(Feature).filter(Feature.id == feature_id).first()
if feature is None:
return json.dumps({"error": f"Feature with ID {feature_id} not found"})
if feature.passes:
return json.dumps({"error": f"Feature with ID {feature_id} is already passing"})
if feature.in_progress:
return json.dumps({"error": f"Feature with ID {feature_id} is already in-progress"})
feature.in_progress = True
session.commit()
session.refresh(feature)
return json.dumps(feature.to_dict(), indent=2)
finally:
session.close()
@mcp.tool()
def feature_clear_in_progress(
feature_id: Annotated[int, Field(description="The ID of the feature to clear in-progress status", ge=1)]
) -> str:
"""Clear in-progress status from a feature.
Use this when abandoning a feature or manually unsticking a stuck feature.
The feature will return to the pending queue.
Args:
feature_id: The ID of the feature to clear in-progress status
Returns:
JSON with the updated feature details, or error if not found.
"""
session = get_session()
try:
feature = session.query(Feature).filter(Feature.id == feature_id).first()
if feature is None:
return json.dumps({"error": f"Feature with ID {feature_id} not found"})
feature.in_progress = False
session.commit()
session.refresh(feature)
return json.dumps(feature.to_dict(), indent=2)
finally:
session.close()
@mcp.tool()
def feature_create_bulk(
features: Annotated[list[dict], Field(description="List of features to create, each with category, name, description, and steps")]
) -> str:
"""Create multiple features in a single operation.
Features are assigned sequential priorities based on their order.
All features start with passes=false.
This is typically used by the initializer agent to set up the initial
feature list from the app specification.
Args:
features: List of features to create, each with:
- category (str): Feature category
- name (str): Feature name
- description (str): Detailed description
- steps (list[str]): Implementation/test steps
Returns:
JSON with: created (int) - number of features created
"""
session = get_session()
try:
# Get the starting priority
max_priority_result = session.query(Feature.priority).order_by(Feature.priority.desc()).first()
start_priority = (max_priority_result[0] + 1) if max_priority_result else 1
created_count = 0
for i, feature_data in enumerate(features):
# Validate required fields
if not all(key in feature_data for key in ["category", "name", "description", "steps"]):
return json.dumps({
"error": f"Feature at index {i} missing required fields (category, name, description, steps)"
})
db_feature = Feature(
priority=start_priority + i,
category=feature_data["category"],
name=feature_data["name"],
description=feature_data["description"],
steps=feature_data["steps"],
passes=False,
)
session.add(db_feature)
created_count += 1
session.commit()
return json.dumps({"created": created_count}, indent=2)
except Exception as e:
session.rollback()
return json.dumps({"error": str(e)})
finally:
session.close()
@mcp.tool()
def feature_create(
category: Annotated[str, Field(description="Feature category (e.g., 'Authentication', 'API', 'UI')")],
name: Annotated[str, Field(description="Feature name")],
description: Annotated[str, Field(description="Detailed description of the feature")],
steps: Annotated[list[str], Field(description="List of implementation/verification steps")]
) -> str:
"""Create a single feature in the project backlog.
Use this when the user asks to add a new feature, capability, or test case.
The feature will be added with the next available priority number.
Args:
category: Feature category for grouping (e.g., 'Authentication', 'API', 'UI')
name: Descriptive name for the feature
description: Detailed description of what this feature should do
steps: List of steps to implement or verify the feature
Returns:
JSON with the created feature details including its ID
"""
session = get_session()
try:
# Get the next priority
max_priority_result = session.query(Feature.priority).order_by(Feature.priority.desc()).first()
next_priority = (max_priority_result[0] + 1) if max_priority_result else 1
db_feature = Feature(
priority=next_priority,
category=category,
name=name,
description=description,
steps=steps,
passes=False,
)
session.add(db_feature)
session.commit()
session.refresh(db_feature)
return json.dumps({
"success": True,
"message": f"Created feature: {name}",
"feature": db_feature.to_dict()
}, indent=2)
except Exception as e:
session.rollback()
return json.dumps({"error": str(e)})
finally:
session.close()
if __name__ == "__main__":
mcp.run()