""" Dependency Resolver =================== Provides dependency resolution using Kahn's algorithm for topological sorting. Includes cycle detection, validation, and helper functions for dependency management. """ from typing import TypedDict # Security: Prevent DoS via excessive dependencies MAX_DEPENDENCIES_PER_FEATURE = 20 MAX_DEPENDENCY_DEPTH = 50 # Prevent stack overflow in cycle detection class DependencyResult(TypedDict): """Result from dependency resolution.""" ordered_features: list[dict] circular_dependencies: list[list[int]] blocked_features: dict[int, list[int]] # feature_id -> [blocking_ids] missing_dependencies: dict[int, list[int]] # feature_id -> [missing_ids] def resolve_dependencies(features: list[dict]) -> DependencyResult: """Topological sort using Kahn's algorithm with priority-aware ordering. Returns ordered features respecting dependencies, plus metadata about cycles, blocked features, and missing dependencies. Args: features: List of feature dicts with id, priority, passes, and dependencies fields Returns: DependencyResult with ordered_features, circular_dependencies, blocked_features, and missing_dependencies """ feature_map = {f["id"]: f for f in features} in_degree = {f["id"]: 0 for f in features} adjacency: dict[int, list[int]] = {f["id"]: [] for f in features} blocked: dict[int, list[int]] = {} missing: dict[int, list[int]] = {} # Build graph for feature in features: deps = feature.get("dependencies") or [] for dep_id in deps: if dep_id not in feature_map: missing.setdefault(feature["id"], []).append(dep_id) else: adjacency[dep_id].append(feature["id"]) in_degree[feature["id"]] += 1 # Track blocked features dep = feature_map[dep_id] if not dep.get("passes"): blocked.setdefault(feature["id"], []).append(dep_id) # Kahn's algorithm with priority-aware selection queue = [f for f in features if in_degree[f["id"]] == 0] queue.sort(key=lambda f: (f.get("priority", 999), f["id"])) ordered: list[dict] = [] while queue: current = queue.pop(0) ordered.append(current) for dependent_id in adjacency[current["id"]]: in_degree[dependent_id] -= 1 if in_degree[dependent_id] == 0: queue.append(feature_map[dependent_id]) queue.sort(key=lambda f: (f.get("priority", 999), f["id"])) # Detect cycles (features not in ordered = part of cycle) cycles: list[list[int]] = [] if len(ordered) < len(features): remaining = [f for f in features if f not in ordered] cycles = _detect_cycles(remaining, feature_map) ordered.extend(remaining) # Add cyclic features at end return { "ordered_features": ordered, "circular_dependencies": cycles, "blocked_features": blocked, "missing_dependencies": missing, } def are_dependencies_satisfied(feature: dict, all_features: list[dict]) -> bool: """Check if all dependencies have passes=True. Args: feature: Feature dict to check all_features: List of all feature dicts Returns: True if all dependencies are satisfied (or no dependencies) """ deps = feature.get("dependencies") or [] if not deps: return True passing_ids = {f["id"] for f in all_features if f.get("passes")} return all(dep_id in passing_ids for dep_id in deps) def get_blocking_dependencies(feature: dict, all_features: list[dict]) -> list[int]: """Get list of incomplete dependency IDs. Args: feature: Feature dict to check all_features: List of all feature dicts Returns: List of feature IDs that are blocking this feature """ deps = feature.get("dependencies") or [] passing_ids = {f["id"] for f in all_features if f.get("passes")} return [dep_id for dep_id in deps if dep_id not in passing_ids] def would_create_circular_dependency( features: list[dict], source_id: int, target_id: int ) -> bool: """Check if adding a dependency from target to source would create a cycle. Uses DFS with visited set for efficient cycle detection. Args: features: List of all feature dicts source_id: The feature that would gain the dependency target_id: The feature that would become a dependency Returns: True if adding the dependency would create a cycle """ if source_id == target_id: return True # Self-reference is a cycle feature_map = {f["id"]: f for f in features} source = feature_map.get(source_id) if not source: return False # Check if target already depends on source (direct or indirect) target = feature_map.get(target_id) if not target: return False # DFS from target to see if we can reach source visited: set[int] = set() def can_reach(current_id: int, depth: int = 0) -> bool: # Security: Prevent stack overflow with depth limit if depth > MAX_DEPENDENCY_DEPTH: return True # Assume cycle if too deep (fail-safe) if current_id == source_id: return True if current_id in visited: return False visited.add(current_id) current = feature_map.get(current_id) if not current: return False deps = current.get("dependencies") or [] for dep_id in deps: if can_reach(dep_id, depth + 1): return True return False return can_reach(target_id) def validate_dependencies( feature_id: int, dependency_ids: list[int], all_feature_ids: set[int] ) -> tuple[bool, str]: """Validate dependency list. Args: feature_id: ID of the feature being validated dependency_ids: List of proposed dependency IDs all_feature_ids: Set of all valid feature IDs Returns: Tuple of (is_valid, error_message) """ # Security: Check limits if len(dependency_ids) > MAX_DEPENDENCIES_PER_FEATURE: return False, f"Maximum {MAX_DEPENDENCIES_PER_FEATURE} dependencies allowed" # Check self-reference if feature_id in dependency_ids: return False, "A feature cannot depend on itself" # Check all dependencies exist missing = [d for d in dependency_ids if d not in all_feature_ids] if missing: return False, f"Dependencies not found: {missing}" # Check for duplicates if len(dependency_ids) != len(set(dependency_ids)): return False, "Duplicate dependencies not allowed" return True, "" def _detect_cycles(features: list[dict], feature_map: dict) -> list[list[int]]: """Detect cycles using DFS with recursion tracking. Args: features: List of features to check for cycles feature_map: Map of feature_id -> feature dict Returns: List of cycles, where each cycle is a list of feature IDs """ cycles: list[list[int]] = [] visited: set[int] = set() rec_stack: set[int] = set() path: list[int] = [] def dfs(fid: int) -> bool: visited.add(fid) rec_stack.add(fid) path.append(fid) feature = feature_map.get(fid) if feature: for dep_id in feature.get("dependencies") or []: if dep_id not in visited: if dfs(dep_id): return True elif dep_id in rec_stack: cycle_start = path.index(dep_id) cycles.append(path[cycle_start:]) return True path.pop() rec_stack.remove(fid) return False for f in features: if f["id"] not in visited: dfs(f["id"]) return cycles def get_ready_features(features: list[dict], limit: int = 10) -> list[dict]: """Get features that are ready to be worked on. A feature is ready if: - It is not passing - It is not in progress - All its dependencies are satisfied Args: features: List of all feature dicts limit: Maximum number of features to return Returns: List of ready features, sorted by priority """ passing_ids = {f["id"] for f in features if f.get("passes")} ready = [] for f in features: if f.get("passes") or f.get("in_progress"): continue deps = f.get("dependencies") or [] if all(dep_id in passing_ids for dep_id in deps): ready.append(f) # Sort by priority ready.sort(key=lambda f: (f.get("priority", 999), f["id"])) return ready[:limit] def get_blocked_features(features: list[dict]) -> list[dict]: """Get features that are blocked by unmet dependencies. Args: features: List of all feature dicts Returns: List of blocked features with 'blocked_by' field added """ passing_ids = {f["id"] for f in features if f.get("passes")} blocked = [] for f in features: if f.get("passes"): continue deps = f.get("dependencies") or [] blocking = [d for d in deps if d not in passing_ids] if blocking: blocked.append({**f, "blocked_by": blocking}) return blocked def build_graph_data(features: list[dict]) -> dict: """Build graph data structure for visualization. Args: features: List of all feature dicts Returns: Dict with 'nodes' and 'edges' for graph visualization """ passing_ids = {f["id"] for f in features if f.get("passes")} nodes = [] edges = [] for f in features: deps = f.get("dependencies") or [] blocking = [d for d in deps if d not in passing_ids] if f.get("passes"): status = "done" elif blocking: status = "blocked" elif f.get("in_progress"): status = "in_progress" else: status = "pending" nodes.append({ "id": f["id"], "name": f["name"], "category": f["category"], "status": status, "priority": f.get("priority", 999), "dependencies": deps, }) for dep_id in deps: edges.append({"source": dep_id, "target": f["id"]}) return {"nodes": nodes, "edges": edges}