mirror of
https://github.com/leonvanzyl/autocoder.git
synced 2026-01-30 06:12:06 +00:00
init
This commit is contained in:
1
mcp_server/__init__.py
Normal file
1
mcp_server/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""MCP Server Package for Feature Management."""
|
||||
332
mcp_server/feature_mcp.py
Normal file
332
mcp_server/feature_mcp.py
Normal file
@@ -0,0 +1,332 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
MCP Server for Feature Management
|
||||
==================================
|
||||
|
||||
Provides tools to manage features in the autonomous coding system,
|
||||
replacing the previous FastAPI-based REST API.
|
||||
|
||||
Tools:
|
||||
- feature_get_stats: Get progress statistics
|
||||
- feature_get_next: Get next feature to implement
|
||||
- feature_get_for_regression: Get random passing features for testing
|
||||
- feature_mark_passing: Mark a feature as passing
|
||||
- feature_skip: Skip a feature (move to end of queue)
|
||||
- feature_create_bulk: Create multiple features at once
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy.sql.expression import func
|
||||
|
||||
# Add parent directory to path so we can import from api module
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from api.database import Feature, create_database
|
||||
from api.migration import migrate_json_to_sqlite
|
||||
|
||||
# Configuration from environment
|
||||
PROJECT_DIR = Path(os.environ.get("PROJECT_DIR", ".")).resolve()
|
||||
|
||||
|
||||
# Pydantic models for input validation
|
||||
class MarkPassingInput(BaseModel):
|
||||
"""Input for marking a feature as passing."""
|
||||
feature_id: int = Field(..., description="The ID of the feature to mark as passing", ge=1)
|
||||
|
||||
|
||||
class SkipFeatureInput(BaseModel):
|
||||
"""Input for skipping a feature."""
|
||||
feature_id: int = Field(..., description="The ID of the feature to skip", ge=1)
|
||||
|
||||
|
||||
class RegressionInput(BaseModel):
|
||||
"""Input for getting regression features."""
|
||||
limit: int = Field(default=3, ge=1, le=10, description="Maximum number of passing features to return")
|
||||
|
||||
|
||||
class FeatureCreateItem(BaseModel):
|
||||
"""Schema for creating a single feature."""
|
||||
category: str = Field(..., min_length=1, max_length=100, description="Feature category")
|
||||
name: str = Field(..., min_length=1, max_length=255, description="Feature name")
|
||||
description: str = Field(..., min_length=1, description="Detailed description")
|
||||
steps: list[str] = Field(..., min_length=1, description="Implementation/test steps")
|
||||
|
||||
|
||||
class BulkCreateInput(BaseModel):
|
||||
"""Input for bulk creating features."""
|
||||
features: list[FeatureCreateItem] = Field(..., min_length=1, description="List of features to create")
|
||||
|
||||
|
||||
# Global database session maker (initialized on startup)
|
||||
_session_maker = None
|
||||
_engine = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def server_lifespan(server: FastMCP):
|
||||
"""Initialize database on startup, cleanup on shutdown."""
|
||||
global _session_maker, _engine
|
||||
|
||||
# Create project directory if it doesn't exist
|
||||
PROJECT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Initialize database
|
||||
_engine, _session_maker = create_database(PROJECT_DIR)
|
||||
|
||||
# Run migration if needed (converts legacy JSON to SQLite)
|
||||
migrate_json_to_sqlite(PROJECT_DIR, _session_maker)
|
||||
|
||||
yield
|
||||
|
||||
# Cleanup
|
||||
if _engine:
|
||||
_engine.dispose()
|
||||
|
||||
|
||||
# Initialize the MCP server
|
||||
mcp = FastMCP("features", lifespan=server_lifespan)
|
||||
|
||||
|
||||
def get_session():
|
||||
"""Get a new database session."""
|
||||
if _session_maker is None:
|
||||
raise RuntimeError("Database not initialized")
|
||||
return _session_maker()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def feature_get_stats() -> str:
|
||||
"""Get statistics about feature completion progress.
|
||||
|
||||
Returns the number of passing features, total features, and completion percentage.
|
||||
Use this to track overall progress of the implementation.
|
||||
|
||||
Returns:
|
||||
JSON with: passing (int), total (int), percentage (float)
|
||||
"""
|
||||
session = get_session()
|
||||
try:
|
||||
total = session.query(Feature).count()
|
||||
passing = session.query(Feature).filter(Feature.passes == True).count()
|
||||
percentage = round((passing / total) * 100, 1) if total > 0 else 0.0
|
||||
|
||||
return json.dumps({
|
||||
"passing": passing,
|
||||
"total": total,
|
||||
"percentage": percentage
|
||||
}, indent=2)
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def feature_get_next() -> str:
|
||||
"""Get the highest-priority pending feature to work on.
|
||||
|
||||
Returns the feature with the lowest priority number that has passes=false.
|
||||
Use this at the start of each coding session to determine what to implement next.
|
||||
|
||||
Returns:
|
||||
JSON with feature details (id, priority, category, name, description, steps, passes)
|
||||
or error message if all features are passing.
|
||||
"""
|
||||
session = get_session()
|
||||
try:
|
||||
feature = (
|
||||
session.query(Feature)
|
||||
.filter(Feature.passes == False)
|
||||
.order_by(Feature.priority.asc(), Feature.id.asc())
|
||||
.first()
|
||||
)
|
||||
|
||||
if feature is None:
|
||||
return json.dumps({"error": "All features are passing! No more work to do."})
|
||||
|
||||
return json.dumps(feature.to_dict(), indent=2)
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def feature_get_for_regression(
|
||||
limit: Annotated[int, Field(default=3, ge=1, le=10, description="Maximum number of passing features to return")] = 3
|
||||
) -> str:
|
||||
"""Get random passing features for regression testing.
|
||||
|
||||
Returns a random selection of features that are currently passing.
|
||||
Use this to verify that previously implemented features still work
|
||||
after making changes.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of features to return (1-10, default 3)
|
||||
|
||||
Returns:
|
||||
JSON with: features (list of feature objects), count (int)
|
||||
"""
|
||||
session = get_session()
|
||||
try:
|
||||
features = (
|
||||
session.query(Feature)
|
||||
.filter(Feature.passes == True)
|
||||
.order_by(func.random())
|
||||
.limit(limit)
|
||||
.all()
|
||||
)
|
||||
|
||||
return json.dumps({
|
||||
"features": [f.to_dict() for f in features],
|
||||
"count": len(features)
|
||||
}, indent=2)
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def feature_mark_passing(
|
||||
feature_id: Annotated[int, Field(description="The ID of the feature to mark as passing", ge=1)]
|
||||
) -> str:
|
||||
"""Mark a feature as passing after successful implementation.
|
||||
|
||||
Updates the feature's passes field to true. Use this after you have
|
||||
implemented the feature and verified it works correctly.
|
||||
|
||||
Args:
|
||||
feature_id: The ID of the feature to mark as passing
|
||||
|
||||
Returns:
|
||||
JSON with the updated feature details, or error if not found.
|
||||
"""
|
||||
session = get_session()
|
||||
try:
|
||||
feature = session.query(Feature).filter(Feature.id == feature_id).first()
|
||||
|
||||
if feature is None:
|
||||
return json.dumps({"error": f"Feature with ID {feature_id} not found"})
|
||||
|
||||
feature.passes = True
|
||||
session.commit()
|
||||
session.refresh(feature)
|
||||
|
||||
return json.dumps(feature.to_dict(), indent=2)
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def feature_skip(
|
||||
feature_id: Annotated[int, Field(description="The ID of the feature to skip", ge=1)]
|
||||
) -> str:
|
||||
"""Skip a feature by moving it to the end of the priority queue.
|
||||
|
||||
Use this when a feature cannot be implemented yet due to:
|
||||
- Dependencies on other features that aren't implemented yet
|
||||
- External blockers (missing assets, unclear requirements)
|
||||
- Technical prerequisites that need to be addressed first
|
||||
|
||||
The feature's priority is set to max_priority + 1, so it will be
|
||||
worked on after all other pending features.
|
||||
|
||||
Args:
|
||||
feature_id: The ID of the feature to skip
|
||||
|
||||
Returns:
|
||||
JSON with skip details: id, name, old_priority, new_priority, message
|
||||
"""
|
||||
session = get_session()
|
||||
try:
|
||||
feature = session.query(Feature).filter(Feature.id == feature_id).first()
|
||||
|
||||
if feature is None:
|
||||
return json.dumps({"error": f"Feature with ID {feature_id} not found"})
|
||||
|
||||
if feature.passes:
|
||||
return json.dumps({"error": "Cannot skip a feature that is already passing"})
|
||||
|
||||
old_priority = feature.priority
|
||||
|
||||
# Get max priority and set this feature to max + 1
|
||||
max_priority_result = session.query(Feature.priority).order_by(Feature.priority.desc()).first()
|
||||
new_priority = (max_priority_result[0] + 1) if max_priority_result else 1
|
||||
|
||||
feature.priority = new_priority
|
||||
session.commit()
|
||||
session.refresh(feature)
|
||||
|
||||
return json.dumps({
|
||||
"id": feature.id,
|
||||
"name": feature.name,
|
||||
"old_priority": old_priority,
|
||||
"new_priority": new_priority,
|
||||
"message": f"Feature '{feature.name}' moved to end of queue"
|
||||
}, indent=2)
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def feature_create_bulk(
|
||||
features: Annotated[list[dict], Field(description="List of features to create, each with category, name, description, and steps")]
|
||||
) -> str:
|
||||
"""Create multiple features in a single operation.
|
||||
|
||||
Features are assigned sequential priorities based on their order.
|
||||
All features start with passes=false.
|
||||
|
||||
This is typically used by the initializer agent to set up the initial
|
||||
feature list from the app specification.
|
||||
|
||||
Args:
|
||||
features: List of features to create, each with:
|
||||
- category (str): Feature category
|
||||
- name (str): Feature name
|
||||
- description (str): Detailed description
|
||||
- steps (list[str]): Implementation/test steps
|
||||
|
||||
Returns:
|
||||
JSON with: created (int) - number of features created
|
||||
"""
|
||||
session = get_session()
|
||||
try:
|
||||
# Get the starting priority
|
||||
max_priority_result = session.query(Feature.priority).order_by(Feature.priority.desc()).first()
|
||||
start_priority = (max_priority_result[0] + 1) if max_priority_result else 1
|
||||
|
||||
created_count = 0
|
||||
for i, feature_data in enumerate(features):
|
||||
# Validate required fields
|
||||
if not all(key in feature_data for key in ["category", "name", "description", "steps"]):
|
||||
return json.dumps({
|
||||
"error": f"Feature at index {i} missing required fields (category, name, description, steps)"
|
||||
})
|
||||
|
||||
db_feature = Feature(
|
||||
priority=start_priority + i,
|
||||
category=feature_data["category"],
|
||||
name=feature_data["name"],
|
||||
description=feature_data["description"],
|
||||
steps=feature_data["steps"],
|
||||
passes=False,
|
||||
)
|
||||
session.add(db_feature)
|
||||
created_count += 1
|
||||
|
||||
session.commit()
|
||||
|
||||
return json.dumps({"created": created_count}, indent=2)
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
return json.dumps({"error": str(e)})
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
mcp.run()
|
||||
Reference in New Issue
Block a user