spec-kit/src/specify_cli/presets.py
Copilot 69ee7a836e feat(presets): Pluggable preset system with catalog, resolver, and skills propagation (#1787)
* Initial plan

* feat(templates): add pluggable template system with packs, catalog, resolver, and CLI commands

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* test(templates): add comprehensive unit tests for template pack system

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* feat(presets): pluggable preset system with template/command overrides, catalog, and resolver

- Rename 'template packs' to 'presets' to avoid naming collision with core templates
- PresetManifest, PresetRegistry, PresetManager, PresetCatalog, PresetResolver in presets.py
- Extract CommandRegistrar to agents.py as shared infrastructure
- CLI: specify preset list/add/remove/search/resolve/info
- CLI: specify preset catalog list/add/remove
- --preset option on specify init
- Priority-based preset stacking (--priority, lower = higher precedence)
- Command overrides registered into all detected agent directories (17+ agents)
- Extension command safety: skip registration if target extension not installed
- Multi-catalog support: env var, project config, user config, built-in defaults
- resolve_template() / Resolve-Template in bash/PowerShell scripts
- Self-test preset: overrides all 6 core templates + 1 command
- Scaffold with 4 examples: core/extension template and command overrides
- Preset catalog (catalog.json, catalog.community.json)
- Documentation: README.md, ARCHITECTURE.md, PUBLISHING.md
- 110 preset tests, 253 total tests passing
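The priority-based stacking listed above can be sketched in a few lines. This is a minimal illustration of the resolution rule (lower priority number = higher precedence, default 10), using hypothetical preset IDs; the real implementation lives in `PresetRegistry.list_by_priority()`:

```python
# Sketch of priority-based preset resolution: each installed preset
# stores a "priority" field (default 10); presets are sorted ascending,
# so the lowest number is consulted first.
presets = {
    "corp-style": {"priority": 5},
    "defaults": {"priority": 10},
    "experimental": {"priority": 1},
}

def list_by_priority(presets: dict) -> list:
    """Return (preset_id, metadata) pairs, highest precedence first."""
    return sorted(presets.items(), key=lambda item: item[1].get("priority", 10))

order = [pid for pid, _ in list_by_priority(presets)]
print(order)  # "experimental" (priority 1) comes first
```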

* feat(presets): propagate command overrides to skills via init-options

- Add save_init_options() / load_init_options() helpers that persist
  CLI flags from 'specify init' to .specify/init-options.json
- PresetManager._register_skills() overwrites SKILL.md files when
  --ai-skills was used during init and corresponding skill dirs exist
- PresetManager._unregister_skills() restores core template content
  on preset removal
- registered_skills stored in preset registry metadata
- 8 new tests covering skill override, skip conditions, and restore
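The persistence mechanism described above can be sketched as follows. This mirrors the `save_init_options()` / `load_init_options()` helpers and the `.specify/init-options.json` location named in the commit, but the real signatures may differ:

```python
import json
import tempfile
from pathlib import Path

def save_init_options(project_root: Path, options: dict) -> None:
    # Persist 'specify init' flags so a later preset install can tell
    # whether --ai-skills was used and which agent was selected.
    path = project_root / ".specify" / "init-options.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(options, indent=2), encoding="utf-8")

def load_init_options(project_root: Path) -> dict:
    path = project_root / ".specify" / "init-options.json"
    if not path.exists():
        return {}
    return json.loads(path.read_text(encoding="utf-8"))

root = Path(tempfile.mkdtemp())
save_init_options(root, {"ai": "copilot", "ai_skills": True})
print(load_init_options(root)["ai_skills"])  # True
```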

* fix: address PR check failures (ruff F541, CodeQL URL substring)

- Remove extraneous f-prefix from two f-strings without placeholders
- Replace substring URL check in test with startswith/endswith assertions
  to satisfy CodeQL incomplete URL substring sanitization rule
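The CodeQL rule fires because a bare substring check also matches attacker-controlled hosts. A small illustration of the difference (hostnames here are examples, not the actual test fixtures):

```python
# "raw.githubusercontent.com" in url is unsafe: it also matches
# "raw.githubusercontent.com.evil.example". Comparing the parsed
# hostname (or exact startswith/endswith components) is robust.
from urllib.parse import urlparse

def is_trusted(url: str) -> bool:
    return urlparse(url).hostname == "raw.githubusercontent.com"

print(is_trusted("https://raw.githubusercontent.com/github/spec-kit/main/x"))  # True
print(is_trusted("https://raw.githubusercontent.com.evil.example/x"))          # False
```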

* fix: address Copilot PR review comments

- Move save_init_options() before preset install so skills propagation
  works during 'specify init --preset --ai-skills'
- Clean up downloaded ZIP after successful preset install during init
- Validate --from URL scheme (require HTTPS, HTTP only for localhost)
- Expose unregister_commands() on extensions.py CommandRegistrar wrapper
  instead of reaching into private _registrar field
- Use _get_merged_packs() for search() and get_pack_info() so all
  active catalogs are searched, not just the highest-priority one
- Fix fetch_catalog() cache to verify cached URL matches current URL
- Fix PresetResolver: script resolution uses .sh extension, consistent
  file extensions throughout resolve(), and resolve_with_source()
  delegates to resolve() to honor template_type parameter
- Fix bash common.sh: fall through to directory scan when python3
  returns empty preset list
- Fix PowerShell Resolve-Template: filter out dot-folders and sort
  extensions deterministically

* fix: narrow empty except blocks and add explanatory comments

* fix: address Copilot PR review comments (round 2)

- Fix init --preset error masking: distinguish "not found" from real errors
- Fix bash resolve_template: skip hidden dirs in extensions (match Python/PS)
- Fix temp dir leaks in tests: use temp_dir fixture instead of mkdtemp
- Fix self-test catalog entry: add note that it's local-only (no download_url)
- Fix Windows path issue in resolve_with_source: use Path.relative_to()
- Fix skill restore path: use project's .specify/templates/commands/ not source tree
- Add encoding="utf-8" to all file read/write in agents.py
- Update test to set up core command templates for skill restoration

* fix: remove self-test from catalog.json (local-only preset)

* fix: address Copilot PR review comments (round 3)

- Fix PS Resolve-Template fallback to skip dot-prefixed dirs (.cache)
- Rename _catalog to _catalog_name for consistency with extension system
- Enforce install_allowed policy in CLI preset add and download_pack()
- Fix shell injection: pass registry path via env var instead of string interpolation

* fix: correct PresetError docstring from template to preset

* Removed CHANGELOG requirement

* Applying review recommendations

* Applying review recommendations

* Applying review recommendations

* Applying review recommendations

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-03-13 15:09:14 -05:00


"""
Preset Manager for Spec Kit
Handles installation, removal, and management of Spec Kit presets.
Presets are self-contained, versioned collections of templates
(artifact, command, and script templates) that can be installed to
customize the Spec-Driven Development workflow.
"""
import json
import hashlib
import os
import tempfile
import zipfile
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Dict, List, Any
from datetime import datetime, timezone
import re
import yaml
from packaging import version as pkg_version
from packaging.specifiers import SpecifierSet, InvalidSpecifier
@dataclass
class PresetCatalogEntry:
"""Represents a single entry in the preset catalog stack."""
url: str
name: str
priority: int
install_allowed: bool
description: str = ""
class PresetError(Exception):
"""Base exception for preset-related errors."""
pass
class PresetValidationError(PresetError):
"""Raised when preset manifest validation fails."""
pass
class PresetCompatibilityError(PresetError):
"""Raised when preset is incompatible with current environment."""
pass
VALID_PRESET_TEMPLATE_TYPES = {"template", "command", "script"}
class PresetManifest:
"""Represents and validates a preset manifest (preset.yml)."""
SCHEMA_VERSION = "1.0"
REQUIRED_FIELDS = ["schema_version", "preset", "requires", "provides"]
def __init__(self, manifest_path: Path):
"""Load and validate preset manifest.
Args:
manifest_path: Path to preset.yml file
Raises:
PresetValidationError: If manifest is invalid
"""
self.path = manifest_path
self.data = self._load_yaml(manifest_path)
self._validate()
def _load_yaml(self, path: Path) -> dict:
"""Load YAML file safely."""
try:
with open(path, 'r') as f:
return yaml.safe_load(f) or {}
except yaml.YAMLError as e:
raise PresetValidationError(f"Invalid YAML in {path}: {e}") from e
except FileNotFoundError:
raise PresetValidationError(f"Manifest not found: {path}")
def _validate(self):
"""Validate manifest structure and required fields."""
# Check required top-level fields
for field in self.REQUIRED_FIELDS:
if field not in self.data:
raise PresetValidationError(f"Missing required field: {field}")
# Validate schema version
if self.data["schema_version"] != self.SCHEMA_VERSION:
raise PresetValidationError(
f"Unsupported schema version: {self.data['schema_version']} "
f"(expected {self.SCHEMA_VERSION})"
)
# Validate preset metadata
pack = self.data["preset"]
for field in ["id", "name", "version", "description"]:
if field not in pack:
raise PresetValidationError(f"Missing preset.{field}")
# Validate preset ID format
if not re.match(r'^[a-z0-9-]+$', pack["id"]):
raise PresetValidationError(
f"Invalid preset ID '{pack['id']}': "
"must be lowercase alphanumeric with hyphens only"
)
# Validate semantic version
try:
pkg_version.Version(pack["version"])
except pkg_version.InvalidVersion:
raise PresetValidationError(f"Invalid version: {pack['version']}")
# Validate requires section
requires = self.data["requires"]
if "speckit_version" not in requires:
raise PresetValidationError("Missing requires.speckit_version")
# Validate provides section
provides = self.data["provides"]
if "templates" not in provides or not provides["templates"]:
raise PresetValidationError(
"Preset must provide at least one template"
)
# Validate templates
for tmpl in provides["templates"]:
if "type" not in tmpl or "name" not in tmpl or "file" not in tmpl:
raise PresetValidationError(
"Template missing 'type', 'name', or 'file'"
)
if tmpl["type"] not in VALID_PRESET_TEMPLATE_TYPES:
raise PresetValidationError(
f"Invalid template type '{tmpl['type']}': "
f"must be one of {sorted(VALID_PRESET_TEMPLATE_TYPES)}"
)
# Validate file path safety: must be relative, no parent traversal
file_path = tmpl["file"]
normalized = os.path.normpath(file_path)
if os.path.isabs(normalized) or normalized.startswith(".."):
raise PresetValidationError(
f"Invalid template file path '{file_path}': "
"must be a relative path within the preset directory"
)
# Validate template name format
if tmpl["type"] == "command":
# Commands use dot notation (e.g. speckit.specify)
if not re.match(r'^[a-z0-9.-]+$', tmpl["name"]):
raise PresetValidationError(
f"Invalid command name '{tmpl['name']}': "
"must be lowercase alphanumeric with hyphens and dots only"
)
else:
if not re.match(r'^[a-z0-9-]+$', tmpl["name"]):
raise PresetValidationError(
f"Invalid template name '{tmpl['name']}': "
"must be lowercase alphanumeric with hyphens only"
)
@property
def id(self) -> str:
"""Get preset ID."""
return self.data["preset"]["id"]
@property
def name(self) -> str:
"""Get preset name."""
return self.data["preset"]["name"]
@property
def version(self) -> str:
"""Get preset version."""
return self.data["preset"]["version"]
@property
def description(self) -> str:
"""Get preset description."""
return self.data["preset"]["description"]
@property
def author(self) -> str:
"""Get preset author."""
return self.data["preset"].get("author", "")
@property
def requires_speckit_version(self) -> str:
"""Get required spec-kit version range."""
return self.data["requires"]["speckit_version"]
@property
def templates(self) -> List[Dict[str, Any]]:
"""Get list of provided templates."""
return self.data["provides"]["templates"]
@property
def tags(self) -> List[str]:
"""Get preset tags."""
return self.data.get("tags", [])
def get_hash(self) -> str:
"""Calculate SHA256 hash of manifest file."""
with open(self.path, 'rb') as f:
return f"sha256:{hashlib.sha256(f.read()).hexdigest()}"
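The two validation rules enforced in `_validate()` above (preset ID format and template path safety) in isolation, as a standalone sketch:

```python
import os
import re

def valid_preset_id(preset_id: str) -> bool:
    # IDs must be lowercase alphanumeric with hyphens only.
    return re.match(r'^[a-z0-9-]+$', preset_id) is not None

def safe_template_path(file_path: str) -> bool:
    # Reject absolute paths and parent-directory traversal after
    # normalization, so template files stay inside the preset directory.
    normalized = os.path.normpath(file_path)
    return not os.path.isabs(normalized) and not normalized.startswith("..")

print(valid_preset_id("my-preset"))            # True
print(valid_preset_id("My_Preset"))            # False
print(safe_template_path("templates/spec.md")) # True
print(safe_template_path("../../etc/passwd"))  # False
```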
class PresetRegistry:
"""Manages the registry of installed presets."""
REGISTRY_FILE = ".registry"
SCHEMA_VERSION = "1.0"
def __init__(self, packs_dir: Path):
"""Initialize registry.
Args:
packs_dir: Path to .specify/presets/ directory
"""
self.packs_dir = packs_dir
self.registry_path = packs_dir / self.REGISTRY_FILE
self.data = self._load()
def _load(self) -> dict:
"""Load registry from disk."""
if not self.registry_path.exists():
return {
"schema_version": self.SCHEMA_VERSION,
"presets": {}
}
try:
with open(self.registry_path, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return {
"schema_version": self.SCHEMA_VERSION,
"presets": {}
}
def _save(self):
"""Save registry to disk."""
self.packs_dir.mkdir(parents=True, exist_ok=True)
with open(self.registry_path, 'w') as f:
json.dump(self.data, f, indent=2)
def add(self, pack_id: str, metadata: dict):
"""Add preset to registry.
Args:
pack_id: Preset ID
metadata: Preset metadata (version, source, etc.)
"""
self.data["presets"][pack_id] = {
**metadata,
"installed_at": datetime.now(timezone.utc).isoformat()
}
self._save()
def remove(self, pack_id: str):
"""Remove preset from registry.
Args:
pack_id: Preset ID
"""
if pack_id in self.data["presets"]:
del self.data["presets"][pack_id]
self._save()
def get(self, pack_id: str) -> Optional[dict]:
"""Get preset metadata from registry.
Args:
pack_id: Preset ID
Returns:
Preset metadata or None if not found
"""
return self.data["presets"].get(pack_id)
def list(self) -> Dict[str, dict]:
"""Get all installed presets.
Returns:
Dictionary of pack_id -> metadata
"""
return self.data["presets"]
def list_by_priority(self) -> List[tuple]:
"""Get all installed presets sorted by priority.
Lower priority number = higher precedence (checked first).
Returns:
List of (pack_id, metadata) tuples sorted by priority
"""
packs = self.data["presets"]
return sorted(
packs.items(),
key=lambda item: item[1].get("priority", 10),
)
def is_installed(self, pack_id: str) -> bool:
"""Check if preset is installed.
Args:
pack_id: Preset ID
Returns:
True if the preset is installed
"""
return pack_id in self.data["presets"]
class PresetManager:
"""Manages preset lifecycle: installation, removal, updates."""
def __init__(self, project_root: Path):
"""Initialize preset manager.
Args:
project_root: Path to project root directory
"""
self.project_root = project_root
self.presets_dir = project_root / ".specify" / "presets"
self.registry = PresetRegistry(self.presets_dir)
def check_compatibility(
self,
manifest: PresetManifest,
speckit_version: str
) -> bool:
"""Check if preset is compatible with current spec-kit version.
Args:
manifest: Preset manifest
speckit_version: Current spec-kit version
Returns:
True if compatible
Raises:
PresetCompatibilityError: If the preset is incompatible
"""
required = manifest.requires_speckit_version
current = pkg_version.Version(speckit_version)
try:
specifier = SpecifierSet(required)
if current not in specifier:
raise PresetCompatibilityError(
f"Preset requires spec-kit {required}, "
f"but {speckit_version} is installed.\n"
f"Upgrade spec-kit with: uv tool install specify-cli --force"
)
except InvalidSpecifier:
raise PresetCompatibilityError(
f"Invalid version specifier: {required}"
)
return True
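The version check above boils down to a `SpecifierSet` membership test from the `packaging` library (already imported by this module); a minimal example with an illustrative version range:

```python
# A preset declares requires.speckit_version as a PEP 440 specifier set;
# compatibility is simply membership of the current version in that set.
from packaging.specifiers import SpecifierSet
from packaging.version import Version

required = SpecifierSet(">=0.5,<1.0")
print(Version("0.7.2") in required)  # True
print(Version("1.1.0") in required)  # False
```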
def _register_commands(
self,
manifest: PresetManifest,
preset_dir: Path
) -> Dict[str, List[str]]:
"""Register preset command overrides with all detected AI agents.
Scans the preset's templates for type "command", reads each command
file, and writes it to every detected agent directory using the
CommandRegistrar from the agents module.
Args:
manifest: Preset manifest
preset_dir: Installed preset directory
Returns:
Dictionary mapping agent names to lists of registered command names
"""
command_templates = [
t for t in manifest.templates if t.get("type") == "command"
]
if not command_templates:
return {}
# Filter out extension command overrides if the extension isn't installed.
# Command names follow the pattern: speckit.<ext-id>.<cmd-name>
# Core commands (e.g. speckit.specify) have only one dot — always register.
extensions_dir = self.project_root / ".specify" / "extensions"
filtered = []
for cmd in command_templates:
parts = cmd["name"].split(".")
if len(parts) >= 3 and parts[0] == "speckit":
ext_id = parts[1]
if not (extensions_dir / ext_id).is_dir():
continue
filtered.append(cmd)
if not filtered:
return {}
try:
from .agents import CommandRegistrar
except ImportError:
return {}
registrar = CommandRegistrar()
return registrar.register_commands_for_all_agents(
filtered, manifest.id, preset_dir, self.project_root
)
def _unregister_commands(self, registered_commands: Dict[str, List[str]]) -> None:
"""Remove previously registered command files from agent directories.
Args:
registered_commands: Dict mapping agent names to command name lists
"""
try:
from .agents import CommandRegistrar
except ImportError:
return
registrar = CommandRegistrar()
registrar.unregister_commands(registered_commands, self.project_root)
def _get_skills_dir(self) -> Optional[Path]:
"""Return the skills directory if ``--ai-skills`` was used during init.
Reads ``.specify/init-options.json`` to determine whether skills
are enabled and which agent was selected, then delegates to
the module-level ``_get_skills_dir()`` helper for the concrete path.
Returns:
The skills directory ``Path``, or ``None`` if skills were not
enabled or the init-options file is missing.
"""
from . import load_init_options, _get_skills_dir
opts = load_init_options(self.project_root)
if not opts.get("ai_skills"):
return None
agent = opts.get("ai")
if not agent:
return None
skills_dir = _get_skills_dir(self.project_root, agent)
if not skills_dir.is_dir():
return None
return skills_dir
def _register_skills(
self,
manifest: "PresetManifest",
preset_dir: Path,
) -> List[str]:
"""Generate SKILL.md files for preset command overrides.
For every command template in the preset, checks whether a
corresponding skill already exists in any detected skills
directory. If so, the skill is overwritten with content derived
from the preset's command file. This ensures that presets that
override commands also propagate to the agentskills.io skill
layer when ``--ai-skills`` was used during project initialization.
Args:
manifest: Preset manifest.
preset_dir: Installed preset directory.
Returns:
List of skill names that were written (for registry storage).
"""
command_templates = [
t for t in manifest.templates if t.get("type") == "command"
]
if not command_templates:
return []
# Filter out extension command overrides if the extension isn't installed,
# matching the same logic used by _register_commands().
extensions_dir = self.project_root / ".specify" / "extensions"
filtered = []
for cmd in command_templates:
parts = cmd["name"].split(".")
if len(parts) >= 3 and parts[0] == "speckit":
ext_id = parts[1]
if not (extensions_dir / ext_id).is_dir():
continue
filtered.append(cmd)
if not filtered:
return []
skills_dir = self._get_skills_dir()
if not skills_dir:
return []
from . import SKILL_DESCRIPTIONS, load_init_options
opts = load_init_options(self.project_root)
selected_ai = opts.get("ai", "")
written: List[str] = []
for cmd_tmpl in filtered:
cmd_name = cmd_tmpl["name"]
cmd_file_rel = cmd_tmpl["file"]
source_file = preset_dir / cmd_file_rel
if not source_file.exists():
continue
# Derive the short command name (e.g. "specify" from "speckit.specify")
short_name = cmd_name
if short_name.startswith("speckit."):
short_name = short_name[len("speckit."):]
# Kimi CLI discovers skills by directory name and invokes them as
# /skill:<name> — use dot separator to match packaging convention.
if selected_ai == "kimi":
skill_name = f"speckit.{short_name}"
else:
skill_name = f"speckit-{short_name}"
# Only overwrite if the skill already exists (i.e. --ai-skills was used)
skill_subdir = skills_dir / skill_name
if not skill_subdir.exists():
continue
# Parse the command file
content = source_file.read_text(encoding="utf-8")
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
frontmatter = yaml.safe_load(parts[1])
if not isinstance(frontmatter, dict):
frontmatter = {}
body = parts[2].strip()
else:
frontmatter = {}
body = content
else:
frontmatter = {}
body = content
original_desc = frontmatter.get("description", "")
enhanced_desc = SKILL_DESCRIPTIONS.get(
short_name,
original_desc or f"Spec-kit workflow command: {short_name}",
)
frontmatter_data = {
"name": skill_name,
"description": enhanced_desc,
"compatibility": "Requires spec-kit project structure with .specify/ directory",
"metadata": {
"author": "github-spec-kit",
"source": f"preset:{manifest.id}",
},
}
frontmatter_text = yaml.safe_dump(frontmatter_data, sort_keys=False).strip()
skill_content = (
f"---\n"
f"{frontmatter_text}\n"
f"---\n\n"
f"# Speckit {short_name.title()} Skill\n\n"
f"{body}\n"
)
skill_file = skill_subdir / "SKILL.md"
skill_file.write_text(skill_content, encoding="utf-8")
written.append(skill_name)
return written
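The skill-name derivation used above, extracted into a standalone sketch (same dot-vs-hyphen convention, hypothetical command names for illustration):

```python
# Kimi CLI discovers skills by directory name and invokes them as
# /skill:<name>, so it gets a dot separator; all other agents use a hyphen.
def skill_name_for(cmd_name: str, selected_ai: str) -> str:
    short = cmd_name
    if short.startswith("speckit."):
        short = short[len("speckit."):]
    return f"speckit.{short}" if selected_ai == "kimi" else f"speckit-{short}"

print(skill_name_for("speckit.specify", "kimi"))     # speckit.specify
print(skill_name_for("speckit.specify", "copilot"))  # speckit-specify
```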
def _unregister_skills(self, skill_names: List[str], preset_dir: Path) -> None:
"""Restore original SKILL.md files after a preset is removed.
For each skill that was overridden by the preset, attempts to
regenerate the skill from the core command template. If no core
template exists, the skill directory is removed.
Args:
skill_names: List of skill names written by the preset.
preset_dir: The preset's installed directory (may already be deleted).
"""
if not skill_names:
return
skills_dir = self._get_skills_dir()
if not skills_dir:
return
from . import SKILL_DESCRIPTIONS
# Locate core command templates from the project's installed templates
core_templates_dir = self.project_root / ".specify" / "templates" / "commands"
for skill_name in skill_names:
# Derive command name from skill name (speckit-specify -> specify)
short_name = skill_name
if short_name.startswith("speckit-"):
short_name = short_name[len("speckit-"):]
elif short_name.startswith("speckit."):
short_name = short_name[len("speckit."):]
skill_subdir = skills_dir / skill_name
skill_file = skill_subdir / "SKILL.md"
if not skill_file.exists():
continue
# Try to find the core command template
core_file = core_templates_dir / f"{short_name}.md" if core_templates_dir.exists() else None
if core_file and not core_file.exists():
core_file = None
if core_file:
# Restore from core template
content = core_file.read_text(encoding="utf-8")
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
frontmatter = yaml.safe_load(parts[1])
if not isinstance(frontmatter, dict):
frontmatter = {}
body = parts[2].strip()
else:
frontmatter = {}
body = content
else:
frontmatter = {}
body = content
original_desc = frontmatter.get("description", "")
enhanced_desc = SKILL_DESCRIPTIONS.get(
short_name,
original_desc or f"Spec-kit workflow command: {short_name}",
)
frontmatter_data = {
"name": skill_name,
"description": enhanced_desc,
"compatibility": "Requires spec-kit project structure with .specify/ directory",
"metadata": {
"author": "github-spec-kit",
"source": f"templates/commands/{short_name}.md",
},
}
frontmatter_text = yaml.safe_dump(frontmatter_data, sort_keys=False).strip()
skill_content = (
f"---\n"
f"{frontmatter_text}\n"
f"---\n\n"
f"# Speckit {short_name.title()} Skill\n\n"
f"{body}\n"
)
skill_file.write_text(skill_content, encoding="utf-8")
else:
# No core template — remove the skill entirely
shutil.rmtree(skill_subdir)
def install_from_directory(
self,
source_dir: Path,
speckit_version: str,
priority: int = 10,
) -> PresetManifest:
"""Install preset from a local directory.
Args:
source_dir: Path to preset directory
speckit_version: Current spec-kit version
priority: Resolution priority (lower = higher precedence, default 10)
Returns:
Installed preset manifest
Raises:
PresetValidationError: If manifest is invalid
PresetCompatibilityError: If the preset is incompatible
"""
manifest_path = source_dir / "preset.yml"
manifest = PresetManifest(manifest_path)
self.check_compatibility(manifest, speckit_version)
if self.registry.is_installed(manifest.id):
raise PresetError(
f"Preset '{manifest.id}' is already installed. "
f"Use 'specify preset remove {manifest.id}' first."
)
dest_dir = self.presets_dir / manifest.id
if dest_dir.exists():
shutil.rmtree(dest_dir)
shutil.copytree(source_dir, dest_dir)
# Register command overrides with AI agents
registered_commands = self._register_commands(manifest, dest_dir)
# Update corresponding skills when --ai-skills was previously used
registered_skills = self._register_skills(manifest, dest_dir)
self.registry.add(manifest.id, {
"version": manifest.version,
"source": "local",
"manifest_hash": manifest.get_hash(),
"enabled": True,
"priority": priority,
"registered_commands": registered_commands,
"registered_skills": registered_skills,
})
return manifest
def install_from_zip(
self,
zip_path: Path,
speckit_version: str,
priority: int = 10,
) -> PresetManifest:
"""Install preset from ZIP file.
Args:
zip_path: Path to preset ZIP file
speckit_version: Current spec-kit version
priority: Resolution priority (lower = higher precedence, default 10)
Returns:
Installed preset manifest
Raises:
PresetValidationError: If manifest is invalid
PresetCompatibilityError: If the preset is incompatible
"""
with tempfile.TemporaryDirectory() as tmpdir:
temp_path = Path(tmpdir)
with zipfile.ZipFile(zip_path, 'r') as zf:
temp_path_resolved = temp_path.resolve()
for member in zf.namelist():
member_path = (temp_path / member).resolve()
try:
member_path.relative_to(temp_path_resolved)
except ValueError:
raise PresetValidationError(
f"Unsafe path in ZIP archive: {member} "
"(potential path traversal)"
)
zf.extractall(temp_path)
pack_dir = temp_path
manifest_path = pack_dir / "preset.yml"
if not manifest_path.exists():
subdirs = [d for d in temp_path.iterdir() if d.is_dir()]
if len(subdirs) == 1:
pack_dir = subdirs[0]
manifest_path = pack_dir / "preset.yml"
if not manifest_path.exists():
raise PresetValidationError(
"No preset.yml found in ZIP file"
)
return self.install_from_directory(pack_dir, speckit_version, priority)
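The path-traversal guard in `install_from_zip()` above, in miniature and runnable on its own (building a small benign archive for demonstration):

```python
# Resolve every member path and require it to stay inside the extraction
# root before calling extractall(); "../" members raise instead of escaping.
import tempfile
import zipfile
from pathlib import Path

def check_members(zip_path: Path, dest: Path) -> None:
    dest_resolved = dest.resolve()
    with zipfile.ZipFile(zip_path) as zf:
        for member in zf.namelist():
            target = (dest / member).resolve()
            try:
                target.relative_to(dest_resolved)
            except ValueError:
                raise ValueError(f"Unsafe path in ZIP archive: {member}")

tmp = Path(tempfile.mkdtemp())
archive = tmp / "preset.zip"
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("preset.yml", "schema_version: '1.0'\n")
check_members(archive, tmp / "out")  # benign archive passes
print("ok")
```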
def remove(self, pack_id: str) -> bool:
"""Remove an installed preset.
Args:
pack_id: Preset ID
Returns:
True if the preset was removed
"""
if not self.registry.is_installed(pack_id):
return False
# Unregister commands from AI agents
metadata = self.registry.get(pack_id)
registered_commands = metadata.get("registered_commands", {}) if metadata else {}
if registered_commands:
self._unregister_commands(registered_commands)
# Restore original skills when preset is removed
registered_skills = metadata.get("registered_skills", []) if metadata else []
pack_dir = self.presets_dir / pack_id
if registered_skills:
self._unregister_skills(registered_skills, pack_dir)
if pack_dir.exists():
shutil.rmtree(pack_dir)
self.registry.remove(pack_id)
return True
def list_installed(self) -> List[Dict[str, Any]]:
"""List all installed presets with metadata.
Returns:
List of preset metadata dictionaries
"""
result = []
for pack_id, metadata in self.registry.list().items():
pack_dir = self.presets_dir / pack_id
manifest_path = pack_dir / "preset.yml"
try:
manifest = PresetManifest(manifest_path)
result.append({
"id": pack_id,
"name": manifest.name,
"version": metadata["version"],
"description": manifest.description,
"enabled": metadata.get("enabled", True),
"installed_at": metadata.get("installed_at"),
"template_count": len(manifest.templates),
"tags": manifest.tags,
"priority": metadata.get("priority", 10),
})
except PresetValidationError:
result.append({
"id": pack_id,
"name": pack_id,
"version": metadata.get("version", "unknown"),
"description": "⚠️ Corrupted preset",
"enabled": False,
"installed_at": metadata.get("installed_at"),
"template_count": 0,
"tags": [],
"priority": metadata.get("priority", 10),
})
return result
def get_pack(self, pack_id: str) -> Optional[PresetManifest]:
"""Get manifest for an installed preset.
Args:
pack_id: Preset ID
Returns:
Preset manifest or None if not installed
"""
if not self.registry.is_installed(pack_id):
return None
pack_dir = self.presets_dir / pack_id
manifest_path = pack_dir / "preset.yml"
try:
return PresetManifest(manifest_path)
except PresetValidationError:
return None
class PresetCatalog:
"""Manages preset catalog fetching, caching, and searching.
Supports multi-catalog stacks with priority-based resolution,
mirroring the extension catalog system.
"""
DEFAULT_CATALOG_URL = "https://raw.githubusercontent.com/github/spec-kit/main/presets/catalog.json"
COMMUNITY_CATALOG_URL = "https://raw.githubusercontent.com/github/spec-kit/main/presets/catalog.community.json"
CACHE_DURATION = 3600 # 1 hour in seconds
def __init__(self, project_root: Path):
"""Initialize preset catalog manager.
Args:
project_root: Root directory of the spec-kit project
"""
self.project_root = project_root
self.presets_dir = project_root / ".specify" / "presets"
self.cache_dir = self.presets_dir / ".cache"
self.cache_file = self.cache_dir / "catalog.json"
self.cache_metadata_file = self.cache_dir / "catalog-metadata.json"
def _validate_catalog_url(self, url: str) -> None:
"""Validate that a catalog URL uses HTTPS (localhost HTTP allowed).
Args:
url: URL to validate
Raises:
PresetValidationError: If URL is invalid or uses non-HTTPS scheme
"""
from urllib.parse import urlparse
parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (
parsed.scheme == "http" and is_localhost
):
raise PresetValidationError(
f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
"HTTP is only allowed for localhost."
)
if not parsed.netloc:
raise PresetValidationError(
"Catalog URL must be a valid URL with a host."
)
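The same scheme policy as `_validate_catalog_url()` above, expressed as a small predicate (example URLs only):

```python
# HTTPS is required for catalog URLs; plain HTTP is tolerated only for
# localhost, and a host component must be present.
from urllib.parse import urlparse

def catalog_url_ok(url: str) -> bool:
    parsed = urlparse(url)
    if not parsed.netloc:
        return False
    is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
    return parsed.scheme == "https" or (parsed.scheme == "http" and is_localhost)

print(catalog_url_ok("https://example.com/catalog.json"))    # True
print(catalog_url_ok("http://localhost:8000/catalog.json"))  # True
print(catalog_url_ok("http://example.com/catalog.json"))     # False
```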
def _load_catalog_config(self, config_path: Path) -> Optional[List[PresetCatalogEntry]]:
"""Load catalog stack configuration from a YAML file.
Args:
config_path: Path to preset-catalogs.yml
Returns:
Ordered list of PresetCatalogEntry objects, or None if file
doesn't exist or contains no valid catalog entries.
Raises:
PresetValidationError: If any catalog entry has an invalid URL,
the file cannot be parsed, or a priority value is invalid.
"""
if not config_path.exists():
return None
try:
data = yaml.safe_load(config_path.read_text()) or {}
except (yaml.YAMLError, OSError) as e:
raise PresetValidationError(
f"Failed to read catalog config {config_path}: {e}"
)
if not isinstance(data, dict):
raise PresetValidationError(
f"Invalid catalog config {config_path}: expected a mapping at root, got {type(data).__name__}"
)
catalogs_data = data.get("catalogs", [])
if not catalogs_data:
return None
if not isinstance(catalogs_data, list):
raise PresetValidationError(
f"Invalid catalog config: 'catalogs' must be a list, got {type(catalogs_data).__name__}"
)
entries: List[PresetCatalogEntry] = []
for idx, item in enumerate(catalogs_data):
if not isinstance(item, dict):
raise PresetValidationError(
f"Invalid catalog entry at index {idx}: expected a mapping, got {type(item).__name__}"
)
url = str(item.get("url", "")).strip()
if not url:
continue
self._validate_catalog_url(url)
try:
priority = int(item.get("priority", idx + 1))
except (TypeError, ValueError):
raise PresetValidationError(
f"Invalid priority for catalog '{item.get('name', idx + 1)}': "
f"expected integer, got {item.get('priority')!r}"
)
raw_install = item.get("install_allowed", False)
if isinstance(raw_install, str):
install_allowed = raw_install.strip().lower() in ("true", "yes", "1")
else:
install_allowed = bool(raw_install)
entries.append(PresetCatalogEntry(
url=url,
name=str(item.get("name", f"catalog-{idx + 1}")),
priority=priority,
install_allowed=install_allowed,
description=str(item.get("description", "")),
))
entries.sort(key=lambda e: e.priority)
return entries if entries else None
def get_active_catalogs(self) -> List[PresetCatalogEntry]:
"""Get the ordered list of active preset catalogs.
Resolution order:
1. SPECKIT_PRESET_CATALOG_URL env var — single catalog replacing all defaults
2. Project-level .specify/preset-catalogs.yml
3. User-level ~/.specify/preset-catalogs.yml
4. Built-in default stack (default + community)
Returns:
List of PresetCatalogEntry objects sorted by priority (ascending)
Raises:
PresetValidationError: If a catalog URL is invalid
"""
import sys
# 1. SPECKIT_PRESET_CATALOG_URL env var replaces all defaults
if env_value := os.environ.get("SPECKIT_PRESET_CATALOG_URL"):
catalog_url = env_value.strip()
self._validate_catalog_url(catalog_url)
if catalog_url != self.DEFAULT_CATALOG_URL:
if not getattr(self, "_non_default_catalog_warning_shown", False):
print(
"Warning: Using non-default preset catalog. "
"Only use catalogs from sources you trust.",
file=sys.stderr,
)
self._non_default_catalog_warning_shown = True
return [PresetCatalogEntry(url=catalog_url, name="custom", priority=1, install_allowed=True, description="Custom catalog via SPECKIT_PRESET_CATALOG_URL")]
# 2. Project-level config overrides all defaults
project_config_path = self.project_root / ".specify" / "preset-catalogs.yml"
catalogs = self._load_catalog_config(project_config_path)
if catalogs is not None:
return catalogs
# 3. User-level config
user_config_path = Path.home() / ".specify" / "preset-catalogs.yml"
catalogs = self._load_catalog_config(user_config_path)
if catalogs is not None:
return catalogs
# 4. Built-in default stack
return [
PresetCatalogEntry(url=self.DEFAULT_CATALOG_URL, name="default", priority=1, install_allowed=True, description="Built-in catalog of installable presets"),
PresetCatalogEntry(url=self.COMMUNITY_CATALOG_URL, name="community", priority=2, install_allowed=False, description="Community-contributed presets (discovery only)"),
]
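The four-step precedence chain above can be sketched in isolation. The helper below uses plain strings and hypothetical stand-in names (`pick_catalogs`, the config arguments) rather than the real `PresetCatalogEntry` objects and YAML loaders — it only illustrates the "first configured source wins" rule:

```python
from typing import List, Optional

def pick_catalogs(
    env_url: Optional[str],
    project_cfg: Optional[List[str]],
    user_cfg: Optional[List[str]],
) -> List[str]:
    """First non-None source wins, mirroring get_active_catalogs()."""
    if env_url:  # 1. env var replaces everything
        return [env_url.strip()]
    if project_cfg is not None:  # 2. project-level config
        return project_cfg
    if user_cfg is not None:  # 3. user-level config
        return user_cfg
    # 4. built-in default stack (placeholder URLs)
    return ["https://example.com/catalog.json", "https://example.com/community.json"]
```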
def get_catalog_url(self) -> str:
"""Get the primary catalog URL.
Returns the URL of the highest-priority catalog. Kept for backward
compatibility. Use get_active_catalogs() for full multi-catalog support.
Returns:
URL of the primary catalog
"""
active = self.get_active_catalogs()
return active[0].url if active else self.DEFAULT_CATALOG_URL
def _get_cache_paths(self, url: str):
"""Get cache file paths for a given catalog URL.
For the DEFAULT_CATALOG_URL, uses legacy cache files for backward
compatibility. For all other URLs, uses URL-hash-based cache files.
Returns:
Tuple of (cache_file_path, cache_metadata_path)
"""
if url == self.DEFAULT_CATALOG_URL:
return self.cache_file, self.cache_metadata_file
url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
return (
self.cache_dir / f"catalog-{url_hash}.json",
self.cache_dir / f"catalog-{url_hash}-metadata.json",
)
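A minimal standalone sketch of the URL-hash naming scheme, using the same SHA-256 prefix as `_get_cache_paths` (the function name here is hypothetical):

```python
import hashlib
from pathlib import Path
from typing import Tuple

def cache_paths_for(cache_dir: Path, url: str) -> Tuple[Path, Path]:
    """Derive stable per-URL cache filenames from a 16-char SHA-256 prefix."""
    url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
    return (
        cache_dir / f"catalog-{url_hash}.json",
        cache_dir / f"catalog-{url_hash}-metadata.json",
    )
```

Because the hash depends only on the URL, repeated fetches of the same catalog reuse the same cache files.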
def _is_url_cache_valid(self, url: str) -> bool:
"""Check if cached catalog for a specific URL is still valid."""
cache_file, metadata_file = self._get_cache_paths(url)
if not cache_file.exists() or not metadata_file.exists():
return False
try:
metadata = json.loads(metadata_file.read_text())
cached_at = datetime.fromisoformat(metadata.get("cached_at", ""))
if cached_at.tzinfo is None:
cached_at = cached_at.replace(tzinfo=timezone.utc)
age_seconds = (
datetime.now(timezone.utc) - cached_at
).total_seconds()
return age_seconds < self.CACHE_DURATION
except (json.JSONDecodeError, ValueError, KeyError, TypeError):
return False
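The TTL check above can be distilled into a small pure function (a sketch; `cache_age_ok` and the `now` parameter are hypothetical, added to make the logic testable without touching the clock):

```python
from datetime import datetime, timezone

def cache_age_ok(cached_at_iso: str, max_age_seconds: float, now=None) -> bool:
    """True if the cached_at timestamp is within the allowed age."""
    cached_at = datetime.fromisoformat(cached_at_iso)
    if cached_at.tzinfo is None:  # treat naive timestamps as UTC
        cached_at = cached_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return (now - cached_at).total_seconds() < max_age_seconds
```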
def _fetch_single_catalog(self, entry: PresetCatalogEntry, force_refresh: bool = False) -> Dict[str, Any]:
"""Fetch a single catalog with per-URL caching.
Args:
entry: PresetCatalogEntry describing the catalog to fetch
force_refresh: If True, bypass cache
Returns:
Catalog data dictionary
Raises:
PresetError: If catalog cannot be fetched
"""
cache_file, metadata_file = self._get_cache_paths(entry.url)
if not force_refresh and self._is_url_cache_valid(entry.url):
try:
return json.loads(cache_file.read_text())
except json.JSONDecodeError:
pass  # Corrupt cache entry; fall through to a network fetch
try:
import urllib.request
import urllib.error
with urllib.request.urlopen(entry.url, timeout=10) as response:
catalog_data = json.loads(response.read())
if (
"schema_version" not in catalog_data
or "presets" not in catalog_data
):
raise PresetError("Invalid preset catalog format")
self.cache_dir.mkdir(parents=True, exist_ok=True)
cache_file.write_text(json.dumps(catalog_data, indent=2))
metadata = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": entry.url,
}
metadata_file.write_text(json.dumps(metadata, indent=2))
return catalog_data
except PresetError:
raise
except Exception as e:
raise PresetError(
f"Failed to fetch preset catalog from {entry.url}: {e}"
) from e
def _get_merged_packs(self, force_refresh: bool = False) -> Dict[str, Dict[str, Any]]:
"""Fetch and merge presets from all active catalogs.
Higher-priority catalogs (lower priority number) win on ID conflicts.
Returns:
Merged dictionary of pack_id -> pack_data
"""
active_catalogs = self.get_active_catalogs()
merged: Dict[str, Dict[str, Any]] = {}
# Walk lowest-priority catalogs first so higher-priority (lower number) entries overwrite on conflict
for entry in reversed(active_catalogs):
try:
data = self._fetch_single_catalog(entry, force_refresh)
for pack_id, pack_data in data.get("presets", {}).items():
pack_data_with_catalog = {**pack_data, "_catalog_name": entry.name, "_install_allowed": entry.install_allowed}
merged[pack_id] = pack_data_with_catalog
except PresetError:
continue
return merged
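The "lower priority number wins" merge can be shown with plain dicts (a simplified sketch of the loop above, not the real method):

```python
def merge_by_priority(catalogs):
    """catalogs: list of (priority, {pack_id: data}); lower priority number wins."""
    merged = {}
    # Walk highest priority number first so lower numbers overwrite last.
    for _prio, packs in sorted(catalogs, key=lambda c: c[0], reverse=True):
        merged.update(packs)
    return merged
```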
def is_cache_valid(self) -> bool:
"""Check if the legacy default-catalog cache is still valid.
Delegates to _is_url_cache_valid() so the TTL logic lives in one place.
Returns:
True if cache exists and is within cache duration
"""
return self._is_url_cache_valid(self.DEFAULT_CATALOG_URL)
def fetch_catalog(self, force_refresh: bool = False) -> Dict[str, Any]:
"""Fetch preset catalog from URL or cache.
Args:
force_refresh: If True, bypass cache and fetch from network
Returns:
Catalog data dictionary
Raises:
PresetError: If catalog cannot be fetched
"""
catalog_url = self.get_catalog_url()
if not force_refresh and self.is_cache_valid():
try:
metadata = json.loads(self.cache_metadata_file.read_text())
if metadata.get("catalog_url") == catalog_url:
return json.loads(self.cache_file.read_text())
except (json.JSONDecodeError, OSError):
# Cache is corrupt or unreadable; fall through to network fetch
pass
try:
import urllib.request
import urllib.error
with urllib.request.urlopen(catalog_url, timeout=10) as response:
catalog_data = json.loads(response.read())
if (
"schema_version" not in catalog_data
or "presets" not in catalog_data
):
raise PresetError("Invalid preset catalog format")
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_file.write_text(json.dumps(catalog_data, indent=2))
metadata = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": catalog_url,
}
self.cache_metadata_file.write_text(
json.dumps(metadata, indent=2)
)
return catalog_data
except PresetError:
raise
except Exception as e:
raise PresetError(
f"Failed to fetch preset catalog from {catalog_url}: {e}"
) from e
def search(
self,
query: Optional[str] = None,
tag: Optional[str] = None,
author: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Search catalog for presets.
Searches across all active catalogs (merged by priority) so that
community and custom catalogs are included in results.
Args:
query: Search query (searches name, description, tags)
tag: Filter by specific tag
author: Filter by author name
Returns:
List of matching preset metadata
"""
try:
packs = self._get_merged_packs()
except PresetError:
return []
results = []
for pack_id, pack_data in packs.items():
if author and pack_data.get("author", "").lower() != author.lower():
continue
if tag and tag.lower() not in [
t.lower() for t in pack_data.get("tags", [])
]:
continue
if query:
query_lower = query.lower()
searchable_text = " ".join(
[
pack_data.get("name", ""),
pack_data.get("description", ""),
pack_id,
]
+ pack_data.get("tags", [])
).lower()
if query_lower not in searchable_text:
continue
results.append({**pack_data, "id": pack_id})
return results
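The per-pack filter logic can be isolated as a predicate (a sketch mirroring the loop above; `matches` is a hypothetical name): author and tag are exact case-insensitive matches, while the query is a substring match over name, description, ID, and tags.

```python
def matches(pack_id, pack, query=None, tag=None, author=None):
    """Apply the same three filters the search loop uses."""
    if author and pack.get("author", "").lower() != author.lower():
        return False
    if tag and tag.lower() not in [t.lower() for t in pack.get("tags", [])]:
        return False
    if query:
        haystack = " ".join(
            [pack.get("name", ""), pack.get("description", ""), pack_id]
            + pack.get("tags", [])
        ).lower()
        return query.lower() in haystack
    return True
```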
def get_pack_info(
self, pack_id: str
) -> Optional[Dict[str, Any]]:
"""Get detailed information about a specific preset.
Searches across all active catalogs (merged by priority).
Args:
pack_id: ID of the preset
Returns:
Pack metadata or None if not found
"""
try:
packs = self._get_merged_packs()
except PresetError:
return None
if pack_id in packs:
return {**packs[pack_id], "id": pack_id}
return None
def download_pack(
self, pack_id: str, target_dir: Optional[Path] = None
) -> Path:
"""Download preset ZIP from catalog.
Args:
pack_id: ID of the preset to download
target_dir: Directory to save ZIP file (defaults to cache directory)
Returns:
Path to downloaded ZIP file
Raises:
PresetError: If pack not found or download fails
"""
import urllib.request
import urllib.error
pack_info = self.get_pack_info(pack_id)
if not pack_info:
raise PresetError(
f"Preset '{pack_id}' not found in catalog"
)
if not pack_info.get("_install_allowed", True):
catalog_name = pack_info.get("_catalog_name", "unknown")
raise PresetError(
f"Preset '{pack_id}' is from the '{catalog_name}' catalog which does not allow installation. "
f"Use --from with the preset's repository URL instead."
)
download_url = pack_info.get("download_url")
if not download_url:
raise PresetError(
f"Preset '{pack_id}' has no download URL"
)
from urllib.parse import urlparse
parsed = urlparse(download_url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (
parsed.scheme == "http" and is_localhost
):
raise PresetError(
f"Preset download URL must use HTTPS: {download_url}"
)
if target_dir is None:
target_dir = self.cache_dir / "downloads"
target_dir.mkdir(parents=True, exist_ok=True)
version = pack_info.get("version", "unknown")
zip_filename = f"{pack_id}-{version}.zip"
zip_path = target_dir / zip_filename
try:
with urllib.request.urlopen(download_url, timeout=60) as response:
zip_data = response.read()
zip_path.write_bytes(zip_data)
return zip_path
except urllib.error.URLError as e:
raise PresetError(
f"Failed to download preset from {download_url}: {e}"
) from e
except OSError as e:
raise PresetError(f"Failed to save preset ZIP: {e}") from e
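The download-URL safety check above reduces to a small predicate (a standalone sketch; `is_allowed_download_url` is a hypothetical name): HTTPS is required, with plain HTTP permitted only for localhost so local test servers still work.

```python
from urllib.parse import urlparse

def is_allowed_download_url(url: str) -> bool:
    """HTTPS required, except plain HTTP to localhost."""
    parsed = urlparse(url)
    is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
    return parsed.scheme == "https" or (parsed.scheme == "http" and is_localhost)
```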
def clear_cache(self):
"""Clear all catalog cache files, including per-URL hashed caches."""
if self.cache_dir.exists():
for f in self.cache_dir.iterdir():
if f.is_file() and f.name.startswith("catalog"):
f.unlink(missing_ok=True)
class PresetResolver:
"""Resolves template names to file paths using a priority stack.
Resolution order:
1. .specify/templates/overrides/ - Project-local overrides
2. .specify/presets/<preset-id>/ - Installed presets
3. .specify/extensions/<ext-id>/templates/ - Extension-provided templates
4. .specify/templates/ - Core templates (shipped with Spec Kit)
"""
def __init__(self, project_root: Path):
"""Initialize preset resolver.
Args:
project_root: Path to project root directory
"""
self.project_root = project_root
self.templates_dir = project_root / ".specify" / "templates"
self.presets_dir = project_root / ".specify" / "presets"
self.overrides_dir = self.templates_dir / "overrides"
self.extensions_dir = project_root / ".specify" / "extensions"
def resolve(
self,
template_name: str,
template_type: str = "template",
) -> Optional[Path]:
"""Resolve a template name to its file path.
Walks the priority stack and returns the first match.
Args:
template_name: Template name (e.g., "spec-template")
template_type: Template type ("template", "command", or "script")
Returns:
Path to the resolved template file, or None if not found
"""
# Determine subdirectory based on template type
if template_type == "template":
subdirs = ["templates", ""]
elif template_type == "command":
subdirs = ["commands"]
elif template_type == "script":
subdirs = ["scripts"]
else:
subdirs = [""]
# Determine file extension based on template type
ext = ".md"
if template_type == "script":
ext = ".sh" # scripts use .sh; callers can also check .ps1
# Priority 1: Project-local overrides
if template_type == "script":
override = self.overrides_dir / "scripts" / f"{template_name}{ext}"
else:
override = self.overrides_dir / f"{template_name}{ext}"
if override.exists():
return override
# Priority 2: Installed presets (sorted by priority — lower number wins)
if self.presets_dir.exists():
registry = PresetRegistry(self.presets_dir)
for pack_id, _metadata in registry.list_by_priority():
pack_dir = self.presets_dir / pack_id
for subdir in subdirs:
if subdir:
candidate = pack_dir / subdir / f"{template_name}{ext}"
else:
candidate = pack_dir / f"{template_name}{ext}"
if candidate.exists():
return candidate
# Priority 3: Extension-provided templates
if self.extensions_dir.exists():
for ext_dir in sorted(self.extensions_dir.iterdir()):
if not ext_dir.is_dir() or ext_dir.name.startswith("."):
continue
for subdir in subdirs:
# Extensions keep files under named subdirs; an empty entry maps to "templates"
candidate = ext_dir / (subdir or "templates") / f"{template_name}{ext}"
if candidate.exists():
return candidate
# Priority 4: Core templates
if template_type == "template":
core = self.templates_dir / f"{template_name}.md"
if core.exists():
return core
elif template_type == "command":
core = self.templates_dir / "commands" / f"{template_name}.md"
if core.exists():
return core
elif template_type == "script":
core = self.templates_dir / "scripts" / f"{template_name}{ext}"
if core.exists():
return core
return None
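At its core, `resolve()` is a first-match walk over an ordered directory stack. The sketch below strips away the registry and subdir handling to show just that walk (hypothetical helper, using temp directories in place of `.specify/` paths):

```python
from pathlib import Path
from typing import List, Optional

def first_match(search_dirs: List[Path], filename: str) -> Optional[Path]:
    """Return the file from the highest-priority directory that has it."""
    for d in search_dirs:
        candidate = d / filename
        if candidate.exists():
            return candidate
    return None
```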
def resolve_with_source(
self,
template_name: str,
template_type: str = "template",
) -> Optional[Dict[str, str]]:
"""Resolve a template name and return source attribution.
Args:
template_name: Template name (e.g., "spec-template")
template_type: Template type ("template", "command", or "script")
Returns:
Dictionary with 'path' and 'source' keys, or None if not found
"""
# Delegate to resolve() for the actual lookup, then determine source
resolved = self.resolve(template_name, template_type)
if resolved is None:
return None
resolved_str = str(resolved)
# Determine source attribution
if str(self.overrides_dir) in resolved_str:
return {"path": resolved_str, "source": "project override"}
if str(self.presets_dir) in resolved_str and self.presets_dir.exists():
registry = PresetRegistry(self.presets_dir)
for pack_id, _metadata in registry.list_by_priority():
pack_dir = self.presets_dir / pack_id
try:
resolved.relative_to(pack_dir)
meta = registry.get(pack_id)
version = meta.get("version", "?") if meta else "?"
return {
"path": resolved_str,
"source": f"{pack_id} v{version}",
}
except ValueError:
continue
if self.extensions_dir.exists():
for ext_dir in sorted(self.extensions_dir.iterdir()):
if not ext_dir.is_dir() or ext_dir.name.startswith("."):
continue
try:
resolved.relative_to(ext_dir)
return {
"path": resolved_str,
"source": f"extension:{ext_dir.name}",
}
except ValueError:
continue
return {"path": resolved_str, "source": "core"}
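The attribution logic above relies on `Path.relative_to()` raising `ValueError` when a path is not under a given root. A distilled sketch (hypothetical helper name, pure-path operations so no filesystem is touched):

```python
from pathlib import Path
from typing import Dict

def attribute_source(resolved: Path, roots: Dict[str, Path]) -> str:
    """Label a resolved path by the first root directory that contains it."""
    for label, root in roots.items():
        try:
            resolved.relative_to(root)
            return label
        except ValueError:
            continue
    return "core"
```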