diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index 10d0923..d31db0b 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -22,6 +22,7 @@ import asyncio import atexit import logging import os +import re import signal import subprocess import sys @@ -1116,6 +1117,11 @@ class ParallelOrchestrator: return True + # Pattern to detect when a batch agent claims a new feature + _CLAIM_FEATURE_PATTERN = re.compile( + r"feature_claim_and_get\b.*?['\"]?feature_id['\"]?\s*[:=]\s*(\d+)" + ) + def _read_output( self, feature_id: int | None, @@ -1124,6 +1130,7 @@ class ParallelOrchestrator: agent_type: Literal["coding", "testing"] = "coding", ): """Read output from subprocess and emit events.""" + current_feature_id = feature_id try: if proc.stdout is None: proc.wait() @@ -1132,11 +1139,17 @@ class ParallelOrchestrator: if abort.is_set(): break line = line.rstrip() + # Detect when a batch agent claims a new feature + claim_match = self._CLAIM_FEATURE_PATTERN.search(line) + if claim_match: + claimed_id = int(claim_match.group(1)) + if claimed_id != current_feature_id: + current_feature_id = claimed_id if self.on_output is not None: - self.on_output(feature_id or 0, line) + self.on_output(current_feature_id or 0, line) else: # Both coding and testing agents now use [Feature #X] format - print(f"[Feature #{feature_id}] {line}", flush=True) + print(f"[Feature #{current_feature_id}] {line}", flush=True) proc.wait() finally: # CRITICAL: Kill the process tree to clean up any child processes (e.g., Claude CLI)