autocoder/api/dependency_resolver.py
rudiheydra d68d70c800 fix: prevent infinite loop in compute_scheduling_scores with circular deps
Add visited set to BFS algorithm to handle circular dependencies gracefully.
Previously, cycles in the dependency graph caused the orchestrator to hang
at 100% CPU indefinitely during startup.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 10:09:40 +11:00


"""
Dependency Resolver
===================
Provides dependency resolution using Kahn's algorithm for topological sorting.
Includes cycle detection, validation, and helper functions for dependency management.
"""
import heapq
from typing import TypedDict
# Security: Prevent DoS via excessive dependencies
MAX_DEPENDENCIES_PER_FEATURE = 20
MAX_DEPENDENCY_DEPTH = 50 # Prevent stack overflow in cycle detection
class DependencyResult(TypedDict):
    """Result from dependency resolution."""

    ordered_features: list[dict]
    circular_dependencies: list[list[int]]
    blocked_features: dict[int, list[int]]  # feature_id -> [blocking_ids]
    missing_dependencies: dict[int, list[int]]  # feature_id -> [missing_ids]

def resolve_dependencies(features: list[dict]) -> DependencyResult:
    """Topological sort using Kahn's algorithm with priority-aware ordering.

    Returns ordered features respecting dependencies, plus metadata about
    cycles, blocked features, and missing dependencies.

    Args:
        features: List of feature dicts with id, priority, passes, and dependencies fields

    Returns:
        DependencyResult with ordered_features, circular_dependencies,
        blocked_features, and missing_dependencies
    """
    feature_map = {f["id"]: f for f in features}
    in_degree = {f["id"]: 0 for f in features}
    adjacency: dict[int, list[int]] = {f["id"]: [] for f in features}
    blocked: dict[int, list[int]] = {}
    missing: dict[int, list[int]] = {}

    # Build graph
    for feature in features:
        deps = feature.get("dependencies") or []
        for dep_id in deps:
            if dep_id not in feature_map:
                missing.setdefault(feature["id"], []).append(dep_id)
            else:
                adjacency[dep_id].append(feature["id"])
                in_degree[feature["id"]] += 1
                # Track blocked features
                dep = feature_map[dep_id]
                if not dep.get("passes"):
                    blocked.setdefault(feature["id"], []).append(dep_id)

    # Kahn's algorithm with priority-aware selection using a heap.
    # Heap entries are tuples: (priority, id, feature_dict) for stable ordering.
    heap = [
        (f.get("priority", 999), f["id"], f)
        for f in features
        if in_degree[f["id"]] == 0
    ]
    heapq.heapify(heap)

    ordered: list[dict] = []
    while heap:
        _, _, current = heapq.heappop(heap)
        ordered.append(current)
        for dependent_id in adjacency[current["id"]]:
            in_degree[dependent_id] -= 1
            if in_degree[dependent_id] == 0:
                dep_feature = feature_map[dependent_id]
                heapq.heappush(
                    heap,
                    (dep_feature.get("priority", 999), dependent_id, dep_feature),
                )

    # Detect cycles: features never emitted by Kahn's algorithm are part of,
    # or downstream of, a cycle. Compare by id set rather than dict equality.
    cycles: list[list[int]] = []
    if len(ordered) < len(features):
        ordered_ids = {f["id"] for f in ordered}
        remaining = [f for f in features if f["id"] not in ordered_ids]
        cycles = _detect_cycles(remaining, feature_map)
        ordered.extend(remaining)  # Add cyclic features at end

    return {
        "ordered_features": ordered,
        "circular_dependencies": cycles,
        "blocked_features": blocked,
        "missing_dependencies": missing,
    }

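# Example run, as a minimal sketch: the feature dicts below are hypothetical,
# not real orchestrator data. Feature 3 is ordered last and reported as
# blocked because its dependency (feature 2) has not passed yet:
#
#     features = [
#         {"id": 1, "priority": 2, "passes": True, "dependencies": []},
#         {"id": 2, "priority": 1, "passes": False, "dependencies": [1]},
#         {"id": 3, "priority": 1, "passes": False, "dependencies": [2]},
#     ]
#     result = resolve_dependencies(features)
#     # [f["id"] for f in result["ordered_features"]] == [1, 2, 3]
#     # result["blocked_features"] == {3: [2]}
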
def are_dependencies_satisfied(
    feature: dict,
    all_features: list[dict],
    passing_ids: set[int] | None = None,
) -> bool:
    """Check if all dependencies have passes=True.

    Args:
        feature: Feature dict to check
        all_features: List of all feature dicts
        passing_ids: Optional pre-computed set of passing feature IDs.
            If None, it will be computed from all_features. Pass this when
            calling in a loop to avoid O(n^2) complexity.

    Returns:
        True if all dependencies are satisfied (or there are no dependencies)
    """
    deps = feature.get("dependencies") or []
    if not deps:
        return True
    if passing_ids is None:
        passing_ids = {f["id"] for f in all_features if f.get("passes")}
    return all(dep_id in passing_ids for dep_id in deps)

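# Loop-usage sketch (hypothetical dicts): precompute passing_ids once so each
# check is O(len(deps)) instead of rescanning all features per call:
#
#     feats = [
#         {"id": 1, "passes": True, "dependencies": []},
#         {"id": 2, "passes": False, "dependencies": [1]},
#     ]
#     passing_ids = {f["id"] for f in feats if f.get("passes")}
#     [are_dependencies_satisfied(f, feats, passing_ids) for f in feats]
#     # -> [True, True]  (feature 2's only dependency, feature 1, passes)
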
def get_blocking_dependencies(
    feature: dict,
    all_features: list[dict],
    passing_ids: set[int] | None = None,
) -> list[int]:
    """Get list of incomplete dependency IDs.

    Args:
        feature: Feature dict to check
        all_features: List of all feature dicts
        passing_ids: Optional pre-computed set of passing feature IDs.
            If None, it will be computed from all_features. Pass this when
            calling in a loop to avoid O(n^2) complexity.

    Returns:
        List of feature IDs that are blocking this feature
    """
    deps = feature.get("dependencies") or []
    if passing_ids is None:
        passing_ids = {f["id"] for f in all_features if f.get("passes")}
    return [dep_id for dep_id in deps if dep_id not in passing_ids]

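# Sketch (hypothetical dicts): only the unmet dependency is reported:
#
#     feats = [{"id": 1, "passes": True}, {"id": 2, "passes": False}]
#     get_blocking_dependencies({"id": 3, "dependencies": [1, 2]}, feats)
#     # -> [2]
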
def would_create_circular_dependency(
    features: list[dict], source_id: int, target_id: int
) -> bool:
    """Check if making source depend on target would create a cycle.

    Uses DFS with a visited set for efficient cycle detection.

    Args:
        features: List of all feature dicts
        source_id: The feature that would gain the dependency
        target_id: The feature that would become a dependency

    Returns:
        True if adding the dependency would create a cycle
    """
    if source_id == target_id:
        return True  # Self-reference is a cycle

    feature_map = {f["id"]: f for f in features}
    source = feature_map.get(source_id)
    if not source:
        return False
    target = feature_map.get(target_id)
    if not target:
        return False

    # DFS from target to see if we can reach source: if target already
    # depends on source (directly or indirectly), the new edge closes a cycle.
    visited: set[int] = set()

    def can_reach(current_id: int, depth: int = 0) -> bool:
        # Security: Prevent stack overflow with depth limit
        if depth > MAX_DEPENDENCY_DEPTH:
            return True  # Assume cycle if too deep (fail-safe)
        if current_id == source_id:
            return True
        if current_id in visited:
            return False
        visited.add(current_id)
        current = feature_map.get(current_id)
        if not current:
            return False
        deps = current.get("dependencies") or []
        for dep_id in deps:
            if can_reach(dep_id, depth + 1):
                return True
        return False

    return can_reach(target_id)

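# Sketch: given 1 -> 2 (feature 2 depends on feature 1), making feature 1
# depend on feature 2 would close a cycle (hypothetical dicts):
#
#     feats = [
#         {"id": 1, "dependencies": []},
#         {"id": 2, "dependencies": [1]},
#     ]
#     would_create_circular_dependency(feats, source_id=1, target_id=2)
#     # -> True (feature 2 already reaches feature 1 through its dependencies)
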
def validate_dependencies(
    feature_id: int, dependency_ids: list[int], all_feature_ids: set[int]
) -> tuple[bool, str]:
    """Validate dependency list.

    Args:
        feature_id: ID of the feature being validated
        dependency_ids: List of proposed dependency IDs
        all_feature_ids: Set of all valid feature IDs

    Returns:
        Tuple of (is_valid, error_message)
    """
    # Security: Check limits
    if len(dependency_ids) > MAX_DEPENDENCIES_PER_FEATURE:
        return False, f"Maximum {MAX_DEPENDENCIES_PER_FEATURE} dependencies allowed"
    # Check self-reference
    if feature_id in dependency_ids:
        return False, "A feature cannot depend on itself"
    # Check all dependencies exist
    missing = [d for d in dependency_ids if d not in all_feature_ids]
    if missing:
        return False, f"Dependencies not found: {missing}"
    # Check for duplicates
    if len(dependency_ids) != len(set(dependency_ids)):
        return False, "Duplicate dependencies not allowed"
    return True, ""

def _detect_cycles(features: list[dict], feature_map: dict) -> list[list[int]]:
    """Detect cycles using DFS with recursion tracking.

    Args:
        features: List of features to check for cycles
        feature_map: Map of feature_id -> feature dict

    Returns:
        List of cycles, where each cycle is a list of feature IDs
    """
    cycles: list[list[int]] = []
    visited: set[int] = set()
    rec_stack: set[int] = set()
    path: list[int] = []

    def dfs(fid: int) -> bool:
        visited.add(fid)
        rec_stack.add(fid)
        path.append(fid)
        feature = feature_map.get(fid)
        if feature:
            for dep_id in feature.get("dependencies") or []:
                if dep_id not in visited:
                    if dfs(dep_id):
                        return True
                elif dep_id in rec_stack:
                    cycle_start = path.index(dep_id)
                    cycles.append(path[cycle_start:])
                    return True
        path.pop()
        rec_stack.remove(fid)
        return False

    for f in features:
        if f["id"] not in visited:
            # Reset per-traversal state: when dfs() bails out early after
            # recording a cycle, it leaves entries on path and rec_stack,
            # which would corrupt detection for the next start node.
            path.clear()
            rec_stack.clear()
            dfs(f["id"])
    return cycles

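# Sketch: a two-node cycle is reported as a path slice (hypothetical IDs):
#
#     fmap = {1: {"id": 1, "dependencies": [2]}, 2: {"id": 2, "dependencies": [1]}}
#     _detect_cycles(list(fmap.values()), fmap)
#     # -> [[1, 2]]  (DFS from 1 visits 2, whose dep 1 is on the stack)
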
def compute_scheduling_scores(features: list[dict]) -> dict[int, float]:
    """Compute scheduling scores for all features.

    Higher scores mean higher priority for scheduling. The algorithm considers:

    1. Unblocking potential - Features that unblock more downstream work score higher
    2. Depth in graph - Features with no dependencies (roots) are "shovel-ready"
    3. User priority - Existing priority field as tiebreaker

    Score formula: (1000 * unblock) + (100 * depth_score) + (10 * priority_factor)

    Args:
        features: List of feature dicts with id, priority, dependencies fields

    Returns:
        Dict mapping feature_id -> score (higher = schedule first)
    """
    if not features:
        return {}

    # Build adjacency lists
    children: dict[int, list[int]] = {f["id"]: [] for f in features}  # who depends on me
    parents: dict[int, list[int]] = {f["id"]: [] for f in features}   # who I depend on
    for f in features:
        for dep_id in (f.get("dependencies") or []):
            if dep_id in children:  # Only valid deps
                children[dep_id].append(f["id"])
                parents[f["id"]].append(dep_id)

    # Calculate depths via BFS from roots.
    # Use a visited set to prevent infinite loops from circular dependencies.
    depths: dict[int, int] = {}
    visited: set[int] = set()
    roots = [f["id"] for f in features if not parents[f["id"]]]
    queue = deque((root, 0) for root in roots)
    while queue:
        node_id, depth = queue.popleft()
        if node_id in visited:
            continue  # Skip already visited nodes (handles cycles)
        visited.add(node_id)
        depths[node_id] = depth
        for child_id in children[node_id]:
            if child_id not in visited:
                queue.append((child_id, depth + 1))

    # Handle nodes never reached from a root, e.g. nodes that sit on or
    # behind a cycle (every node in a cycle has a parent, so none is a root).
    for f in features:
        if f["id"] not in depths:
            depths[f["id"]] = 0

    # Accumulate transitive downstream counts in approximate reverse
    # topological order (deepest BFS depth first). This is a heuristic: it
    # can double-count shared descendants in diamond-shaped graphs, which is
    # acceptable for relative scheduling scores.
    downstream: dict[int, int] = {f["id"]: 0 for f in features}
    for fid in sorted(depths.keys(), key=lambda x: -depths[x]):
        for parent_id in parents[fid]:
            downstream[parent_id] += 1 + downstream[fid]

    # Normalize and compute scores
    max_depth = max(depths.values()) if depths else 0
    max_downstream = max(downstream.values()) if downstream else 0
    scores: dict[int, float] = {}
    for f in features:
        fid = f["id"]
        # Unblocking score: 0-1, higher = unblocks more
        unblock = downstream[fid] / max_downstream if max_downstream > 0 else 0
        # Depth score: 0-1, higher = closer to root (no deps)
        depth_score = 1 - (depths[fid] / max_depth) if max_depth > 0 else 1
        # Priority factor: 0-1, lower priority number = higher factor
        priority = f.get("priority", 999)
        priority_factor = (10 - min(priority, 10)) / 10
        scores[fid] = (1000 * unblock) + (100 * depth_score) + (10 * priority_factor)
    return scores

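# Worked sketch of the formula on a tiny chain 1 -> 2 -> 3 (hypothetical
# dicts, all priority 3 so only graph position differentiates the scores):
#
#     feats = [
#         {"id": 1, "priority": 3, "dependencies": []},
#         {"id": 2, "priority": 3, "dependencies": [1]},
#         {"id": 3, "priority": 3, "dependencies": [2]},
#     ]
#     compute_scheduling_scores(feats)
#     # depths: {1: 0, 2: 1, 3: 2}; downstream: {1: 2, 2: 1, 3: 0}
#     # feature 1: 1000*(2/2) + 100*(1 - 0/2) + 10*(7/10) = 1107.0
#     # feature 3: 1000*(0/2) + 100*(1 - 2/2) + 10*(7/10) =    7.0
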
def get_ready_features(features: list[dict], limit: int = 10) -> list[dict]:
    """Get features that are ready to be worked on.

    A feature is ready if:
    - It is not passing
    - It is not in progress
    - All its dependencies are satisfied

    Args:
        features: List of all feature dicts
        limit: Maximum number of features to return

    Returns:
        List of ready features, sorted by scheduling score, then priority, then id
    """
    passing_ids = {f["id"] for f in features if f.get("passes")}
    ready = []
    for f in features:
        if f.get("passes") or f.get("in_progress"):
            continue
        deps = f.get("dependencies") or []
        if all(dep_id in passing_ids for dep_id in deps):
            ready.append(f)

    # Sort by scheduling score (higher = first), then priority, then id
    scores = compute_scheduling_scores(features)
    ready.sort(key=lambda f: (-scores.get(f["id"], 0), f.get("priority", 999), f["id"]))
    return ready[:limit]

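# Sketch (hypothetical dicts): feature 2 is gated on unpassed feature 1,
# so only feature 1 is ready:
#
#     feats = [
#         {"id": 1, "priority": 1, "passes": False, "dependencies": []},
#         {"id": 2, "priority": 1, "passes": False, "dependencies": [1]},
#     ]
#     [f["id"] for f in get_ready_features(feats)]
#     # -> [1]
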
def get_blocked_features(features: list[dict]) -> list[dict]:
    """Get features that are blocked by unmet dependencies.

    Args:
        features: List of all feature dicts

    Returns:
        List of blocked features with 'blocked_by' field added
    """
    passing_ids = {f["id"] for f in features if f.get("passes")}
    blocked = []
    for f in features:
        if f.get("passes"):
            continue
        deps = f.get("dependencies") or []
        blocking = [d for d in deps if d not in passing_ids]
        if blocking:
            blocked.append({**f, "blocked_by": blocking})
    return blocked

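# Sketch (hypothetical dicts): a non-passing feature with an unmet
# dependency comes back annotated with "blocked_by":
#
#     feats = [
#         {"id": 1, "passes": False, "dependencies": []},
#         {"id": 2, "passes": False, "dependencies": [1]},
#     ]
#     get_blocked_features(feats)
#     # -> [{"id": 2, "passes": False, "dependencies": [1], "blocked_by": [1]}]
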
def build_graph_data(features: list[dict]) -> dict:
    """Build graph data structure for visualization.

    Args:
        features: List of all feature dicts

    Returns:
        Dict with 'nodes' and 'edges' for graph visualization
    """
    passing_ids = {f["id"] for f in features if f.get("passes")}
    nodes = []
    edges = []
    for f in features:
        deps = f.get("dependencies") or []
        blocking = [d for d in deps if d not in passing_ids]
        if f.get("passes"):
            status = "done"
        elif blocking:
            status = "blocked"
        elif f.get("in_progress"):
            status = "in_progress"
        else:
            status = "pending"
        nodes.append({
            "id": f["id"],
            "name": f["name"],
            "category": f["category"],
            "status": status,
            "priority": f.get("priority", 999),
            "dependencies": deps,
        })
        for dep_id in deps:
            edges.append({"source": dep_id, "target": f["id"]})
    return {"nodes": nodes, "edges": edges}