Mirror of https://github.com/github/spec-kit.git, synced 2026-04-02 18:53:09 +00:00
Stage 1: Integration foundation — base classes, manifest system, and registry (#1925)
* feat: Stage 1 — integration foundation (base classes, manifest, registry)

  Add the integrations package with:

  - IntegrationBase ABC and MarkdownIntegration base class
  - IntegrationOption dataclass for per-integration CLI options
  - IntegrationManifest with SHA-256 hash-tracked install/uninstall
  - INTEGRATION_REGISTRY (empty, populated in later stages)
  - 34 tests at 98% coverage

  Purely additive — no existing code modified. Part of #1924

* fix: normalize manifest keys to POSIX, type manifest parameter

  - Store manifest file keys using as_posix() after resolving relative to project root, ensuring cross-platform portable manifests
  - Type the manifest parameter as IntegrationManifest (via TYPE_CHECKING import) instead of Any in IntegrationBase methods

* fix: symlink safety in uninstall/setup, handle invalid JSON in load

  - uninstall() now uses non-resolved path for deletion so symlinks themselves are removed, not their targets; resolve only for containment validation
  - setup() keeps unresolved dst_file for copy; resolves separately for project-root validation
  - load() catches json.JSONDecodeError and re-raises as ValueError with the manifest path for clearer diagnostics
  - Added test for invalid JSON manifest loading

* fix: lexical symlink containment, assert project_root consistency

  - uninstall() now uses os.path.normpath for lexical containment check instead of resolve(), so in-project symlinks pointing outside are still properly removed
  - setup() asserts manifest.project_root matches the passed project_root to prevent path mismatches between file operations and manifest recording

* fix: handle non-files in check_modified/uninstall, validate manifest key

  - check_modified() treats non-regular files (dirs, symlinks) as modified instead of crashing with IsADirectoryError
  - uninstall() skips directories (adds to skipped list), only unlinks files and symlinks
  - load() validates stored integration key matches the requested key

* fix: safe symlink handling in uninstall

  - Broken symlinks now removable (lexists check via is_symlink fallback)
  - Symlinks never hashed (avoids following to external targets)
  - Symlinks only removed with force=True, otherwise skipped

* fix: robust unlink, fail-fast config validation, symlink tests

  - uninstall() wraps path.unlink() in try/except OSError to avoid partial cleanup on race conditions or permission errors
  - setup() raises ValueError on missing config or folder instead of silently returning empty
  - Added 3 tests: symlink in check_modified, symlink skip/force in uninstall (47 total)

* fix: check_modified uses lexical containment, explicit is_symlink check

  - check_modified() no longer calls _validate_rel_path (which resolves symlinks); uses lexical checks (is_absolute, '..' in parts) instead
  - is_symlink() checked before is_file() so symlinks to files are still treated as modified
  - Fixed templates_dir() docstring to match actual behavior

---------

Co-authored-by: Manfred Riem <15701806+mnriem@users.noreply.github.com>
34  src/specify_cli/integrations/__init__.py  Normal file
@@ -0,0 +1,34 @@
"""Integration registry for AI coding assistants.

Each integration is a self-contained subpackage that handles setup/teardown
for a specific AI assistant (Copilot, Claude, Gemini, etc.).
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .base import IntegrationBase

# Maps integration key → IntegrationBase instance.
# Populated by later stages as integrations are migrated.
INTEGRATION_REGISTRY: dict[str, IntegrationBase] = {}


def _register(integration: IntegrationBase) -> None:
    """Register an integration instance in the global registry.

    Raises ``ValueError`` for falsy keys and ``KeyError`` for duplicates.
    """
    key = integration.key
    if not key:
        raise ValueError("Cannot register integration with an empty key.")
    if key in INTEGRATION_REGISTRY:
        raise KeyError(f"Integration with key {key!r} is already registered.")
    INTEGRATION_REGISTRY[key] = integration


def get_integration(key: str) -> IntegrationBase | None:
    """Return the integration for *key*, or ``None`` if not registered."""
    return INTEGRATION_REGISTRY.get(key)
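Usage sketch (not part of this commit): how a later stage would plug an integration into this registry. ``ExampleIntegration`` and its attribute values are hypothetical placeholders, since the registry ships empty at this stage; only ``_register()`` and ``get_integration()`` above are real.

# Hypothetical example — ExampleIntegration is not defined by this commit.
from specify_cli.integrations import _register, get_integration
from specify_cli.integrations.base import MarkdownIntegration

class ExampleIntegration(MarkdownIntegration):
    key = "example"                    # unique key, matches the CLI tool name
    config = {"folder": ".example"}    # AGENT_CONFIG-shaped metadata
    registrar_config = {}              # CommandRegistrar.AGENT_CONFIGS shape

_register(ExampleIntegration())       # raises KeyError if "example" is taken
assert get_integration("example") is not None
assert get_integration("missing") is None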
215  src/specify_cli/integrations/base.py  Normal file
@@ -0,0 +1,215 @@
"""Base classes for AI-assistant integrations.

Provides:
- ``IntegrationOption`` — declares a CLI option an integration accepts.
- ``IntegrationBase`` — abstract base every integration must implement.
- ``MarkdownIntegration`` — concrete base for standard Markdown-format
  integrations (the common case — subclass, set three class attrs, done).
"""

from __future__ import annotations

import shutil
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from .manifest import IntegrationManifest


# ---------------------------------------------------------------------------
# IntegrationOption
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class IntegrationOption:
    """Declares an option that an integration accepts via ``--integration-options``.

    Attributes:
        name: The flag name (e.g. ``"--commands-dir"``).
        is_flag: ``True`` for boolean flags (``--skills``).
        required: ``True`` if the option must be supplied.
        default: Default value when not supplied (``None`` → no default).
        help: One-line description shown in ``specify integrate info``.
    """

    name: str
    is_flag: bool = False
    required: bool = False
    default: Any = None
    help: str = ""


# ---------------------------------------------------------------------------
# IntegrationBase — abstract base class
# ---------------------------------------------------------------------------

class IntegrationBase(ABC):
    """Abstract base class every integration must implement.

    Subclasses must set the following class-level attributes:

    * ``key`` — unique identifier, matches actual CLI tool name
    * ``config`` — dict compatible with ``AGENT_CONFIG`` entries
    * ``registrar_config`` — dict compatible with ``CommandRegistrar.AGENT_CONFIGS``

    And may optionally set:

    * ``context_file`` — path (relative to project root) of the agent
      context/instructions file (e.g. ``"CLAUDE.md"``)
    """

    # -- Must be set by every subclass ------------------------------------

    key: str = ""
    """Unique integration key — should match the actual CLI tool name."""

    config: dict[str, Any] | None = None
    """Metadata dict matching the ``AGENT_CONFIG`` shape."""

    registrar_config: dict[str, Any] | None = None
    """Registration dict matching ``CommandRegistrar.AGENT_CONFIGS`` shape."""

    # -- Optional ---------------------------------------------------------

    context_file: str | None = None
    """Relative path to the agent context file (e.g. ``CLAUDE.md``)."""

    # -- Public API -------------------------------------------------------

    @classmethod
    def options(cls) -> list[IntegrationOption]:
        """Return options this integration accepts. Default: none."""
        return []

    def templates_dir(self) -> Path:
        """Return the path to this integration's bundled templates.

        By convention, templates live in a ``templates/`` subdirectory
        next to the file where the integration class is defined.
        """
        import inspect

        module_file = inspect.getfile(type(self))
        return Path(module_file).resolve().parent / "templates"

    def setup(
        self,
        project_root: Path,
        manifest: IntegrationManifest,
        parsed_options: dict[str, Any] | None = None,
        **opts: Any,
    ) -> list[Path]:
        """Install integration files into *project_root*.

        Returns the list of files created. The default implementation
        copies every file from ``templates_dir()`` into the commands
        directory derived from ``config``, recording each in *manifest*.
        """
        created: list[Path] = []
        tpl_dir = self.templates_dir()
        if not tpl_dir.is_dir():
            return created

        if not self.config:
            raise ValueError(
                f"{type(self).__name__}.config is not set; integration "
                "subclasses must define a non-empty 'config' mapping."
            )
        folder = self.config.get("folder")
        if not folder:
            raise ValueError(
                f"{type(self).__name__}.config is missing required 'folder' entry."
            )

        project_root_resolved = project_root.resolve()
        if manifest.project_root != project_root_resolved:
            raise ValueError(
                f"manifest.project_root ({manifest.project_root}) does not match "
                f"project_root ({project_root_resolved})"
            )
        subdir = self.config.get("commands_subdir", "commands")
        dest = (project_root / folder / subdir).resolve()
        # Ensure destination stays within the project root
        try:
            dest.relative_to(project_root_resolved)
        except ValueError as exc:
            raise ValueError(
                f"Integration destination {dest} escapes "
                f"project root {project_root_resolved}"
            ) from exc

        dest.mkdir(parents=True, exist_ok=True)

        for src_file in sorted(tpl_dir.iterdir()):
            if src_file.is_file():
                dst_file = dest / src_file.name
                dst_resolved = dst_file.resolve()
                rel = dst_resolved.relative_to(project_root_resolved)
                shutil.copy2(src_file, dst_file)
                manifest.record_existing(rel)
                created.append(dst_file)

        return created

    def teardown(
        self,
        project_root: Path,
        manifest: IntegrationManifest,
        *,
        force: bool = False,
    ) -> tuple[list[Path], list[Path]]:
        """Uninstall integration files from *project_root*.

        Delegates to ``manifest.uninstall()`` which only removes files
        whose hash still matches the recorded value (unless *force*).

        Returns ``(removed, skipped)`` file lists.
        """
        return manifest.uninstall(project_root, force=force)

    # -- Convenience helpers for subclasses -------------------------------

    def install(
        self,
        project_root: Path,
        manifest: IntegrationManifest,
        parsed_options: dict[str, Any] | None = None,
        **opts: Any,
    ) -> list[Path]:
        """High-level install — calls ``setup()`` and returns created files."""
        return self.setup(
            project_root, manifest, parsed_options=parsed_options, **opts
        )

    def uninstall(
        self,
        project_root: Path,
        manifest: IntegrationManifest,
        *,
        force: bool = False,
    ) -> tuple[list[Path], list[Path]]:
        """High-level uninstall — calls ``teardown()``."""
        return self.teardown(project_root, manifest, force=force)


# ---------------------------------------------------------------------------
# MarkdownIntegration — covers ~20 standard agents
# ---------------------------------------------------------------------------

class MarkdownIntegration(IntegrationBase):
    """Concrete base for integrations that use standard Markdown commands.

    Subclasses only need to set ``key``, ``config``, ``registrar_config``
    (and optionally ``context_file``). Everything else is inherited.

    The default ``setup()`` from ``IntegrationBase`` copies templates
    into the agent's commands directory — which is correct for the
    standard Markdown case.
    """

    # MarkdownIntegration inherits IntegrationBase.setup() as-is.
    # Future stages may add markdown-specific path rewriting here.
    pass
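Usage sketch (not part of this commit): the subclassing contract in practice. The ``gemini`` key, folder, and context file below are illustrative assumptions, not values defined by this commit.

# Hypothetical example — attribute values are illustrative only.
from pathlib import Path

from specify_cli.integrations.base import MarkdownIntegration
from specify_cli.integrations.manifest import IntegrationManifest

class GeminiIntegration(MarkdownIntegration):
    key = "gemini"
    config = {"folder": ".gemini", "commands_subdir": "commands"}
    registrar_config = {}
    context_file = "GEMINI.md"

project = Path("/tmp/demo-project")
manifest = IntegrationManifest("gemini", project)
integration = GeminiIntegration()

# If a templates/ directory exists next to the defining module, install()
# copies each file into <project>/.gemini/commands/ and records its SHA-256
# in the manifest; with no templates directory it returns an empty list.
created = integration.install(project, manifest)
manifest.save()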
265  src/specify_cli/integrations/manifest.py  Normal file
@@ -0,0 +1,265 @@
"""Hash-tracked installation manifest for integrations.

Each installed integration records the files it created together with
their SHA-256 hashes. On uninstall only files whose hash still matches
the recorded value are removed — modified files are left in place and
reported to the caller.
"""

from __future__ import annotations

import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


def _sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of *path*."""
    h = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def _validate_rel_path(rel: Path, root: Path) -> Path:
    """Resolve *rel* against *root* and verify it stays within *root*.

    Raises ``ValueError`` if *rel* is absolute, contains ``..`` segments
    that escape *root*, or otherwise resolves outside the project root.
    """
    if rel.is_absolute():
        raise ValueError(
            f"Absolute paths are not allowed in manifests: {rel}"
        )
    resolved = (root / rel).resolve()
    root_resolved = root.resolve()
    try:
        resolved.relative_to(root_resolved)
    except ValueError:
        raise ValueError(
            f"Path {rel} resolves to {resolved} which is outside "
            f"the project root {root_resolved}"
        ) from None
    return resolved


class IntegrationManifest:
    """Tracks files installed by a single integration.

    Parameters:
        key: Integration identifier (e.g. ``"copilot"``).
        project_root: Absolute path to the project directory.
        version: CLI version string recorded in the manifest.
    """

    def __init__(self, key: str, project_root: Path, version: str = "") -> None:
        self.key = key
        self.project_root = project_root.resolve()
        self.version = version
        self._files: dict[str, str] = {}  # rel_path → sha256 hex
        self._installed_at: str = ""

    # -- Manifest file location -------------------------------------------

    @property
    def manifest_path(self) -> Path:
        """Path to the on-disk manifest JSON."""
        return self.project_root / ".specify" / "integrations" / f"{self.key}.manifest.json"

    # -- Recording files --------------------------------------------------

    def record_file(self, rel_path: str | Path, content: bytes | str) -> Path:
        """Write *content* to *rel_path* (relative to project root) and record its hash.

        Creates parent directories as needed. Returns the absolute path
        of the written file.

        Raises ``ValueError`` if *rel_path* resolves outside the project root.
        """
        rel = Path(rel_path)
        abs_path = _validate_rel_path(rel, self.project_root)
        abs_path.parent.mkdir(parents=True, exist_ok=True)

        if isinstance(content, str):
            content = content.encode("utf-8")
        abs_path.write_bytes(content)

        normalized = abs_path.relative_to(self.project_root).as_posix()
        self._files[normalized] = hashlib.sha256(content).hexdigest()
        return abs_path

    def record_existing(self, rel_path: str | Path) -> None:
        """Record the hash of an already-existing file at *rel_path*.

        Raises ``ValueError`` if *rel_path* resolves outside the project root.
        """
        rel = Path(rel_path)
        abs_path = _validate_rel_path(rel, self.project_root)
        normalized = abs_path.relative_to(self.project_root).as_posix()
        self._files[normalized] = _sha256(abs_path)

    # -- Querying ---------------------------------------------------------

    @property
    def files(self) -> dict[str, str]:
        """Return a copy of the ``{rel_path: sha256}`` mapping."""
        return dict(self._files)

    def check_modified(self) -> list[str]:
        """Return relative paths of tracked files whose content changed on disk."""
        modified: list[str] = []
        for rel, expected_hash in self._files.items():
            rel_path = Path(rel)
            # Skip paths that are absolute or attempt to escape the project root
            if rel_path.is_absolute() or ".." in rel_path.parts:
                continue
            abs_path = self.project_root / rel_path
            if not abs_path.exists() and not abs_path.is_symlink():
                continue
            # Treat symlinks and non-regular-files as modified
            if abs_path.is_symlink() or not abs_path.is_file():
                modified.append(rel)
                continue
            if _sha256(abs_path) != expected_hash:
                modified.append(rel)
        return modified

    # -- Uninstall --------------------------------------------------------

    def uninstall(
        self,
        project_root: Path | None = None,
        *,
        force: bool = False,
    ) -> tuple[list[Path], list[Path]]:
        """Remove tracked files whose hash still matches.

        Parameters:
            project_root: Override for the project root.
            force: If ``True``, remove files even if modified.

        Returns:
            ``(removed, skipped)`` — absolute paths.
        """
        root = (project_root or self.project_root).resolve()
        removed: list[Path] = []
        skipped: list[Path] = []

        for rel, expected_hash in self._files.items():
            # Use non-resolved path for deletion so symlinks themselves
            # are removed, not their targets.
            path = root / rel
            # Validate containment lexically (without following symlinks)
            # by collapsing .. segments via os.path.normpath.
            try:
                normed = Path(os.path.normpath(path))
                normed.relative_to(root)
            except (ValueError, OSError):
                continue
            if not path.exists() and not path.is_symlink():
                continue
            # Skip directories — manifest only tracks files
            if not path.is_file() and not path.is_symlink():
                skipped.append(path)
                continue
            # Never follow symlinks when comparing hashes. Only remove
            # symlinks when forced, to avoid acting on tampered entries.
            if path.is_symlink():
                if not force:
                    skipped.append(path)
                    continue
            else:
                if not force and _sha256(path) != expected_hash:
                    skipped.append(path)
                    continue
            try:
                path.unlink()
            except OSError:
                skipped.append(path)
                continue
            removed.append(path)
            # Clean up empty parent directories up to project root
            parent = path.parent
            while parent != root:
                try:
                    parent.rmdir()  # only succeeds if empty
                except OSError:
                    break
                parent = parent.parent

        # Remove the manifest file itself
        manifest = root / ".specify" / "integrations" / f"{self.key}.manifest.json"
        if manifest.exists():
            manifest.unlink()
            parent = manifest.parent
            while parent != root:
                try:
                    parent.rmdir()
                except OSError:
                    break
                parent = parent.parent

        return removed, skipped

    # -- Persistence ------------------------------------------------------

    def save(self) -> Path:
        """Write the manifest to disk. Returns the manifest path."""
        self._installed_at = self._installed_at or datetime.now(timezone.utc).isoformat()
        data: dict[str, Any] = {
            "integration": self.key,
            "version": self.version,
            "installed_at": self._installed_at,
            "files": self._files,
        }
        path = self.manifest_path
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
        return path

    @classmethod
    def load(cls, key: str, project_root: Path) -> IntegrationManifest:
        """Load an existing manifest from disk.

        Raises ``FileNotFoundError`` if the manifest does not exist.
        """
        inst = cls(key, project_root)
        path = inst.manifest_path
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as exc:
            raise ValueError(
                f"Integration manifest at {path} contains invalid JSON"
            ) from exc

        if not isinstance(data, dict):
            raise ValueError(
                f"Integration manifest at {path} must be a JSON object, "
                f"got {type(data).__name__}"
            )

        files = data.get("files", {})
        if not isinstance(files, dict) or not all(
            isinstance(k, str) and isinstance(v, str) for k, v in files.items()
        ):
            raise ValueError(
                f"Integration manifest 'files' at {path} must be a "
                "mapping of string paths to string hashes"
            )

        inst.version = data.get("version", "")
        inst._installed_at = data.get("installed_at", "")
        inst._files = files

        stored_key = data.get("integration", "")
        if stored_key and stored_key != key:
            raise ValueError(
                f"Manifest at {path} belongs to integration {stored_key!r}, "
                f"not {key!r}"
            )

        return inst
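Usage sketch (not part of this commit): the manifest lifecycle end to end. The project path and file content are made up; the JSON shape in the comment follows ``save()`` above.

# Hypothetical example — paths and contents are illustrative only.
from pathlib import Path

from specify_cli.integrations.manifest import IntegrationManifest

root = Path("/tmp/demo-project")
root.mkdir(parents=True, exist_ok=True)

m = IntegrationManifest("copilot", root, version="0.1.0")
m.record_file(".github/prompts/specify.md", "# /specify\n")
m.save()
# Writes .specify/integrations/copilot.manifest.json roughly as:
# {
#   "integration": "copilot",
#   "version": "0.1.0",
#   "installed_at": "<UTC ISO timestamp>",
#   "files": {".github/prompts/specify.md": "<sha256 hex>"}
# }

loaded = IntegrationManifest.load("copilot", root)
assert loaded.check_modified() == []   # hash on disk still matches

# A locally edited file is skipped on uninstall unless force=True.
(root / ".github/prompts/specify.md").write_text("edited\n")
removed, skipped = loaded.uninstall()
assert removed == [] and len(skipped) == 1
removed, skipped = loaded.uninstall(force=True)
assert len(removed) == 1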