feat(presets): add enable/disable toggle and update semantics (#1891)

* feat(presets): add enable/disable toggle and update semantics

Add preset enable/disable CLI commands and update semantics to match
the extension system capabilities.

Changes:
- Add `preset enable` and `preset disable` CLI commands
- Add `restore()` method to PresetRegistry for rollback scenarios
- Update `get()` and `list()` to return deep copies (prevents mutation)
- Update `list_by_priority()` to filter disabled presets by default
- Add input validation to `restore()` for defensive programming
- Add 16 new tests covering all functionality and edge cases

Closes #1851
Closes #1852

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: address PR review - deep copy and error message accuracy

- Fix error message in restore() to match actual validation ("dict" not "non-empty dict")
- Use copy.deepcopy() in restore() to prevent caller mutation
- Apply same fixes to ExtensionRegistry for parity
- Add /defensive-check command for pre-PR validation
- Add tests for restore() validation and deep copy behavior

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* revert: remove defensive-check command from PR

* fix: address PR review - clarify messaging and add parity

- Add note to enable/disable output clarifying commands/skills remain active
- Add include_disabled parameter to ExtensionRegistry.list_by_priority for parity
- Add tests for extension disabled filtering

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: address PR review - disabled extension resolution and corrupted entries

- Fix _get_all_extensions_by_priority to use include_disabled=True for tracking
  registered IDs, preventing disabled extensions from being picked up as
  unregistered directories
- Add corrupted entry handling to get() - returns None for non-dict entries
- Add integration tests for disabled extension template resolution
- Add tests for get() corrupted entry handling in both registries

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: handle corrupted registry in list() methods

- Add defensive handling to list() when presets/extensions is not a dict
- Return empty dict instead of crashing on corrupted registry
- Apply same fix to both PresetRegistry and ExtensionRegistry for parity
- Add tests for corrupted registry handling

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: validate top-level registry structure in get() and restore()

- get() now validates self.data["presets/extensions"] is a dict before accessing
- restore() ensures presets/extensions dict exists before writing
- Prevents crashes when registry JSON is parseable but has corrupted structure
- Applied same fixes to both PresetRegistry and ExtensionRegistry for parity

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: validate root-level JSON structure in _load() and is_installed()

- _load() now validates json.load() result is a dict before returning
- is_installed() validates presets/extensions is a dict before checking membership
- Prevents crashes when registry file is valid JSON but wrong type (e.g., array)
- Applied same fixes to both registries for parity

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: normalize presets/extensions field in _load()

- _load() now normalizes the presets/extensions field to {} if not a dict
- Makes corrupted registries recoverable for add/update/remove operations
- Applied same fix to both registries for parity

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: use raw registry keys to track corrupted extensions

- Use registry.list().keys() instead of list_by_priority() for tracking
- Corrupted entries are now treated as tracked, not picked up as unregistered
- Tighten test assertion for disabled preset resolution
- Update test to match new expected behavior for corrupted entries

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: handle None metadata in ExtensionManager.remove()

- Add defensive check for corrupted metadata in remove()
- Match existing pattern in PresetManager.remove()

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: add keys() method and filter corrupted entries in list()

- Add lightweight keys() method that returns IDs without deep copy
- Update list() to filter out non-dict entries (match type contract)
- Use keys() instead of list().keys() for performance
- Fix comment to reflect actual behavior

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: address defensive-check findings - deep copy, corruption guards, parity

- Extension enable/disable: use delta pattern matching presets
- add(): use copy.deepcopy(metadata) in both registries
- remove(): guard outer field for corruption in both registries
- update(): guard outer field for corruption in both registries

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: deep copy updates in update() to prevent caller mutation

Both PresetRegistry.update() and ExtensionRegistry.update() now deep
copy the input updates/metadata dict to prevent callers from mutating
nested objects after the call.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: iamaeroplane <michal.bachorik@gmail.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Michal Bachorik
2026-03-19 13:48:48 +01:00
committed by GitHub
parent f6794685b6
commit 2bf655e261
5 changed files with 783 additions and 54 deletions

View File

@@ -2419,6 +2419,89 @@ def preset_set_priority(
console.print("\n[dim]Lower priority = higher precedence in template resolution[/dim]")
@preset_app.command("enable")
def preset_enable(
pack_id: str = typer.Argument(help="Preset ID to enable"),
):
"""Enable a disabled preset."""
from .presets import PresetManager
project_root = Path.cwd()
# Check if we're in a spec-kit project
specify_dir = project_root / ".specify"
if not specify_dir.exists():
console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)")
console.print("Run this command from a spec-kit project root")
raise typer.Exit(1)
manager = PresetManager(project_root)
# Check if preset is installed
if not manager.registry.is_installed(pack_id):
console.print(f"[red]Error:[/red] Preset '{pack_id}' is not installed")
raise typer.Exit(1)
# Get current metadata
metadata = manager.registry.get(pack_id)
if metadata is None or not isinstance(metadata, dict):
console.print(f"[red]Error:[/red] Preset '{pack_id}' not found in registry (corrupted state)")
raise typer.Exit(1)
if metadata.get("enabled", True):
console.print(f"[yellow]Preset '{pack_id}' is already enabled[/yellow]")
raise typer.Exit(0)
# Enable the preset
manager.registry.update(pack_id, {"enabled": True})
console.print(f"[green]✓[/green] Preset '{pack_id}' enabled")
console.print("\nTemplates from this preset will now be included in resolution.")
console.print("[dim]Note: Previously registered commands/skills remain active.[/dim]")
@preset_app.command("disable")
def preset_disable(
pack_id: str = typer.Argument(help="Preset ID to disable"),
):
"""Disable a preset without removing it."""
from .presets import PresetManager
project_root = Path.cwd()
# Check if we're in a spec-kit project
specify_dir = project_root / ".specify"
if not specify_dir.exists():
console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)")
console.print("Run this command from a spec-kit project root")
raise typer.Exit(1)
manager = PresetManager(project_root)
# Check if preset is installed
if not manager.registry.is_installed(pack_id):
console.print(f"[red]Error:[/red] Preset '{pack_id}' is not installed")
raise typer.Exit(1)
# Get current metadata
metadata = manager.registry.get(pack_id)
if metadata is None or not isinstance(metadata, dict):
console.print(f"[red]Error:[/red] Preset '{pack_id}' not found in registry (corrupted state)")
raise typer.Exit(1)
if not metadata.get("enabled", True):
console.print(f"[yellow]Preset '{pack_id}' is already disabled[/yellow]")
raise typer.Exit(0)
# Disable the preset
manager.registry.update(pack_id, {"enabled": False})
console.print(f"[green]✓[/green] Preset '{pack_id}' disabled")
console.print("\nTemplates from this preset will be skipped during resolution.")
console.print("[dim]Note: Previously registered commands/skills remain active until preset removal.[/dim]")
console.print(f"To re-enable: specify preset enable {pack_id}")
# ===== Preset Catalog Commands =====
@@ -3855,8 +3938,7 @@ def extension_enable(
console.print(f"[yellow]Extension '{display_name}' is already enabled[/yellow]")
raise typer.Exit(0)
metadata["enabled"] = True
manager.registry.update(extension_id, metadata)
manager.registry.update(extension_id, {"enabled": True})
# Enable hooks in extensions.yml
config = hook_executor.get_project_config()
@@ -3903,8 +3985,7 @@ def extension_disable(
console.print(f"[yellow]Extension '{display_name}' is already disabled[/yellow]")
raise typer.Exit(0)
metadata["enabled"] = False
manager.registry.update(extension_id, metadata)
manager.registry.update(extension_id, {"enabled": False})
# Disable hooks in extensions.yml
config = hook_executor.get_project_config()

View File

@@ -222,7 +222,17 @@ class ExtensionRegistry:
try:
with open(self.registry_path, 'r') as f:
return json.load(f)
data = json.load(f)
# Validate loaded data is a dict (handles corrupted registry files)
if not isinstance(data, dict):
return {
"schema_version": self.SCHEMA_VERSION,
"extensions": {}
}
# Normalize extensions field (handles corrupted extensions value)
if not isinstance(data.get("extensions"), dict):
data["extensions"] = {}
return data
except (json.JSONDecodeError, FileNotFoundError):
# Corrupted or missing registry, start fresh
return {
@@ -244,7 +254,7 @@ class ExtensionRegistry:
metadata: Extension metadata (version, source, etc.)
"""
self.data["extensions"][extension_id] = {
**metadata,
**copy.deepcopy(metadata),
"installed_at": datetime.now(timezone.utc).isoformat()
}
self._save()
@@ -267,15 +277,16 @@ class ExtensionRegistry:
Raises:
KeyError: If extension is not installed
"""
if extension_id not in self.data["extensions"]:
extensions = self.data.get("extensions")
if not isinstance(extensions, dict) or extension_id not in extensions:
raise KeyError(f"Extension '{extension_id}' is not installed")
# Merge new metadata with existing, preserving original installed_at
existing = self.data["extensions"][extension_id]
existing = extensions[extension_id]
# Handle corrupted registry entries (e.g., string/list instead of dict)
if not isinstance(existing, dict):
existing = {}
# Merge: existing fields preserved, new fields override
merged = {**existing, **metadata}
# Merge: existing fields preserved, new fields override (deep copy to prevent caller mutation)
merged = {**existing, **copy.deepcopy(metadata)}
# Always preserve original installed_at based on key existence, not truthiness,
# to handle cases where the field exists but may be falsy (legacy/corruption)
if "installed_at" in existing:
@@ -283,7 +294,7 @@ class ExtensionRegistry:
else:
# If not present in existing, explicitly remove from merged if caller provided it
merged.pop("installed_at", None)
self.data["extensions"][extension_id] = merged
extensions[extension_id] = merged
self._save()
def restore(self, extension_id: str, metadata: dict):
@@ -296,8 +307,16 @@ class ExtensionRegistry:
Args:
extension_id: Extension ID
metadata: Complete extension metadata including installed_at
Raises:
ValueError: If metadata is None or not a dict
"""
self.data["extensions"][extension_id] = dict(metadata)
if metadata is None or not isinstance(metadata, dict):
raise ValueError(f"Cannot restore '{extension_id}': metadata must be a dict")
# Ensure extensions dict exists (handle corrupted registry)
if not isinstance(self.data.get("extensions"), dict):
self.data["extensions"] = {}
self.data["extensions"][extension_id] = copy.deepcopy(metadata)
self._save()
def remove(self, extension_id: str):
@@ -306,8 +325,11 @@ class ExtensionRegistry:
Args:
extension_id: Extension ID
"""
if extension_id in self.data["extensions"]:
del self.data["extensions"][extension_id]
extensions = self.data.get("extensions")
if not isinstance(extensions, dict):
return
if extension_id in extensions:
del extensions[extension_id]
self._save()
def get(self, extension_id: str) -> Optional[dict]:
@@ -320,21 +342,49 @@ class ExtensionRegistry:
extension_id: Extension ID
Returns:
Deep copy of extension metadata, or None if not found
Deep copy of extension metadata, or None if not found or corrupted
"""
entry = self.data["extensions"].get(extension_id)
return copy.deepcopy(entry) if entry is not None else None
extensions = self.data.get("extensions")
if not isinstance(extensions, dict):
return None
entry = extensions.get(extension_id)
# Return None for missing or corrupted (non-dict) entries
if entry is None or not isinstance(entry, dict):
return None
return copy.deepcopy(entry)
def list(self) -> Dict[str, dict]:
"""Get all installed extensions.
"""Get all installed extensions with valid metadata.
Returns a deep copy of the extensions mapping to prevent callers
from accidentally mutating nested internal registry state.
Returns a deep copy of extensions with dict metadata only.
Corrupted entries (non-dict values) are filtered out.
Returns:
Dictionary of extension_id -> metadata (deep copies)
Dictionary of extension_id -> metadata (deep copies), empty dict if corrupted
"""
return copy.deepcopy(self.data["extensions"])
extensions = self.data.get("extensions", {}) or {}
if not isinstance(extensions, dict):
return {}
# Filter to only valid dict entries to match type contract
return {
ext_id: copy.deepcopy(meta)
for ext_id, meta in extensions.items()
if isinstance(meta, dict)
}
def keys(self) -> set:
"""Get all extension IDs including corrupted entries.
Lightweight method that returns IDs without deep-copying metadata.
Use this when you only need to check which extensions are tracked.
Returns:
Set of extension IDs (includes corrupted entries)
"""
extensions = self.data.get("extensions", {}) or {}
if not isinstance(extensions, dict):
return set()
return set(extensions.keys())
def is_installed(self, extension_id: str) -> bool:
"""Check if extension is installed.
@@ -343,17 +393,23 @@ class ExtensionRegistry:
extension_id: Extension ID
Returns:
True if extension is installed
True if extension is installed, False if not or registry corrupted
"""
return extension_id in self.data["extensions"]
extensions = self.data.get("extensions")
if not isinstance(extensions, dict):
return False
return extension_id in extensions
def list_by_priority(self) -> List[tuple]:
def list_by_priority(self, include_disabled: bool = False) -> List[tuple]:
"""Get all installed extensions sorted by priority.
Lower priority number = higher precedence (checked first).
Extensions with equal priority are sorted alphabetically by ID
for deterministic ordering.
Args:
include_disabled: If True, include disabled extensions. Default False.
Returns:
List of (extension_id, metadata_copy) tuples sorted by priority.
Metadata is deep-copied to prevent accidental mutation.
@@ -365,6 +421,9 @@ class ExtensionRegistry:
for ext_id, meta in extensions.items():
if not isinstance(meta, dict):
continue
# Skip disabled extensions unless explicitly requested
if not include_disabled and not meta.get("enabled", True):
continue
metadata_copy = copy.deepcopy(meta)
metadata_copy["priority"] = normalize_priority(metadata_copy.get("priority", 10))
sortable_extensions.append((ext_id, metadata_copy))
@@ -633,7 +692,7 @@ class ExtensionManager:
# Get registered commands before removal
metadata = self.registry.get(extension_id)
registered_commands = metadata.get("registered_commands", {})
registered_commands = metadata.get("registered_commands", {}) if metadata else {}
extension_dir = self.extensions_dir / extension_id

View File

@@ -238,7 +238,17 @@ class PresetRegistry:
try:
with open(self.registry_path, 'r') as f:
return json.load(f)
data = json.load(f)
# Validate loaded data is a dict (handles corrupted registry files)
if not isinstance(data, dict):
return {
"schema_version": self.SCHEMA_VERSION,
"presets": {}
}
# Normalize presets field (handles corrupted presets value)
if not isinstance(data.get("presets"), dict):
data["presets"] = {}
return data
except (json.JSONDecodeError, FileNotFoundError):
return {
"schema_version": self.SCHEMA_VERSION,
@@ -259,7 +269,7 @@ class PresetRegistry:
metadata: Pack metadata (version, source, etc.)
"""
self.data["presets"][pack_id] = {
**metadata,
**copy.deepcopy(metadata),
"installed_at": datetime.now(timezone.utc).isoformat()
}
self._save()
@@ -270,8 +280,11 @@ class PresetRegistry:
Args:
pack_id: Preset ID
"""
if pack_id in self.data["presets"]:
del self.data["presets"][pack_id]
packs = self.data.get("presets")
if not isinstance(packs, dict):
return
if pack_id in packs:
del packs[pack_id]
self._save()
def update(self, pack_id: str, updates: dict):
@@ -288,14 +301,15 @@ class PresetRegistry:
Raises:
KeyError: If preset is not installed
"""
if pack_id not in self.data["presets"]:
packs = self.data.get("presets")
if not isinstance(packs, dict) or pack_id not in packs:
raise KeyError(f"Preset '{pack_id}' not found in registry")
existing = self.data["presets"][pack_id]
existing = packs[pack_id]
# Handle corrupted registry entries (e.g., string/list instead of dict)
if not isinstance(existing, dict):
existing = {}
# Merge: existing fields preserved, new fields override
merged = {**existing, **updates}
# Merge: existing fields preserved, new fields override (deep copy to prevent caller mutation)
merged = {**existing, **copy.deepcopy(updates)}
# Always preserve original installed_at based on key existence, not truthiness,
# to handle cases where the field exists but may be falsy (legacy/corruption)
if "installed_at" in existing:
@@ -303,35 +317,95 @@ class PresetRegistry:
else:
# If not present in existing, explicitly remove from merged if caller provided it
merged.pop("installed_at", None)
self.data["presets"][pack_id] = merged
packs[pack_id] = merged
self._save()
def restore(self, pack_id: str, metadata: dict):
"""Restore preset metadata to registry without modifying timestamps.
Use this method for rollback scenarios where you have a complete backup
of the registry entry (including installed_at) and want to restore it
exactly as it was.
Args:
pack_id: Preset ID
metadata: Complete preset metadata including installed_at
Raises:
ValueError: If metadata is None or not a dict
"""
if metadata is None or not isinstance(metadata, dict):
raise ValueError(f"Cannot restore '{pack_id}': metadata must be a dict")
# Ensure presets dict exists (handle corrupted registry)
if not isinstance(self.data.get("presets"), dict):
self.data["presets"] = {}
self.data["presets"][pack_id] = copy.deepcopy(metadata)
self._save()
def get(self, pack_id: str) -> Optional[dict]:
"""Get preset metadata from registry.
Returns a deep copy to prevent callers from accidentally mutating
nested internal registry state without going through the write path.
Args:
pack_id: Preset ID
Returns:
Pack metadata or None if not found
Deep copy of preset metadata, or None if not found or corrupted
"""
return self.data["presets"].get(pack_id)
packs = self.data.get("presets")
if not isinstance(packs, dict):
return None
entry = packs.get(pack_id)
# Return None for missing or corrupted (non-dict) entries
if entry is None or not isinstance(entry, dict):
return None
return copy.deepcopy(entry)
def list(self) -> Dict[str, dict]:
"""Get all installed presets.
"""Get all installed presets with valid metadata.
Returns a deep copy of presets with dict metadata only.
Corrupted entries (non-dict values) are filtered out.
Returns:
Dictionary of pack_id -> metadata
Dictionary of pack_id -> metadata (deep copies), empty dict if corrupted
"""
return self.data["presets"]
packs = self.data.get("presets", {}) or {}
if not isinstance(packs, dict):
return {}
# Filter to only valid dict entries to match type contract
return {
pack_id: copy.deepcopy(meta)
for pack_id, meta in packs.items()
if isinstance(meta, dict)
}
def list_by_priority(self) -> List[tuple]:
def keys(self) -> set:
"""Get all preset IDs including corrupted entries.
Lightweight method that returns IDs without deep-copying metadata.
Use this when you only need to check which presets are tracked.
Returns:
Set of preset IDs (includes corrupted entries)
"""
packs = self.data.get("presets", {}) or {}
if not isinstance(packs, dict):
return set()
return set(packs.keys())
def list_by_priority(self, include_disabled: bool = False) -> List[tuple]:
"""Get all installed presets sorted by priority.
Lower priority number = higher precedence (checked first).
Presets with equal priority are sorted alphabetically by ID
for deterministic ordering.
Args:
include_disabled: If True, include disabled presets. Default False.
Returns:
List of (pack_id, metadata_copy) tuples sorted by priority.
Metadata is deep-copied to prevent accidental mutation.
@@ -343,6 +417,9 @@ class PresetRegistry:
for pack_id, meta in packs.items():
if not isinstance(meta, dict):
continue
# Skip disabled presets unless explicitly requested
if not include_disabled and not meta.get("enabled", True):
continue
metadata_copy = copy.deepcopy(meta)
metadata_copy["priority"] = normalize_priority(metadata_copy.get("priority", 10))
sortable_packs.append((pack_id, metadata_copy))
@@ -358,9 +435,12 @@ class PresetRegistry:
pack_id: Preset ID
Returns:
True if pack is installed
True if pack is installed, False if not or registry corrupted
"""
return pack_id in self.data["presets"]
packs = self.data.get("presets")
if not isinstance(packs, dict):
return False
return pack_id in packs
class PresetManager:
@@ -1466,12 +1546,20 @@ class PresetResolver:
return []
registry = ExtensionRegistry(self.extensions_dir)
registered_extensions = registry.list_by_priority()
registered_extension_ids = {ext_id for ext_id, _ in registered_extensions}
# Use keys() to track ALL extensions (including corrupted entries) without deep copy
# This prevents corrupted entries from being picked up as "unregistered" dirs
registered_extension_ids = registry.keys()
# Get all registered extensions including disabled; we filter disabled manually below
all_registered = registry.list_by_priority(include_disabled=True)
all_extensions: list[tuple[int, str, dict | None]] = []
for ext_id, metadata in registered_extensions:
# Only include enabled extensions in the result
for ext_id, metadata in all_registered:
# Skip disabled extensions
if not metadata.get("enabled", True):
continue
priority = normalize_priority(metadata.get("priority") if metadata else None)
all_extensions.append((priority, ext_id, metadata))