Files
claude-code/plugins/hookify/core/rule_engine.py
Claude ff0fdc0676 fix(security): Resolve symlinks before checking deny rules (CVE-2025-59829)
This commit fixes a security vulnerability where deny rules could be
bypassed by creating symbolic links to restricted files.

Changes:
- Add symlink resolution in rule_engine.py _extract_field method
- Add symlink resolution in security_reminder_hook.py check_patterns
- Create new symlink_deny_hook.py for blocking symlinks to system paths
- Include Read tool in file event handlers for deny rule checking
- Update hooks.json to apply security hooks to Read tool

The vulnerability allowed attackers to bypass deny rules like
Read(/etc/passwd) by creating a symlink (e.g., ln -s /etc/passwd test.txt)
and then reading the symlink instead of the restricted file directly.

The fix uses os.path.realpath() to resolve all symlinks to their canonical
paths before checking against deny patterns, ensuring that deny rules are
enforced regardless of whether the path is accessed directly or via symlink.
2026-01-08 20:08:08 +00:00

358 lines
12 KiB
Python

#!/usr/bin/env python3
"""Rule evaluation engine for hookify plugin."""
import re
import sys
from functools import lru_cache
from typing import List, Dict, Any, Optional
# Import from local module
from hookify.core.config_loader import Rule, Condition
# Cache compiled regexes (max 128 patterns)
@lru_cache(maxsize=128)
def compile_regex(pattern: str) -> re.Pattern:
"""Compile regex pattern with caching.
Args:
pattern: Regex pattern string
Returns:
Compiled regex pattern
"""
return re.compile(pattern, re.IGNORECASE)
class RuleEngine:
"""Evaluates rules against hook input data."""
def __init__(self):
"""Initialize rule engine."""
# No need for instance cache anymore - using global lru_cache
pass
def evaluate_rules(self, rules: List[Rule], input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Evaluate all rules and return combined results.
Checks all rules and accumulates matches. Blocking rules take priority
over warning rules. All matching rule messages are combined.
Args:
rules: List of Rule objects to evaluate
input_data: Hook input JSON (tool_name, tool_input, etc.)
Returns:
Response dict with systemMessage, hookSpecificOutput, etc.
Empty dict {} if no rules match.
"""
hook_event = input_data.get('hook_event_name', '')
blocking_rules = []
warning_rules = []
for rule in rules:
if self._rule_matches(rule, input_data):
if rule.action == 'block':
blocking_rules.append(rule)
else:
warning_rules.append(rule)
# If any blocking rules matched, block the operation
if blocking_rules:
messages = [f"**[{r.name}]**\n{r.message}" for r in blocking_rules]
combined_message = "\n\n".join(messages)
# Use appropriate blocking format based on event type
if hook_event == 'Stop':
return {
"decision": "block",
"reason": combined_message,
"systemMessage": combined_message
}
elif hook_event in ['PreToolUse', 'PostToolUse']:
return {
"hookSpecificOutput": {
"hookEventName": hook_event,
"permissionDecision": "deny"
},
"systemMessage": combined_message
}
else:
# For other events, just show message
return {
"systemMessage": combined_message
}
# If only warnings, show them but allow operation
if warning_rules:
messages = [f"**[{r.name}]**\n{r.message}" for r in warning_rules]
return {
"systemMessage": "\n\n".join(messages)
}
# No matches - allow operation
return {}
def _rule_matches(self, rule: Rule, input_data: Dict[str, Any]) -> bool:
"""Check if rule matches input data.
Args:
rule: Rule to evaluate
input_data: Hook input data
Returns:
True if rule matches, False otherwise
"""
# Extract tool information
tool_name = input_data.get('tool_name', '')
tool_input = input_data.get('tool_input', {})
# Check tool matcher if specified
if rule.tool_matcher:
if not self._matches_tool(rule.tool_matcher, tool_name):
return False
# If no conditions, don't match
# (Rules must have at least one condition to be valid)
if not rule.conditions:
return False
# All conditions must match
for condition in rule.conditions:
if not self._check_condition(condition, tool_name, tool_input, input_data):
return False
return True
def _matches_tool(self, matcher: str, tool_name: str) -> bool:
"""Check if tool_name matches the matcher pattern.
Args:
matcher: Pattern like "Bash", "Edit|Write", "*"
tool_name: Actual tool name
Returns:
True if matches
"""
if matcher == '*':
return True
# Split on | for OR matching
patterns = matcher.split('|')
return tool_name in patterns
def _resolve_symlink_path(self, file_path: str) -> str:
"""Resolve symlinks in file path to get canonical path.
Security fix for CVE-2025-59829: Deny rules could be bypassed by creating
a symlink to a restricted file. This method resolves the symlink to its
target path so that deny rules are checked against the actual file.
Args:
file_path: The file path that may contain symlinks
Returns:
The canonical path with symlinks resolved, or original path if
resolution fails (e.g., file doesn't exist yet)
"""
import os
if not file_path:
return file_path
try:
# Expand user home directory first
expanded_path = os.path.expanduser(file_path)
# Use realpath to resolve all symlinks and get canonical path
# This handles nested symlinks and relative path components
resolved = os.path.realpath(expanded_path)
return resolved
except (OSError, ValueError):
# If resolution fails (e.g., permission denied, invalid path),
# return the original path to avoid blocking legitimate operations
return file_path
def _check_condition(self, condition: Condition, tool_name: str,
tool_input: Dict[str, Any], input_data: Dict[str, Any] = None) -> bool:
"""Check if a single condition matches.
Args:
condition: Condition to check
tool_name: Tool being used
tool_input: Tool input dict
input_data: Full hook input data (for Stop events, etc.)
Returns:
True if condition matches
"""
# Extract the field value to check
field_value = self._extract_field(condition.field, tool_name, tool_input, input_data)
if field_value is None:
return False
# Apply operator
operator = condition.operator
pattern = condition.pattern
if operator == 'regex_match':
return self._regex_match(pattern, field_value)
elif operator == 'contains':
return pattern in field_value
elif operator == 'equals':
return pattern == field_value
elif operator == 'not_contains':
return pattern not in field_value
elif operator == 'starts_with':
return field_value.startswith(pattern)
elif operator == 'ends_with':
return field_value.endswith(pattern)
else:
# Unknown operator
return False
def _extract_field(self, field: str, tool_name: str,
tool_input: Dict[str, Any], input_data: Dict[str, Any] = None) -> Optional[str]:
"""Extract field value from tool input or hook input data.
Args:
field: Field name like "command", "new_text", "file_path", "reason", "transcript"
tool_name: Tool being used (may be empty for Stop events)
tool_input: Tool input dict
input_data: Full hook input (for accessing transcript_path, reason, etc.)
Returns:
Field value as string, or None if not found
"""
# Direct tool_input fields
if field in tool_input:
value = tool_input[field]
if isinstance(value, str):
# Security fix: resolve symlinks for file_path fields to prevent bypass
# CVE-2025-59829: Deny rules could be bypassed via symlinks
if field == 'file_path':
value = self._resolve_symlink_path(value)
return value
return str(value)
# For Stop events and other non-tool events, check input_data
if input_data:
# Stop event specific fields
if field == 'reason':
return input_data.get('reason', '')
elif field == 'transcript':
# Read transcript file if path provided
transcript_path = input_data.get('transcript_path')
if transcript_path:
try:
with open(transcript_path, 'r') as f:
return f.read()
except FileNotFoundError:
print(f"Warning: Transcript file not found: {transcript_path}", file=sys.stderr)
return ''
except PermissionError:
print(f"Warning: Permission denied reading transcript: {transcript_path}", file=sys.stderr)
return ''
except (IOError, OSError) as e:
print(f"Warning: Error reading transcript {transcript_path}: {e}", file=sys.stderr)
return ''
except UnicodeDecodeError as e:
print(f"Warning: Encoding error in transcript {transcript_path}: {e}", file=sys.stderr)
return ''
elif field == 'user_prompt':
# For UserPromptSubmit events
return input_data.get('user_prompt', '')
# Handle special cases by tool type
if tool_name == 'Bash':
if field == 'command':
return tool_input.get('command', '')
elif tool_name in ['Write', 'Edit']:
if field == 'content':
# Write uses 'content', Edit has 'new_string'
return tool_input.get('content') or tool_input.get('new_string', '')
elif field == 'new_text' or field == 'new_string':
return tool_input.get('new_string', '')
elif field == 'old_text' or field == 'old_string':
return tool_input.get('old_string', '')
elif field == 'file_path':
# Security fix: resolve symlinks to prevent deny rule bypass
return self._resolve_symlink_path(tool_input.get('file_path', ''))
elif tool_name == 'Read':
# Security fix for CVE-2025-59829: Read tool symlink bypass
if field == 'file_path':
return self._resolve_symlink_path(tool_input.get('file_path', ''))
elif tool_name == 'MultiEdit':
if field == 'file_path':
# Security fix: resolve symlinks to prevent deny rule bypass
return self._resolve_symlink_path(tool_input.get('file_path', ''))
elif field in ['new_text', 'content']:
# Concatenate all edits
edits = tool_input.get('edits', [])
return ' '.join(e.get('new_string', '') for e in edits)
return None
def _regex_match(self, pattern: str, text: str) -> bool:
"""Check if pattern matches text using regex.
Args:
pattern: Regex pattern
text: Text to match against
Returns:
True if pattern matches
"""
try:
# Use cached compiled regex (LRU cache with max 128 patterns)
regex = compile_regex(pattern)
return bool(regex.search(text))
except re.error as e:
print(f"Invalid regex pattern '{pattern}': {e}", file=sys.stderr)
return False
# For testing
if __name__ == '__main__':
from hookify.core.config_loader import Condition, Rule
# Test rule evaluation
rule = Rule(
name="test-rm",
enabled=True,
event="bash",
conditions=[
Condition(field="command", operator="regex_match", pattern=r"rm\s+-rf")
],
message="Dangerous rm command!"
)
engine = RuleEngine()
# Test matching input
test_input = {
"tool_name": "Bash",
"tool_input": {
"command": "rm -rf /tmp/test"
}
}
result = engine.evaluate_rules([rule], test_input)
print("Match result:", result)
# Test non-matching input
test_input2 = {
"tool_name": "Bash",
"tool_input": {
"command": "ls -la"
}
}
result2 = engine.evaluate_rules([rule], test_input2)
print("Non-match result:", result2)