diff --git a/agent.py b/agent.py index bdc174c..a3daaf8 100644 --- a/agent.py +++ b/agent.py @@ -31,6 +31,7 @@ from progress import ( ) from prompts import ( copy_spec_to_project, + get_batch_feature_prompt, get_coding_prompt, get_initializer_prompt, get_single_feature_prompt, @@ -139,6 +140,7 @@ async def run_autonomous_agent( max_iterations: Optional[int] = None, yolo_mode: bool = False, feature_id: Optional[int] = None, + feature_ids: Optional[list[int]] = None, agent_type: Optional[str] = None, testing_feature_id: Optional[int] = None, testing_feature_ids: Optional[list[int]] = None, @@ -152,6 +154,7 @@ async def run_autonomous_agent( max_iterations: Maximum number of iterations (None for unlimited) yolo_mode: If True, skip browser testing in coding agent prompts feature_id: If set, work only on this specific feature (used by orchestrator for coding agents) + feature_ids: If set, work on these features in batch (used by orchestrator for batch mode) agent_type: Type of agent: "initializer", "coding", "testing", or None (auto-detect) testing_feature_id: For testing agents, the pre-claimed feature ID to test (legacy single mode) testing_feature_ids: For testing agents, list of feature IDs to batch test @@ -165,7 +168,9 @@ async def run_autonomous_agent( print(f"Agent type: {agent_type}") if yolo_mode: print("Mode: YOLO (testing agents disabled)") - if feature_id: + if feature_ids and len(feature_ids) > 1: + print(f"Feature batch: {', '.join(f'#{fid}' for fid in feature_ids)}") + elif feature_id: print(f"Feature assignment: #{feature_id}") if max_iterations: print(f"Max iterations: {max_iterations}") @@ -239,6 +244,8 @@ async def run_autonomous_agent( import os if agent_type == "testing": agent_id = f"testing-{os.getpid()}" # Unique ID for testing agents + elif feature_ids and len(feature_ids) > 1: + agent_id = f"batch-{feature_ids[0]}" elif feature_id: agent_id = f"feature-{feature_id}" else: @@ -250,9 +257,13 @@ async def run_autonomous_agent( prompt = 
get_initializer_prompt(project_dir) elif agent_type == "testing": prompt = get_testing_prompt(project_dir, testing_feature_id, testing_feature_ids) - elif feature_id: + elif feature_ids and len(feature_ids) > 1: + # Batch mode (used by orchestrator for multi-feature coding agents) + prompt = get_batch_feature_prompt(feature_ids, project_dir, yolo_mode) + elif feature_id or (feature_ids is not None and len(feature_ids) == 1): # Single-feature mode (used by orchestrator for coding agents) - prompt = get_single_feature_prompt(feature_id, project_dir, yolo_mode) + fid = feature_id if feature_id is not None else feature_ids[0] # type: ignore[index] + prompt = get_single_feature_prompt(fid, project_dir, yolo_mode) else: # General coding prompt (legacy path) prompt = get_coding_prompt(project_dir, yolo_mode=yolo_mode) @@ -356,12 +367,19 @@ async def run_autonomous_agent( print("The autonomous agent has finished its work.") break - # Single-feature mode OR testing agent: exit after one session - if feature_id is not None or agent_type == "testing": + # Single-feature mode, batch mode, or testing agent: exit after one session + if feature_ids and len(feature_ids) > 1: + print(f"\nBatch mode: Features {', '.join(f'#{fid}' for fid in feature_ids)} session complete.") + break + elif feature_id is not None or (feature_ids is not None and len(feature_ids) == 1): + fid = feature_id if feature_id is not None else feature_ids[0] # type: ignore[index] if agent_type == "testing": print("\nTesting agent complete. Terminating session.") else: - print(f"\nSingle-feature mode: Feature #{feature_id} session complete.") + print(f"\nSingle-feature mode: Feature #{fid} session complete.") + break + elif agent_type == "testing": + print("\nTesting agent complete. 
Terminating session.") break # Reset rate limit retries only if no rate limit signal was detected diff --git a/autonomous_agent_demo.py b/autonomous_agent_demo.py index 1e3d4d6..ed3db37 100644 --- a/autonomous_agent_demo.py +++ b/autonomous_agent_demo.py @@ -133,6 +133,13 @@ Authentication: help="Work on a specific feature ID only (used by orchestrator for coding agents)", ) + parser.add_argument( + "--feature-ids", + type=str, + default=None, + help="Comma-separated feature IDs to implement in batch (e.g., '5,8,12')", + ) + # Agent type for subprocess mode parser.add_argument( "--agent-type", @@ -170,6 +177,13 @@ Authentication: help="Number of features per testing batch (1-5, default: 3)", ) + parser.add_argument( + "--batch-size", + type=int, + default=3, + help="Max features per coding agent batch (1-3, default: 3)", + ) + return parser.parse_args() @@ -222,6 +236,15 @@ def main() -> None: print(f"Error: --testing-feature-ids must be comma-separated integers, got: {args.testing_feature_ids}") return + # Parse batch coding feature IDs (comma-separated string -> list[int]) + coding_feature_ids: list[int] | None = None + if args.feature_ids: + try: + coding_feature_ids = [int(x.strip()) for x in args.feature_ids.split(",") if x.strip()] + except ValueError: + print(f"Error: --feature-ids must be comma-separated integers, got: {args.feature_ids}") + return + try: if args.agent_type: # Subprocess mode - spawned by orchestrator for a specific role @@ -232,6 +255,7 @@ def main() -> None: max_iterations=args.max_iterations or 1, yolo_mode=args.yolo, feature_id=args.feature_id, + feature_ids=coding_feature_ids, agent_type=args.agent_type, testing_feature_id=args.testing_feature_id, testing_feature_ids=testing_feature_ids, @@ -254,6 +278,7 @@ def main() -> None: yolo_mode=args.yolo, testing_agent_ratio=args.testing_ratio, testing_batch_size=args.testing_batch_size, + batch_size=args.batch_size, ) ) except KeyboardInterrupt: diff --git a/parallel_orchestrator.py 
b/parallel_orchestrator.py index 3a0196b..10d0923 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -153,6 +153,7 @@ class ParallelOrchestrator: yolo_mode: bool = False, testing_agent_ratio: int = 1, testing_batch_size: int = DEFAULT_TESTING_BATCH_SIZE, + batch_size: int = 3, on_output: Callable[[int, str], None] | None = None, on_status: Callable[[int, str], None] | None = None, ): @@ -177,6 +178,7 @@ class ParallelOrchestrator: self.yolo_mode = yolo_mode self.testing_agent_ratio = min(max(testing_agent_ratio, 0), 3) # Clamp 0-3 self.testing_batch_size = min(max(testing_batch_size, 1), 5) # Clamp 1-5 + self.batch_size = min(max(batch_size, 1), 3) # Clamp 1-3 self.on_output = on_output self.on_status = on_status @@ -200,6 +202,11 @@ class ParallelOrchestrator: # Cleared when all passing features have been covered at least once. self._recently_tested: set[int] = set() + # Batch tracking: primary feature_id -> all feature IDs in batch + self._batch_features: dict[int, list[int]] = {} + # Reverse mapping: any feature_id -> primary feature_id + self._feature_to_primary: dict[int, int] = {} + # Shutdown flag for async-safe signal handling # Signal handlers only set this flag; cleanup happens in the main loop self._shutdown_requested = False @@ -352,6 +359,104 @@ class ParallelOrchestrator: return selected + def build_feature_batches( + self, + ready: list[dict], + all_features: list[dict], + scheduling_scores: dict[int, float], + ) -> list[list[dict]]: + """Build dependency-aware feature batches for coding agents. + + Each batch contains up to `batch_size` features. The algorithm: + 1. Start with a ready feature (sorted by scheduling score) + 2. Chain extension: find dependents whose deps are satisfied if earlier batch features pass + 3. 
Same-category fill: fill remaining slots with ready features from the same category + + Args: + ready: Ready features (sorted by scheduling score) + all_features: All features for dependency checking + scheduling_scores: Pre-computed scheduling scores + + Returns: + List of batches, each batch is a list of feature dicts + """ + if self.batch_size <= 1: + # No batching - return each feature as a single-item batch + return [[f] for f in ready] + + # Build children adjacency: parent_id -> [child_ids] + children: dict[int, list[int]] = {f["id"]: [] for f in all_features} + feature_map: dict[int, dict] = {f["id"]: f for f in all_features} + for f in all_features: + for dep_id in (f.get("dependencies") or []): + if dep_id in children: + children[dep_id].append(f["id"]) + + # Pre-compute passing IDs + passing_ids = {f["id"] for f in all_features if f.get("passes")} + + used_ids: set[int] = set() # Features already assigned to a batch + batches: list[list[dict]] = [] + + for feature in ready: + if feature["id"] in used_ids: + continue + + batch = [feature] + used_ids.add(feature["id"]) + # Simulate passing set = real passing + batch features + simulated_passing = passing_ids | {feature["id"]} + + # Phase 1: Chain extension - find dependents whose deps are met + for _ in range(self.batch_size - 1): + best_candidate = None + best_score = -1.0 + # Check children of all features currently in the batch + candidate_ids: set[int] = set() + for bf in batch: + for child_id in children.get(bf["id"], []): + if child_id not in used_ids and child_id not in simulated_passing: + candidate_ids.add(child_id) + + for cid in candidate_ids: + cf = feature_map.get(cid) + if not cf or cf.get("passes") or cf.get("in_progress"): + continue + # Check if ALL deps are satisfied by simulated passing set + deps = cf.get("dependencies") or [] + if all(d in simulated_passing for d in deps): + score = scheduling_scores.get(cid, 0) + if score > best_score: + best_score = score + best_candidate = cf + + if 
best_candidate: + batch.append(best_candidate) + used_ids.add(best_candidate["id"]) + simulated_passing.add(best_candidate["id"]) + else: + break + + # Phase 2: Same-category fill + if len(batch) < self.batch_size: + category = feature.get("category", "") + for rf in ready: + if len(batch) >= self.batch_size: + break + if rf["id"] in used_ids: + continue + if rf.get("category", "") == category: + batch.append(rf) + used_ids.add(rf["id"]) + + batches.append(batch) + + debug_log.log("BATCH", f"Built {len(batches)} batches from {len(ready)} ready features", + batch_sizes=[len(b) for b in batches], + batch_ids=[[f['id'] for f in b] for b in batches[:5]]) + + return batches + def get_resumable_features( self, feature_dicts: list[dict] | None = None, @@ -376,9 +481,11 @@ class ParallelOrchestrator: finally: session.close() - # Snapshot running IDs once to avoid acquiring lock per feature + # Snapshot running IDs once (include all batch feature IDs) with self._lock: running_ids = set(self.running_coding_agents.keys()) + for batch_ids in self._batch_features.values(): + running_ids.update(batch_ids) resumable = [] for fd in feature_dicts: @@ -421,9 +528,11 @@ class ParallelOrchestrator: # Pre-compute passing_ids once to avoid O(n^2) in the loop passing_ids = {fd["id"] for fd in feature_dicts if fd.get("passes")} - # Snapshot running IDs once to avoid acquiring lock per feature + # Snapshot running IDs once (include all batch feature IDs) with self._lock: running_ids = set(self.running_coding_agents.keys()) + for batch_ids in self._batch_features.values(): + running_ids.update(batch_ids) ready = [] skipped_reasons = {"passes": 0, "in_progress": 0, "running": 0, "failed": 0, "deps": 0} @@ -635,6 +744,75 @@ class ParallelOrchestrator: return True, f"Started feature {feature_id}" + def start_feature_batch(self, feature_ids: list[int], resume: bool = False) -> tuple[bool, str]: + """Start a coding agent for a batch of features. 
+ + Args: + feature_ids: List of feature IDs to implement in batch + resume: If True, resume features already in_progress + + Returns: + Tuple of (success, message) + """ + if not feature_ids: + return False, "No features to start" + + # Single feature falls back to start_feature + if len(feature_ids) == 1: + return self.start_feature(feature_ids[0], resume=resume) + + with self._lock: + # Check if any feature in batch is already running + for fid in feature_ids: + if fid in self.running_coding_agents or fid in self._feature_to_primary: + return False, f"Feature {fid} already running" + if len(self.running_coding_agents) >= self.max_concurrency: + return False, "At max concurrency" + total_agents = len(self.running_coding_agents) + len(self.running_testing_agents) + if total_agents >= MAX_TOTAL_AGENTS: + return False, f"At max total agents ({total_agents}/{MAX_TOTAL_AGENTS})" + + # Mark all features as in_progress in a single transaction + session = self.get_session() + try: + features_to_mark = [] + for fid in feature_ids: + feature = session.query(Feature).filter(Feature.id == fid).first() + if not feature: + return False, f"Feature {fid} not found" + if feature.passes: + return False, f"Feature {fid} already complete" + if not resume: + if feature.in_progress: + return False, f"Feature {fid} already in progress" + features_to_mark.append(feature) + else: + if not feature.in_progress: + return False, f"Feature {fid} not in progress, cannot resume" + + for feature in features_to_mark: + feature.in_progress = True + session.commit() + finally: + session.close() + + # Spawn batch coding agent + success, message = self._spawn_coding_agent_batch(feature_ids) + if not success: + # Clear in_progress on failure + session = self.get_session() + try: + for fid in feature_ids: + feature = session.query(Feature).filter(Feature.id == fid).first() + if feature and not resume: + feature.in_progress = False + session.commit() + finally: + session.close() + return False, message 
+ + return True, f"Started batch [{', '.join(str(fid) for fid in feature_ids)}]" + def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]: """Spawn a coding agent subprocess for a specific feature.""" # Create abort event @@ -702,6 +880,75 @@ class ParallelOrchestrator: print(f"Started coding agent for feature #{feature_id}", flush=True) return True, f"Started feature {feature_id}" + def _spawn_coding_agent_batch(self, feature_ids: list[int]) -> tuple[bool, str]: + """Spawn a coding agent subprocess for a batch of features.""" + primary_id = feature_ids[0] + abort_event = threading.Event() + + cmd = [ + sys.executable, + "-u", + str(AUTOCODER_ROOT / "autonomous_agent_demo.py"), + "--project-dir", str(self.project_dir), + "--max-iterations", "1", + "--agent-type", "coding", + "--feature-ids", ",".join(str(fid) for fid in feature_ids), + ] + if self.model: + cmd.extend(["--model", self.model]) + if self.yolo_mode: + cmd.append("--yolo") + + try: + popen_kwargs: dict[str, Any] = { + "stdin": subprocess.DEVNULL, + "stdout": subprocess.PIPE, + "stderr": subprocess.STDOUT, + "text": True, + "encoding": "utf-8", + "errors": "replace", + "cwd": str(AUTOCODER_ROOT), + "env": {**os.environ, "PYTHONUNBUFFERED": "1"}, + } + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW + + proc = subprocess.Popen(cmd, **popen_kwargs) + except Exception as e: + # Reset in_progress on failure + session = self.get_session() + try: + for fid in feature_ids: + feature = session.query(Feature).filter(Feature.id == fid).first() + if feature: + feature.in_progress = False + session.commit() + finally: + session.close() + return False, f"Failed to start batch agent: {e}" + + with self._lock: + self.running_coding_agents[primary_id] = proc + self.abort_events[primary_id] = abort_event + self._batch_features[primary_id] = list(feature_ids) + for fid in feature_ids: + self._feature_to_primary[fid] = primary_id + + # Start output reader thread + 
threading.Thread( + target=self._read_output, + args=(primary_id, proc, abort_event, "coding"), + daemon=True + ).start() + + if self.on_status is not None: + for fid in feature_ids: + self.on_status(fid, "running") + + ids_str = ", ".join(f"#{fid}" for fid in feature_ids) + print(f"Started coding agent for features {ids_str}", flush=True) + return True, f"Started batch [{ids_str}]" + def _spawn_testing_agent(self) -> tuple[bool, str]: """Spawn a testing agent subprocess for batch regression testing. @@ -982,73 +1229,84 @@ class ParallelOrchestrator: # feature_id is required for coding agents (always passed from start_feature) assert feature_id is not None, "feature_id must not be None for coding agents" - # Coding agent completion - debug_log.log("COMPLETE", f"Coding agent for feature #{feature_id} finished", - return_code=return_code, - status="success" if return_code == 0 else "failed") - + # Coding agent completion - handle both single and batch features + batch_ids = None with self._lock: + batch_ids = self._batch_features.pop(feature_id, None) + if batch_ids: + # Clean up reverse mapping + for fid in batch_ids: + self._feature_to_primary.pop(fid, None) self.running_coding_agents.pop(feature_id, None) self.abort_events.pop(feature_id, None) + all_feature_ids = batch_ids or [feature_id] + + debug_log.log("COMPLETE", f"Coding agent for feature(s) {all_feature_ids} finished", + return_code=return_code, + status="success" if return_code == 0 else "failed", + batch_size=len(all_feature_ids)) + # Refresh session cache to see subprocess commits - # The coding agent runs as a subprocess and commits changes (e.g., passes=True). - # Using session.expire_all() is lighter weight than engine.dispose() for SQLite WAL mode - # and is sufficient to invalidate cached data and force fresh reads. - # engine.dispose() is only called on orchestrator shutdown, not on every agent completion. 
session = self.get_session() try: session.expire_all() - feature = session.query(Feature).filter(Feature.id == feature_id).first() - feature_passes = feature.passes if feature else None - feature_in_progress = feature.in_progress if feature else None - debug_log.log("DB", f"Feature #{feature_id} state after session.expire_all()", - passes=feature_passes, - in_progress=feature_in_progress) - if feature and feature.in_progress and not feature.passes: - feature.in_progress = False - session.commit() - debug_log.log("DB", f"Cleared in_progress for feature #{feature_id} (agent failed)") + for fid in all_feature_ids: + feature = session.query(Feature).filter(Feature.id == fid).first() + feature_passes = feature.passes if feature else None + feature_in_progress = feature.in_progress if feature else None + debug_log.log("DB", f"Feature #{fid} state after session.expire_all()", + passes=feature_passes, + in_progress=feature_in_progress) + if feature and feature.in_progress and not feature.passes: + feature.in_progress = False + session.commit() + debug_log.log("DB", f"Cleared in_progress for feature #{fid} (agent failed)") finally: session.close() - # Track failures to prevent infinite retry loops + # Track failures for features still in_progress at exit if return_code != 0: with self._lock: - self._failure_counts[feature_id] = self._failure_counts.get(feature_id, 0) + 1 - failure_count = self._failure_counts[feature_id] - if failure_count >= MAX_FEATURE_RETRIES: - print(f"Feature #{feature_id} has failed {failure_count} times, will not retry", flush=True) - debug_log.log("COMPLETE", f"Feature #{feature_id} exceeded max retries", - failure_count=failure_count) + for fid in all_feature_ids: + self._failure_counts[fid] = self._failure_counts.get(fid, 0) + 1 + failure_count = self._failure_counts[fid] + if failure_count >= MAX_FEATURE_RETRIES: + print(f"Feature #{fid} has failed {failure_count} times, will not retry", flush=True) + debug_log.log("COMPLETE", f"Feature #{fid} 
exceeded max retries", + failure_count=failure_count) status = "completed" if return_code == 0 else "failed" if self.on_status is not None: - self.on_status(feature_id, status) - # CRITICAL: This print triggers the WebSocket to emit agent_update with state='error' or 'success' - print(f"Feature #{feature_id} {status}", flush=True) + for fid in all_feature_ids: + self.on_status(fid, status) + + # CRITICAL: Print triggers WebSocket to emit agent_update + if batch_ids and len(batch_ids) > 1: + ids_str = ", ".join(f"#{fid}" for fid in batch_ids) + print(f"Features {ids_str} {status}", flush=True) + else: + print(f"Feature #{feature_id} {status}", flush=True) # Signal main loop that an agent slot is available self._signal_agent_completed() - # NOTE: Testing agents are now spawned in start_feature() when coding agents START, - # not here when they complete. This ensures 1:1 ratio and proper termination. - def stop_feature(self, feature_id: int) -> tuple[bool, str]: """Stop a running coding agent and all its child processes.""" with self._lock: - if feature_id not in self.running_coding_agents: + # Check if this feature is part of a batch + primary_id = self._feature_to_primary.get(feature_id, feature_id) + if primary_id not in self.running_coding_agents: return False, "Feature not running" - abort = self.abort_events.get(feature_id) - proc = self.running_coding_agents.get(feature_id) + abort = self.abort_events.get(primary_id) + proc = self.running_coding_agents.get(primary_id) if abort: abort.set() if proc: - # Kill entire process tree to avoid orphaned children (e.g., browser instances) result = kill_process_tree(proc, timeout=5.0) - debug_log.log("STOP", f"Killed feature {feature_id} process tree", + debug_log.log("STOP", f"Killed feature {feature_id} (primary {primary_id}) process tree", status=result.status, children_found=result.children_found, children_terminated=result.children_terminated, children_killed=result.children_killed) @@ -1113,6 +1371,7 @@ class 
ParallelOrchestrator: print(f"Max concurrency: {self.max_concurrency} coding agents", flush=True) print(f"YOLO mode: {self.yolo_mode}", flush=True) print(f"Regression agents: {self.testing_agent_ratio} (maintained independently)", flush=True) + print(f"Batch size: {self.batch_size} features per agent", flush=True) print("=" * 70, flush=True) print(flush=True) @@ -1276,37 +1535,39 @@ class ParallelOrchestrator: await self._wait_for_agent_completion(timeout=POLL_INTERVAL * 2) continue - # Start features up to capacity + # Build dependency-aware batches from ready features slots = self.max_concurrency - current - logger.debug("Spawning loop: %d ready, %d slots available, max_concurrency=%d", - len(ready), slots, self.max_concurrency) - features_to_start = ready[:slots] - logger.debug("Features to start: %s", [f['id'] for f in features_to_start]) + batches = self.build_feature_batches(ready, feature_dicts, scheduling_scores) - debug_log.log("SPAWN", "Starting features batch", + logger.debug("Spawning loop: %d ready, %d slots available, %d batches built", + len(ready), slots, len(batches)) + + debug_log.log("SPAWN", "Starting feature batches", ready_count=len(ready), slots_available=slots, - features_to_start=[f['id'] for f in features_to_start]) + batch_count=len(batches), + batches=[[f['id'] for f in b] for b in batches[:slots]]) - for i, feature in enumerate(features_to_start): - logger.debug("Starting feature %d/%d: #%d - %s", - i + 1, len(features_to_start), feature['id'], feature['name']) - success, msg = self.start_feature(feature["id"]) + for batch in batches[:slots]: + batch_ids = [f["id"] for f in batch] + batch_names = [f"{f['id']}:{f['name']}" for f in batch] + logger.debug("Starting batch: %s", batch_ids) + success, msg = self.start_feature_batch(batch_ids) if not success: - logger.debug("Failed to start feature #%d: %s", feature['id'], msg) - debug_log.log("SPAWN", f"FAILED to start feature #{feature['id']}", - feature_name=feature['name'], + 
logger.debug("Failed to start batch %s: %s", batch_ids, msg) + debug_log.log("SPAWN", f"FAILED to start batch {batch_ids}", + batch_names=batch_names, error=msg) else: - logger.debug("Successfully started feature #%d", feature['id']) + logger.debug("Successfully started batch %s", batch_ids) with self._lock: running_count = len(self.running_coding_agents) logger.debug("Running coding agents after start: %d", running_count) - debug_log.log("SPAWN", f"Successfully started feature #{feature['id']}", - feature_name=feature['name'], + debug_log.log("SPAWN", f"Successfully started batch {batch_ids}", + batch_names=batch_names, running_coding_agents=running_count) - await asyncio.sleep(0.5) # Brief delay for subprocess to claim feature before re-querying + await asyncio.sleep(0.5) except Exception as e: print(f"Orchestrator error: {e}", flush=True) @@ -1376,6 +1637,7 @@ async def run_parallel_orchestrator( yolo_mode: bool = False, testing_agent_ratio: int = 1, testing_batch_size: int = DEFAULT_TESTING_BATCH_SIZE, + batch_size: int = 3, ) -> None: """Run the unified orchestrator. 
@@ -1386,6 +1648,7 @@ async def run_parallel_orchestrator( yolo_mode: Whether to run in YOLO mode (skip testing agents) testing_agent_ratio: Number of regression agents to maintain (0-3) testing_batch_size: Number of features per testing batch (1-5) + batch_size: Max features per coding agent batch (1-3) """ print(f"[ORCHESTRATOR] run_parallel_orchestrator called with max_concurrency={max_concurrency}", flush=True) orchestrator = ParallelOrchestrator( @@ -1395,6 +1658,7 @@ async def run_parallel_orchestrator( yolo_mode=yolo_mode, testing_agent_ratio=testing_agent_ratio, testing_batch_size=testing_batch_size, + batch_size=batch_size, ) # Set up cleanup to run on exit (handles normal exit, exceptions) @@ -1480,6 +1744,12 @@ def main(): default=DEFAULT_TESTING_BATCH_SIZE, help=f"Number of features per testing batch (1-5, default: {DEFAULT_TESTING_BATCH_SIZE})", ) + parser.add_argument( + "--batch-size", + type=int, + default=3, + help="Max features per coding agent batch (1-3, default: 3)", + ) args = parser.parse_args() @@ -1507,6 +1777,7 @@ def main(): yolo_mode=args.yolo, testing_agent_ratio=args.testing_agent_ratio, testing_batch_size=args.testing_batch_size, + batch_size=args.batch_size, )) except KeyboardInterrupt: print("\n\nInterrupted by user", flush=True) diff --git a/prompts.py b/prompts.py index f50aecb..5d83faa 100644 --- a/prompts.py +++ b/prompts.py @@ -217,6 +217,52 @@ If blocked, use `feature_skip` and document the blocker. return single_feature_header + base_prompt +def get_batch_feature_prompt( + feature_ids: list[int], + project_dir: Path | None = None, + yolo_mode: bool = False, +) -> str: + """Prepend batch-feature assignment header to base coding prompt. + + Used in parallel mode to assign multiple features to an agent. + Features should be implemented sequentially in the given order.
+ + Args: + feature_ids: List of feature IDs to implement in order + project_dir: Optional project directory for project-specific prompts + yolo_mode: If True, strip browser testing instructions from the base prompt + + Returns: + The prompt with batch-feature header prepended + """ + base_prompt = get_coding_prompt(project_dir, yolo_mode=yolo_mode) + ids_str = ", ".join(f"#{fid}" for fid in feature_ids) + + batch_header = f"""## ASSIGNED FEATURES (BATCH): {ids_str} + +You have been assigned {len(feature_ids)} features to implement sequentially. +Process them IN ORDER: {ids_str} + +### Workflow for each feature: +1. Call `feature_claim_and_get` with the feature ID to get its details +2. Implement the feature fully +3. Verify it works (browser testing if applicable) +4. Call `feature_mark_passing` to mark it complete +5. Git commit the changes +6. Move to the next feature + +### Important: +- Complete each feature fully before starting the next +- Mark each feature passing individually as you go +- If blocked on a feature, use `feature_skip` and move to the next one +- Other agents are handling other features - focus only on yours + +--- + +""" + return batch_header + base_prompt + + def get_app_spec(project_dir: Path) -> str: """ Load the app spec from the project. diff --git a/server/routers/agent.py b/server/routers/agent.py index 11e9b8a..9288745 100644 --- a/server/routers/agent.py +++ b/server/routers/agent.py @@ -17,11 +17,11 @@ from ..utils.project_helpers import get_project_path as _get_project_path from ..utils.validation import validate_project_name -def _get_settings_defaults() -> tuple[bool, str, int, bool]: +def _get_settings_defaults() -> tuple[bool, str, int, bool, int]: """Get defaults from global settings. 
Returns: - Tuple of (yolo_mode, model, testing_agent_ratio, playwright_headless) + Tuple of (yolo_mode, model, testing_agent_ratio, playwright_headless, batch_size) """ import sys root = Path(__file__).parent.parent.parent @@ -42,7 +42,12 @@ def _get_settings_defaults() -> tuple[bool, str, int, bool]: playwright_headless = (settings.get("playwright_headless") or "true").lower() == "true" - return yolo_mode, model, testing_agent_ratio, playwright_headless + try: + batch_size = int(settings.get("batch_size", "3")) + except (ValueError, TypeError): + batch_size = 3 + + return yolo_mode, model, testing_agent_ratio, playwright_headless, batch_size router = APIRouter(prefix="/api/projects/{project_name}/agent", tags=["agent"]) @@ -91,19 +96,22 @@ async def start_agent( manager = get_project_manager(project_name) # Get defaults from global settings if not provided in request - default_yolo, default_model, default_testing_ratio, playwright_headless = _get_settings_defaults() + default_yolo, default_model, default_testing_ratio, playwright_headless, default_batch_size = _get_settings_defaults() yolo_mode = request.yolo_mode if request.yolo_mode is not None else default_yolo model = request.model if request.model else default_model max_concurrency = request.max_concurrency or 1 testing_agent_ratio = request.testing_agent_ratio if request.testing_agent_ratio is not None else default_testing_ratio + batch_size = default_batch_size + success, message = await manager.start( yolo_mode=yolo_mode, model=model, max_concurrency=max_concurrency, testing_agent_ratio=testing_agent_ratio, playwright_headless=playwright_headless, + batch_size=batch_size, ) # Notify scheduler of manual start (to prevent auto-stop during scheduled window) diff --git a/server/routers/settings.py b/server/routers/settings.py index 481cd0d..77b4a4d 100644 --- a/server/routers/settings.py +++ b/server/routers/settings.py @@ -92,6 +92,7 @@ async def get_settings(): ollama_mode=_is_ollama_mode(), 
testing_agent_ratio=_parse_int(all_settings.get("testing_agent_ratio"), 1), playwright_headless=_parse_bool(all_settings.get("playwright_headless"), default=True), + batch_size=_parse_int(all_settings.get("batch_size"), 3), ) @@ -110,6 +111,9 @@ async def update_settings(update: SettingsUpdate): if update.playwright_headless is not None: set_setting("playwright_headless", "true" if update.playwright_headless else "false") + if update.batch_size is not None: + set_setting("batch_size", str(update.batch_size)) + # Return updated settings all_settings = get_all_settings() return SettingsResponse( @@ -119,4 +123,5 @@ async def update_settings(update: SettingsUpdate): ollama_mode=_is_ollama_mode(), testing_agent_ratio=_parse_int(all_settings.get("testing_agent_ratio"), 1), playwright_headless=_parse_bool(all_settings.get("playwright_headless"), default=True), + batch_size=_parse_int(all_settings.get("batch_size"), 3), ) diff --git a/server/schemas.py b/server/schemas.py index b5e2d84..e15f1b3 100644 --- a/server/schemas.py +++ b/server/schemas.py @@ -399,6 +399,7 @@ class SettingsResponse(BaseModel): ollama_mode: bool = False # True if Ollama API is configured via .env testing_agent_ratio: int = 1 # Regression testing agents (0-3) playwright_headless: bool = True + batch_size: int = 3 # Features per coding agent batch (1-3) class ModelsResponse(BaseModel): @@ -413,6 +414,7 @@ class SettingsUpdate(BaseModel): model: str | None = None testing_agent_ratio: int | None = None # 0-3 playwright_headless: bool | None = None + batch_size: int | None = None # Features per agent batch (1-3) @field_validator('model') @classmethod @@ -428,6 +430,13 @@ class SettingsUpdate(BaseModel): raise ValueError("testing_agent_ratio must be between 0 and 3") return v + @field_validator('batch_size') + @classmethod + def validate_batch_size(cls, v: int | None) -> int | None: + if v is not None and (v < 1 or v > 3): + raise ValueError("batch_size must be between 1 and 3") + return v + # 
============================================================================ # Dev Server Schemas diff --git a/server/services/process_manager.py b/server/services/process_manager.py index 81a9025..3340cd1 100644 --- a/server/services/process_manager.py +++ b/server/services/process_manager.py @@ -298,6 +298,7 @@ class AgentProcessManager: max_concurrency: int | None = None, testing_agent_ratio: int = 1, playwright_headless: bool = True, + batch_size: int = 3, ) -> tuple[bool, str]: """ Start the agent as a subprocess. @@ -349,6 +350,9 @@ class AgentProcessManager: # Add testing agent configuration cmd.extend(["--testing-ratio", str(testing_agent_ratio)]) + # Add --batch-size flag for multi-feature batching + cmd.extend(["--batch-size", str(batch_size)]) + try: # Start subprocess with piped stdout/stderr # Use project_dir as cwd so Claude SDK sandbox allows access to project files diff --git a/server/websocket.py b/server/websocket.py index efce7b7..dfb4dee 100644 --- a/server/websocket.py +++ b/server/websocket.py @@ -39,6 +39,14 @@ TESTING_AGENT_START_PATTERN = re.compile(r'Started testing agent for feature #(\ # Matches: "Feature #123 testing completed" or "Feature #123 testing failed" TESTING_AGENT_COMPLETE_PATTERN = re.compile(r'Feature #(\d+) testing (completed|failed)') +# Pattern to detect batch coding agent start message +# Matches: "Started coding agent for features #5, #8, #12" +BATCH_CODING_AGENT_START_PATTERN = re.compile(r'Started coding agent for features (#\d+(?:,\s*#\d+)*)') + +# Pattern to detect batch completion +# Matches: "Features #5, #8, #12 completed" or "Features #5, #8, #12 failed" +BATCH_FEATURES_COMPLETE_PATTERN = re.compile(r'Features (#\d+(?:,\s*#\d+)*)\s+(completed|failed)') + # Patterns for detecting agent activity and thoughts THOUGHT_PATTERNS = [ # Claude's tool usage patterns (actual format: [Tool: name]) @@ -64,9 +72,9 @@ ORCHESTRATOR_PATTERNS = { 'capacity_check': re.compile(r'\[DEBUG\] Spawning loop: (\d+) ready, (\d+) slots'), 
'at_capacity': re.compile(r'At max capacity|at max testing agents|At max total agents'), 'feature_start': re.compile(r'Starting feature \d+/\d+: #(\d+) - (.+)'), - 'coding_spawn': re.compile(r'Started coding agent for feature #(\d+)'), + 'coding_spawn': re.compile(r'Started coding agent for features? #(\d+)'), 'testing_spawn': re.compile(r'Started testing agent for feature #(\d+)'), - 'coding_complete': re.compile(r'Feature #(\d+) (completed|failed)'), + 'coding_complete': re.compile(r'Features? #(\d+)(?:,\s*#\d+)* (completed|failed)'), 'testing_complete': re.compile(r'Feature #(\d+) testing (completed|failed)'), 'all_complete': re.compile(r'All features complete'), 'blocked_features': re.compile(r'(\d+) blocked by dependencies'), @@ -96,7 +104,17 @@ class AgentTracker: # Check for orchestrator status messages first # These don't have [Feature #X] prefix - # Coding agent start: "Started coding agent for feature #X" + # Batch coding agent start: "Started coding agent for features #5, #8, #12" + batch_start_match = BATCH_CODING_AGENT_START_PATTERN.match(line) + if batch_start_match: + try: + feature_ids = [int(x.strip().lstrip('#')) for x in batch_start_match.group(1).split(',')] + if feature_ids: + return await self._handle_batch_agent_start(feature_ids, "coding") + except ValueError: + pass + + # Single coding agent start: "Started coding agent for feature #X" if line.startswith("Started coding agent for feature #"): m = re.search(r'#(\d+)', line) if m: @@ -119,6 +137,17 @@ class AgentTracker: is_success = testing_complete_match.group(2) == "completed" return await self._handle_agent_complete(feature_id, is_success, agent_type="testing") + # Batch features complete: "Features #5, #8, #12 completed/failed" + batch_complete_match = BATCH_FEATURES_COMPLETE_PATTERN.match(line) + if batch_complete_match: + try: + feature_ids = [int(x.strip().lstrip('#')) for x in batch_complete_match.group(1).split(',')] + is_success = batch_complete_match.group(2) == "completed" + if 
feature_ids: + return await self._handle_batch_agent_complete(feature_ids, is_success, "coding") + except ValueError: + pass + # Coding agent complete: "Feature #X completed/failed" (without "testing" keyword) if line.startswith("Feature #") and ("completed" in line or "failed" in line) and "testing" not in line: m = re.search(r'#(\d+)', line) @@ -158,6 +187,7 @@ class AgentTracker: 'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)], 'agent_index': agent_index, 'agent_type': 'coding', + 'feature_ids': [feature_id], 'state': 'thinking', 'feature_name': f'Feature #{feature_id}', 'last_thought': None, @@ -165,6 +195,10 @@ class AgentTracker: agent = self.active_agents[key] + # Update current_feature_id for batch agents when output comes from a different feature + if 'current_feature_id' in agent and feature_id in agent.get('feature_ids', []): + agent['current_feature_id'] = feature_id + # Detect state and thought from content state = 'working' thought = None @@ -188,6 +222,7 @@ class AgentTracker: 'agentName': agent['name'], 'agentType': agent['agent_type'], 'featureId': feature_id, + 'featureIds': agent.get('feature_ids', [feature_id]), 'featureName': agent['feature_name'], 'state': state, 'thought': thought, @@ -244,6 +279,7 @@ class AgentTracker: 'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)], 'agent_index': agent_index, 'agent_type': agent_type, + 'feature_ids': [feature_id], 'state': 'thinking', 'feature_name': feature_name, 'last_thought': 'Starting work...', @@ -255,12 +291,55 @@ class AgentTracker: 'agentName': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)], 'agentType': agent_type, 'featureId': feature_id, + 'featureIds': [feature_id], 'featureName': feature_name, 'state': 'thinking', 'thought': 'Starting work...', 'timestamp': datetime.now().isoformat(), } + async def _handle_batch_agent_start(self, feature_ids: list[int], agent_type: str = "coding") -> dict | None: + """Handle batch agent start message from orchestrator.""" + if not 
feature_ids: + return None + primary_id = feature_ids[0] + async with self._lock: + key = (primary_id, agent_type) + agent_index = self._next_agent_index + self._next_agent_index += 1 + + feature_name = f'Features {", ".join(f"#{fid}" for fid in feature_ids)}' + + self.active_agents[key] = { + 'name': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)], + 'agent_index': agent_index, + 'agent_type': agent_type, + 'feature_ids': list(feature_ids), + 'current_feature_id': primary_id, + 'state': 'thinking', + 'feature_name': feature_name, + 'last_thought': 'Starting batch work...', + } + + # Register all feature IDs so output lines can find this agent + for fid in feature_ids: + secondary_key = (fid, agent_type) + if secondary_key != key: + self.active_agents[secondary_key] = self.active_agents[key] + + return { + 'type': 'agent_update', + 'agentIndex': agent_index, + 'agentName': AGENT_MASCOTS[agent_index % len(AGENT_MASCOTS)], + 'agentType': agent_type, + 'featureId': primary_id, + 'featureIds': list(feature_ids), + 'featureName': feature_name, + 'state': 'thinking', + 'thought': 'Starting batch work...', + 'timestamp': datetime.now().isoformat(), + } + async def _handle_agent_complete(self, feature_id: int, is_success: bool, agent_type: str = "coding") -> dict | None: """Handle agent completion - ALWAYS emits a message, even if agent wasn't tracked. @@ -282,6 +361,7 @@ class AgentTracker: 'agentName': agent['name'], 'agentType': agent.get('agent_type', agent_type), 'featureId': feature_id, + 'featureIds': agent.get('feature_ids', [feature_id]), 'featureName': agent['feature_name'], 'state': state, 'thought': 'Completed successfully!' if is_success else 'Failed to complete', @@ -298,6 +378,7 @@ class AgentTracker: 'agentName': 'Unknown', 'agentType': agent_type, 'featureId': feature_id, + 'featureIds': [feature_id], 'featureName': f'Feature #{feature_id}', 'state': state, 'thought': 'Completed successfully!' 
if is_success else 'Failed to complete', @@ -305,6 +386,49 @@ class AgentTracker: 'synthetic': True, } + async def _handle_batch_agent_complete(self, feature_ids: list[int], is_success: bool, agent_type: str = "coding") -> dict | None: + """Handle batch agent completion.""" + if not feature_ids: + return None + primary_id = feature_ids[0] + async with self._lock: + state = 'success' if is_success else 'error' + key = (primary_id, agent_type) + + if key in self.active_agents: + agent = self.active_agents[key] + result = { + 'type': 'agent_update', + 'agentIndex': agent['agent_index'], + 'agentName': agent['name'], + 'agentType': agent.get('agent_type', agent_type), + 'featureId': primary_id, + 'featureIds': agent.get('feature_ids', list(feature_ids)), + 'featureName': agent['feature_name'], + 'state': state, + 'thought': 'Batch completed successfully!' if is_success else 'Batch failed to complete', + 'timestamp': datetime.now().isoformat(), + } + # Clean up all keys for this batch + for fid in feature_ids: + self.active_agents.pop((fid, agent_type), None) + return result + else: + # Synthetic completion + return { + 'type': 'agent_update', + 'agentIndex': -1, + 'agentName': 'Unknown', + 'agentType': agent_type, + 'featureId': primary_id, + 'featureIds': list(feature_ids), + 'featureName': f'Features {", ".join(f"#{fid}" for fid in feature_ids)}', + 'state': state, + 'thought': 'Batch completed successfully!' if is_success else 'Batch failed to complete', + 'timestamp': datetime.now().isoformat(), + 'synthetic': True, + } + class OrchestratorTracker: """Tracks orchestrator state for Mission Control observability. diff --git a/ui/src/components/AgentCard.tsx b/ui/src/components/AgentCard.tsx index 9fdff64..faa10db 100644 --- a/ui/src/components/AgentCard.tsx +++ b/ui/src/components/AgentCard.tsx @@ -112,12 +112,25 @@ export function AgentCard({ agent, onShowLogs }: AgentCardProps) { {/* Feature info */}
-
- Feature #{agent.featureId} -
-
- {agent.featureName} -
+ {agent.featureIds && agent.featureIds.length > 1 ? ( + <> +
+ Batch: {agent.featureIds.map(id => `#${id}`).join(', ')} +
+
+ Active: Feature #{agent.featureId} +
+ + ) : ( + <> +
+ Feature #{agent.featureId} +
+
+ {agent.featureName} +
+ + )}
{/* Thought bubble */} @@ -195,7 +208,10 @@ export function AgentLogModal({ agent, logs, onClose }: AgentLogModalProps) {

- Feature #{agent.featureId}: {agent.featureName} + {agent.featureIds && agent.featureIds.length > 1 + ? `Batch: ${agent.featureIds.map(id => `#${id}`).join(', ')}` + : `Feature #${agent.featureId}: ${agent.featureName}` + }

diff --git a/ui/src/components/DependencyGraph.tsx b/ui/src/components/DependencyGraph.tsx index 3061548..4151c39 100644 --- a/ui/src/components/DependencyGraph.tsx +++ b/ui/src/components/DependencyGraph.tsx @@ -227,10 +227,14 @@ function DependencyGraphInner({ graphData, onNodeClick, activeAgents = [] }: Dep }, []) // Create a map of featureId to agent info for quick lookup + // Maps ALL batch feature IDs to the same agent const agentByFeatureId = useMemo(() => { const map = new Map() for (const agent of activeAgents) { - map.set(agent.featureId, { name: agent.agentName, state: agent.state }) + const ids = agent.featureIds || [agent.featureId] + for (const fid of ids) { + map.set(fid, { name: agent.agentName, state: agent.state }) + } } return map }, [activeAgents]) diff --git a/ui/src/components/KanbanColumn.tsx b/ui/src/components/KanbanColumn.tsx index 9ab8902..1c39f30 100644 --- a/ui/src/components/KanbanColumn.tsx +++ b/ui/src/components/KanbanColumn.tsx @@ -41,9 +41,14 @@ export function KanbanColumn({ showCreateSpec, }: KanbanColumnProps) { // Create a map of feature ID to active agent for quick lookup - const agentByFeatureId = new Map( - activeAgents.map(agent => [agent.featureId, agent]) - ) + // Maps ALL batch feature IDs to the same agent + const agentByFeatureId = new Map() + for (const agent of activeAgents) { + const ids = agent.featureIds || [agent.featureId] + for (const fid of ids) { + agentByFeatureId.set(fid, agent) + } + } return ( diff --git a/ui/src/components/SettingsModal.tsx b/ui/src/components/SettingsModal.tsx index 03138b7..0246cdd 100644 --- a/ui/src/components/SettingsModal.tsx +++ b/ui/src/components/SettingsModal.tsx @@ -41,6 +41,12 @@ export function SettingsModal({ isOpen, onClose }: SettingsModalProps) { } } + const handleBatchSizeChange = (size: number) => { + if (!updateSettings.isPending) { + updateSettings.mutate({ batch_size: size }) + } + } + const models = modelsData?.models ?? 
[] const isSaving = updateSettings.isPending @@ -234,6 +240,30 @@ export function SettingsModal({ isOpen, onClose }: SettingsModalProps) { + {/* Features per Agent */} +
+ +

+ Number of features assigned to each coding agent +

+
+ {[1, 2, 3].map((size) => ( + + ))} +
+
+ {/* Update Error */} {updateSettings.isError && ( diff --git a/ui/src/hooks/useProjects.ts b/ui/src/hooks/useProjects.ts index 4f27380..676311c 100644 --- a/ui/src/hooks/useProjects.ts +++ b/ui/src/hooks/useProjects.ts @@ -267,6 +267,7 @@ const DEFAULT_SETTINGS: Settings = { ollama_mode: false, testing_agent_ratio: 1, playwright_headless: true, + batch_size: 3, } export function useAvailableModels() { diff --git a/ui/src/hooks/useWebSocket.ts b/ui/src/hooks/useWebSocket.ts index 18b117e..1a44435 100644 --- a/ui/src/hooks/useWebSocket.ts +++ b/ui/src/hooks/useWebSocket.ts @@ -210,6 +210,7 @@ export function useProjectWebSocket(projectName: string | null) { agentName: message.agentName, agentType: message.agentType || 'coding', // Default to coding for backwards compat featureId: message.featureId, + featureIds: message.featureIds || [message.featureId], featureName: message.featureName, state: message.state, thought: message.thought, @@ -225,6 +226,7 @@ export function useProjectWebSocket(projectName: string | null) { agentName: message.agentName, agentType: message.agentType || 'coding', // Default to coding for backwards compat featureId: message.featureId, + featureIds: message.featureIds || [message.featureId], featureName: message.featureName, state: message.state, thought: message.thought, diff --git a/ui/src/lib/types.ts b/ui/src/lib/types.ts index a90fc52..cec91ec 100644 --- a/ui/src/lib/types.ts +++ b/ui/src/lib/types.ts @@ -199,7 +199,8 @@ export interface ActiveAgent { agentIndex: number // -1 for synthetic completions agentName: AgentMascot | 'Unknown' agentType: AgentType // "coding" or "testing" - featureId: number + featureId: number // Current/primary feature (backward compat) + featureIds: number[] // All features in batch featureName: string state: AgentState thought?: string @@ -270,6 +271,7 @@ export interface WSAgentUpdateMessage { agentName: AgentMascot | 'Unknown' agentType: AgentType // "coding" or "testing" featureId: number + featureIds?: 
number[] // All features in batch (may be absent for backward compat) featureName: string state: AgentState thought?: string @@ -530,6 +532,7 @@ export interface Settings { ollama_mode: boolean testing_agent_ratio: number // Regression testing agents (0-3) playwright_headless: boolean + batch_size: number // Features per coding agent batch (1-3) } export interface SettingsUpdate { @@ -537,6 +540,7 @@ export interface SettingsUpdate { model?: string testing_agent_ratio?: number playwright_headless?: boolean + batch_size?: number } export interface ProjectSettingsUpdate {