mirror of
https://github.com/anthropics/claude-plugins-official.git
synced 2026-01-31 04:52:02 +00:00
creating initial scaffolding for Claude Code plugins
0
plugins/hookify/core/__init__.py
Normal file
297
plugins/hookify/core/config_loader.py
Normal file
@@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""Configuration loader for hookify plugin.

Loads and parses .claude/hookify.*.local.md files.
"""

import os
import sys
import glob
import re
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, field


@dataclass
class Condition:
    """A single condition for matching."""
    field: str  # "command", "new_text", "old_text", "file_path", etc.
    operator: str  # "regex_match", "contains", "equals", etc.
    pattern: str  # Pattern to match

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Condition':
        """Create Condition from dict."""
        return cls(
            field=data.get('field', ''),
            operator=data.get('operator', 'regex_match'),
            pattern=data.get('pattern', '')
        )


@dataclass
class Rule:
    """A hookify rule."""
    name: str
    enabled: bool
    event: str  # "bash", "file", "stop", "all", etc.
    pattern: Optional[str] = None  # Simple pattern (legacy)
    conditions: List[Condition] = field(default_factory=list)
    action: str = "warn"  # "warn" or "block" (future)
    tool_matcher: Optional[str] = None  # Override tool matching
    message: str = ""  # Message body from markdown

    @classmethod
    def from_dict(cls, frontmatter: Dict[str, Any], message: str) -> 'Rule':
        """Create Rule from frontmatter dict and message body."""
        # Handle both simple pattern and complex conditions
        conditions = []

        # New style: explicit conditions list
        if 'conditions' in frontmatter:
            cond_list = frontmatter['conditions']
            if isinstance(cond_list, list):
                conditions = [Condition.from_dict(c) for c in cond_list]

        # Legacy style: simple pattern field
        simple_pattern = frontmatter.get('pattern')
        if simple_pattern and not conditions:
            # Convert simple pattern to condition
            # Infer field from event
            event = frontmatter.get('event', 'all')
            if event == 'bash':
                field = 'command'
            elif event == 'file':
                field = 'new_text'
            else:
                field = 'content'

            conditions = [Condition(
                field=field,
                operator='regex_match',
                pattern=simple_pattern
            )]

        return cls(
            name=frontmatter.get('name', 'unnamed'),
            enabled=frontmatter.get('enabled', True),
            event=frontmatter.get('event', 'all'),
            pattern=simple_pattern,
            conditions=conditions,
            action=frontmatter.get('action', 'warn'),
            tool_matcher=frontmatter.get('tool_matcher'),
            message=message.strip()
        )


def extract_frontmatter(content: str) -> tuple[Dict[str, Any], str]:
    """Extract YAML frontmatter and message body from markdown.

    Returns (frontmatter_dict, message_body).

    Supports multi-line dictionary items in lists by preserving indentation.
    """
    if not content.startswith('---'):
        return {}, content

    # Split on --- markers
    parts = content.split('---', 2)
    if len(parts) < 3:
        return {}, content

    frontmatter_text = parts[1]
    message = parts[2].strip()

    # Simple YAML parser that handles indented list items
    frontmatter = {}
    lines = frontmatter_text.split('\n')

    current_key = None
    current_list = []
    current_dict = {}
    in_list = False
    in_dict_item = False

    for line in lines:
        # Skip empty lines and comments
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
            continue

        # Check indentation level
        indent = len(line) - len(line.lstrip())

        # Top-level key (no indentation or minimal)
        if indent == 0 and ':' in line and not line.strip().startswith('-'):
            # Save previous list/dict if any
            if in_list and current_key:
                if in_dict_item and current_dict:
                    current_list.append(current_dict)
                    current_dict = {}
                frontmatter[current_key] = current_list
                in_list = False
                in_dict_item = False
                current_list = []

            key, value = line.split(':', 1)
            key = key.strip()
            value = value.strip()

            if not value:
                # Empty value - list or nested structure follows
                current_key = key
                in_list = True
                current_list = []
            else:
                # Simple key-value pair
                value = value.strip('"').strip("'")
                if value.lower() == 'true':
                    value = True
                elif value.lower() == 'false':
                    value = False
                frontmatter[key] = value

        # List item (starts with -)
        elif stripped.startswith('-') and in_list:
            # Save previous dict item if any
            if in_dict_item and current_dict:
                current_list.append(current_dict)
                current_dict = {}

            item_text = stripped[1:].strip()

            # Check if this is an inline dict (key: value on same line)
            if ':' in item_text and ',' in item_text:
                # Inline comma-separated dict: "- field: command, operator: regex_match"
                item_dict = {}
                for part in item_text.split(','):
                    if ':' in part:
                        k, v = part.split(':', 1)
                        item_dict[k.strip()] = v.strip().strip('"').strip("'")
                current_list.append(item_dict)
                in_dict_item = False
            elif ':' in item_text:
                # Start of multi-line dict item: "- field: command"
                in_dict_item = True
                k, v = item_text.split(':', 1)
                current_dict = {k.strip(): v.strip().strip('"').strip("'")}
            else:
                # Simple list item
                current_list.append(item_text.strip('"').strip("'"))
                in_dict_item = False

        # Continuation of dict item (indented under list item)
        elif indent > 2 and in_dict_item and ':' in line:
            # This is a field of the current dict item
            k, v = stripped.split(':', 1)
            current_dict[k.strip()] = v.strip().strip('"').strip("'")

    # Save final list/dict if any
    if in_list and current_key:
        if in_dict_item and current_dict:
            current_list.append(current_dict)
        frontmatter[current_key] = current_list

    return frontmatter, message


def load_rules(event: Optional[str] = None) -> List[Rule]:
    """Load all hookify rules from .claude directory.

    Args:
        event: Optional event filter ("bash", "file", "stop", etc.)

    Returns:
        List of enabled Rule objects matching the event.
    """
    rules = []

    # Find all hookify.*.local.md files
    pattern = os.path.join('.claude', 'hookify.*.local.md')
    files = glob.glob(pattern)

    for file_path in files:
        try:
            rule = load_rule_file(file_path)
            if not rule:
                continue

            # Filter by event if specified
            if event:
                if rule.event != 'all' and rule.event != event:
                    continue

            # Only include enabled rules
            if rule.enabled:
                rules.append(rule)

        except (IOError, OSError, PermissionError) as e:
            # File I/O errors - log and continue
            print(f"Warning: Failed to read {file_path}: {e}", file=sys.stderr)
            continue
        except (ValueError, KeyError, AttributeError, TypeError) as e:
            # Parsing errors - log and continue
            print(f"Warning: Failed to parse {file_path}: {e}", file=sys.stderr)
            continue
        except Exception as e:
            # Unexpected errors - log with type details
            print(f"Warning: Unexpected error loading {file_path} ({type(e).__name__}): {e}", file=sys.stderr)
            continue

    return rules


def load_rule_file(file_path: str) -> Optional[Rule]:
    """Load a single rule file.

    Returns:
        Rule object or None if file is invalid.
    """
    try:
        with open(file_path, 'r') as f:
            content = f.read()

        frontmatter, message = extract_frontmatter(content)

        if not frontmatter:
            print(f"Warning: {file_path} missing YAML frontmatter (must start with ---)", file=sys.stderr)
            return None

        rule = Rule.from_dict(frontmatter, message)
        return rule

    except (IOError, OSError, PermissionError) as e:
        print(f"Error: Cannot read {file_path}: {e}", file=sys.stderr)
        return None
    except UnicodeDecodeError as e:
        # Must come before the generic parse errors: UnicodeDecodeError subclasses ValueError
        print(f"Error: Invalid encoding in {file_path}: {e}", file=sys.stderr)
        return None
    except (ValueError, KeyError, AttributeError, TypeError) as e:
        print(f"Error: Malformed rule file {file_path}: {e}", file=sys.stderr)
        return None
    except Exception as e:
        print(f"Error: Unexpected error parsing {file_path} ({type(e).__name__}): {e}", file=sys.stderr)
        return None


# For testing
if __name__ == '__main__':
    # Test frontmatter parsing
    test_content = """---
name: test-rule
enabled: true
event: bash
pattern: "rm -rf"
---

⚠️ Dangerous command detected!
"""

    fm, msg = extract_frontmatter(test_content)
    print("Frontmatter:", fm)
    print("Message:", msg)

    rule = Rule.from_dict(fm, msg)
    print("Rule:", rule)
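For context, config_loader.py reads rule files named .claude/hookify.<name>.local.md: YAML-style frontmatter between --- markers, followed by the markdown message shown when the rule fires. The snippet below is an illustrative sketch only; the rule name and regex are invented, and it assumes plugins/hookify is on sys.path so "hookify" imports as a package (the same assumption rule_engine.py's own import makes). It exercises the multi-line "conditions" syntax that extract_frontmatter supports.

# Illustrative only: a rule file using the multi-line "conditions" syntax,
# parsed with the loader above. The rule name and regex are invented examples.
# Assumes plugins/hookify is on sys.path so "hookify" imports as a package.
from hookify.core.config_loader import Rule, extract_frontmatter

sample = """---
name: warn-force-push
enabled: true
event: bash
action: warn
conditions:
  - field: command
    operator: regex_match
    pattern: "git push.*--force"
---

Consider --force-with-lease instead of --force.
"""

frontmatter, message = extract_frontmatter(sample)
rule = Rule.from_dict(frontmatter, message)
print(rule.conditions)  # [Condition(field='command', operator='regex_match', pattern='git push.*--force')]
print(rule.message)     # Consider --force-with-lease instead of --force.

The same rule could also be written with the legacy single "pattern" key, which Rule.from_dict converts into one condition whose field is inferred from the event.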
313
plugins/hookify/core/rule_engine.py
Normal file
@@ -0,0 +1,313 @@
#!/usr/bin/env python3
"""Rule evaluation engine for hookify plugin."""

import re
import sys
from functools import lru_cache
from typing import List, Dict, Any, Optional

# Import from local module
from hookify.core.config_loader import Rule, Condition


# Cache compiled regexes (max 128 patterns)
@lru_cache(maxsize=128)
def compile_regex(pattern: str) -> re.Pattern:
    """Compile regex pattern with caching.

    Args:
        pattern: Regex pattern string

    Returns:
        Compiled regex pattern
    """
    return re.compile(pattern, re.IGNORECASE)


class RuleEngine:
    """Evaluates rules against hook input data."""

    def __init__(self):
        """Initialize rule engine."""
        # No need for instance cache anymore - using global lru_cache
        pass

    def evaluate_rules(self, rules: List[Rule], input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate all rules and return combined results.

        Checks all rules and accumulates matches. Blocking rules take priority
        over warning rules. All matching rule messages are combined.

        Args:
            rules: List of Rule objects to evaluate
            input_data: Hook input JSON (tool_name, tool_input, etc.)

        Returns:
            Response dict with systemMessage, hookSpecificOutput, etc.
            Empty dict {} if no rules match.
        """
        hook_event = input_data.get('hook_event_name', '')
        blocking_rules = []
        warning_rules = []

        for rule in rules:
            if self._rule_matches(rule, input_data):
                if rule.action == 'block':
                    blocking_rules.append(rule)
                else:
                    warning_rules.append(rule)

        # If any blocking rules matched, block the operation
        if blocking_rules:
            messages = [f"**[{r.name}]**\n{r.message}" for r in blocking_rules]
            combined_message = "\n\n".join(messages)

            # Use appropriate blocking format based on event type
            if hook_event == 'Stop':
                return {
                    "decision": "block",
                    "reason": combined_message,
                    "systemMessage": combined_message
                }
            elif hook_event in ['PreToolUse', 'PostToolUse']:
                return {
                    "hookSpecificOutput": {
                        "hookEventName": hook_event,
                        "permissionDecision": "deny"
                    },
                    "systemMessage": combined_message
                }
            else:
                # For other events, just show message
                return {
                    "systemMessage": combined_message
                }

        # If only warnings, show them but allow operation
        if warning_rules:
            messages = [f"**[{r.name}]**\n{r.message}" for r in warning_rules]
            return {
                "systemMessage": "\n\n".join(messages)
            }

        # No matches - allow operation
        return {}

    def _rule_matches(self, rule: Rule, input_data: Dict[str, Any]) -> bool:
        """Check if rule matches input data.

        Args:
            rule: Rule to evaluate
            input_data: Hook input data

        Returns:
            True if rule matches, False otherwise
        """
        # Extract tool information
        tool_name = input_data.get('tool_name', '')
        tool_input = input_data.get('tool_input', {})

        # Check tool matcher if specified
        if rule.tool_matcher:
            if not self._matches_tool(rule.tool_matcher, tool_name):
                return False

        # If no conditions, don't match
        # (Rules must have at least one condition to be valid)
        if not rule.conditions:
            return False

        # All conditions must match
        for condition in rule.conditions:
            if not self._check_condition(condition, tool_name, tool_input, input_data):
                return False

        return True

    def _matches_tool(self, matcher: str, tool_name: str) -> bool:
        """Check if tool_name matches the matcher pattern.

        Args:
            matcher: Pattern like "Bash", "Edit|Write", "*"
            tool_name: Actual tool name

        Returns:
            True if matches
        """
        if matcher == '*':
            return True

        # Split on | for OR matching
        patterns = matcher.split('|')
        return tool_name in patterns

    def _check_condition(self, condition: Condition, tool_name: str,
                         tool_input: Dict[str, Any], input_data: Dict[str, Any] = None) -> bool:
        """Check if a single condition matches.

        Args:
            condition: Condition to check
            tool_name: Tool being used
            tool_input: Tool input dict
            input_data: Full hook input data (for Stop events, etc.)

        Returns:
            True if condition matches
        """
        # Extract the field value to check
        field_value = self._extract_field(condition.field, tool_name, tool_input, input_data)
        if field_value is None:
            return False

        # Apply operator
        operator = condition.operator
        pattern = condition.pattern

        if operator == 'regex_match':
            return self._regex_match(pattern, field_value)
        elif operator == 'contains':
            return pattern in field_value
        elif operator == 'equals':
            return pattern == field_value
        elif operator == 'not_contains':
            return pattern not in field_value
        elif operator == 'starts_with':
            return field_value.startswith(pattern)
        elif operator == 'ends_with':
            return field_value.endswith(pattern)
        else:
            # Unknown operator
            return False

    def _extract_field(self, field: str, tool_name: str,
                       tool_input: Dict[str, Any], input_data: Dict[str, Any] = None) -> Optional[str]:
        """Extract field value from tool input or hook input data.

        Args:
            field: Field name like "command", "new_text", "file_path", "reason", "transcript"
            tool_name: Tool being used (may be empty for Stop events)
            tool_input: Tool input dict
            input_data: Full hook input (for accessing transcript_path, reason, etc.)

        Returns:
            Field value as string, or None if not found
        """
        # Direct tool_input fields
        if field in tool_input:
            value = tool_input[field]
            if isinstance(value, str):
                return value
            return str(value)

        # For Stop events and other non-tool events, check input_data
        if input_data:
            # Stop event specific fields
            if field == 'reason':
                return input_data.get('reason', '')
            elif field == 'transcript':
                # Read transcript file if path provided
                transcript_path = input_data.get('transcript_path')
                if transcript_path:
                    try:
                        with open(transcript_path, 'r') as f:
                            return f.read()
                    except FileNotFoundError:
                        print(f"Warning: Transcript file not found: {transcript_path}", file=sys.stderr)
                        return ''
                    except PermissionError:
                        print(f"Warning: Permission denied reading transcript: {transcript_path}", file=sys.stderr)
                        return ''
                    except (IOError, OSError) as e:
                        print(f"Warning: Error reading transcript {transcript_path}: {e}", file=sys.stderr)
                        return ''
                    except UnicodeDecodeError as e:
                        print(f"Warning: Encoding error in transcript {transcript_path}: {e}", file=sys.stderr)
                        return ''
            elif field == 'user_prompt':
                # For UserPromptSubmit events
                return input_data.get('user_prompt', '')

        # Handle special cases by tool type
        if tool_name == 'Bash':
            if field == 'command':
                return tool_input.get('command', '')

        elif tool_name in ['Write', 'Edit']:
            if field == 'content':
                # Write uses 'content', Edit has 'new_string'
                return tool_input.get('content') or tool_input.get('new_string', '')
            elif field == 'new_text' or field == 'new_string':
                return tool_input.get('new_string', '')
            elif field == 'old_text' or field == 'old_string':
                return tool_input.get('old_string', '')
            elif field == 'file_path':
                return tool_input.get('file_path', '')

        elif tool_name == 'MultiEdit':
            if field == 'file_path':
                return tool_input.get('file_path', '')
            elif field in ['new_text', 'content']:
                # Concatenate all edits
                edits = tool_input.get('edits', [])
                return ' '.join(e.get('new_string', '') for e in edits)

        return None

    def _regex_match(self, pattern: str, text: str) -> bool:
        """Check if pattern matches text using regex.

        Args:
            pattern: Regex pattern
            text: Text to match against

        Returns:
            True if pattern matches
        """
        try:
            # Use cached compiled regex (LRU cache with max 128 patterns)
            regex = compile_regex(pattern)
            return bool(regex.search(text))

        except re.error as e:
            print(f"Invalid regex pattern '{pattern}': {e}", file=sys.stderr)
            return False


# For testing
if __name__ == '__main__':
    from hookify.core.config_loader import Condition, Rule

    # Test rule evaluation
    rule = Rule(
        name="test-rm",
        enabled=True,
        event="bash",
        conditions=[
            Condition(field="command", operator="regex_match", pattern=r"rm\s+-rf")
        ],
        message="Dangerous rm command!"
    )

    engine = RuleEngine()

    # Test matching input
    test_input = {
        "tool_name": "Bash",
        "tool_input": {
            "command": "rm -rf /tmp/test"
        }
    }

    result = engine.evaluate_rules([rule], test_input)
    print("Match result:", result)

    # Test non-matching input
    test_input2 = {
        "tool_name": "Bash",
        "tool_input": {
            "command": "ls -la"
        }
    }

    result2 = engine.evaluate_rules([rule], test_input2)
    print("Non-match result:", result2)
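The two modules are designed to be composed inside a hook script: load the enabled rules for the current event, evaluate them with RuleEngine, and emit the resulting dict. The hook scripts themselves are not part of this commit, so the following is only a hypothetical wiring sketch; it assumes the usual Claude Code hook convention of JSON input on stdin and a JSON response on stdout, and the 'bash' event name is taken from the loader's own examples.

# Hypothetical wiring, not part of this commit: a hook entry point that feeds
# stdin JSON through load_rules() and RuleEngine.evaluate_rules(), assuming the
# hook receives its payload on stdin and replies with JSON on stdout.
import json
import sys

from hookify.core.config_loader import load_rules
from hookify.core.rule_engine import RuleEngine


def main() -> None:
    input_data = json.load(sys.stdin)    # e.g. {"tool_name": "Bash", "tool_input": {...}}
    rules = load_rules(event='bash')     # enabled rules for this event, plus event "all"
    result = RuleEngine().evaluate_rules(rules, input_data)
    if result:                           # {} means no rule matched; stay silent and allow
        print(json.dumps(result))


if __name__ == '__main__':
    main()

Emitting nothing when evaluate_rules returns an empty dict mirrors the engine's own convention that no output means the operation proceeds untouched.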