diff --git a/api/database.py b/api/database.py index cb8e7aa..e7bcf46 100644 --- a/api/database.py +++ b/api/database.py @@ -6,12 +6,13 @@ SQLite database schema for feature storage using SQLAlchemy. """ import sys +from datetime import datetime from pathlib import Path from typing import Optional -from sqlalchemy import Boolean, Column, Integer, String, Text, create_engine, text +from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, Text, create_engine, text from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.orm import Session, relationship, sessionmaker from sqlalchemy.types import JSON Base = declarative_base() @@ -59,6 +60,93 @@ class Feature(Base): return [] +class Schedule(Base): + """Time-based schedule for automated agent start/stop.""" + + __tablename__ = "schedules" + + id = Column(Integer, primary_key=True, index=True) + project_name = Column(String(50), nullable=False, index=True) + + # Timing (stored in UTC) + start_time = Column(String(5), nullable=False) # "HH:MM" format + duration_minutes = Column(Integer, nullable=False) # 1-1440 + + # Day filtering (bitfield: Mon=1, Tue=2, Wed=4, Thu=8, Fri=16, Sat=32, Sun=64) + days_of_week = Column(Integer, nullable=False, default=127) # 127 = all days + + # State + enabled = Column(Boolean, nullable=False, default=True, index=True) + + # Agent configuration for scheduled runs + yolo_mode = Column(Boolean, nullable=False, default=False) + model = Column(String(50), nullable=True) # None = use global default + max_concurrency = Column(Integer, nullable=False, default=3) # 1-5 concurrent agents + + # Crash recovery tracking + crash_count = Column(Integer, nullable=False, default=0) # Resets at window start + + # Metadata + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + + # Relationships + overrides = relationship( + "ScheduleOverride", back_populates="schedule", cascade="all, delete-orphan" + ) + + def to_dict(self) -> dict: + """Convert schedule to dictionary for JSON serialization.""" + return { + "id": self.id, + "project_name": self.project_name, + "start_time": self.start_time, + "duration_minutes": self.duration_minutes, + "days_of_week": self.days_of_week, + "enabled": self.enabled, + "yolo_mode": self.yolo_mode, + "model": self.model, + "max_concurrency": self.max_concurrency, + "crash_count": self.crash_count, + "created_at": self.created_at.isoformat() if self.created_at else None, + } + + def is_active_on_day(self, weekday: int) -> bool: + """Check if schedule is active on given weekday (0=Monday, 6=Sunday).""" + day_bit = 1 << weekday + return bool(self.days_of_week & day_bit) + + +class ScheduleOverride(Base): + """Persisted manual override for a schedule window.""" + + __tablename__ = "schedule_overrides" + + id = Column(Integer, primary_key=True, index=True) + schedule_id = Column( + Integer, ForeignKey("schedules.id", ondelete="CASCADE"), nullable=False + ) + + # Override details + override_type = Column(String(10), nullable=False) # "start" or "stop" + expires_at = Column(DateTime, nullable=False) # When this window ends (UTC) + + # Metadata + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + + # Relationships + schedule = relationship("Schedule", back_populates="overrides") + + def to_dict(self) -> dict: + """Convert override to dictionary for JSON serialization.""" + return { + "id": self.id, + "schedule_id": self.schedule_id, + "override_type": self.override_type, + 
"expires_at": self.expires_at.isoformat() if self.expires_at else None, + "created_at": self.created_at.isoformat() if self.created_at else None, + } + + def get_database_path(project_dir: Path) -> Path: """Return the path to the SQLite database for a project.""" return project_dir / "features.db" @@ -164,6 +252,40 @@ def _is_network_path(path: Path) -> bool: return False +def _migrate_add_schedules_tables(engine) -> None: + """Create schedules and schedule_overrides tables if they don't exist.""" + from sqlalchemy import inspect + + inspector = inspect(engine) + existing_tables = inspector.get_table_names() + + # Create schedules table if missing + if "schedules" not in existing_tables: + Schedule.__table__.create(bind=engine) + + # Create schedule_overrides table if missing + if "schedule_overrides" not in existing_tables: + ScheduleOverride.__table__.create(bind=engine) + + # Add crash_count column if missing (for upgrades) + if "schedules" in existing_tables: + columns = [c["name"] for c in inspector.get_columns("schedules")] + if "crash_count" not in columns: + with engine.connect() as conn: + conn.execute( + text("ALTER TABLE schedules ADD COLUMN crash_count INTEGER DEFAULT 0") + ) + conn.commit() + + # Add max_concurrency column if missing (for upgrades) + if "max_concurrency" not in columns: + with engine.connect() as conn: + conn.execute( + text("ALTER TABLE schedules ADD COLUMN max_concurrency INTEGER DEFAULT 3") + ) + conn.commit() + + def create_database(project_dir: Path) -> tuple: """ Create database and return engine + session maker. @@ -196,6 +318,9 @@ def create_database(project_dir: Path) -> tuple: _migrate_fix_null_boolean_fields(engine) _migrate_add_dependencies_column(engine) + # Migrate to add schedules tables + _migrate_add_schedules_tables(engine) + SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) return engine, SessionLocal diff --git a/requirements.txt b/requirements.txt index 0e260ba..6e32cdb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ websockets>=13.0 python-multipart>=0.0.17 psutil>=6.0.0 aiofiles>=24.0.0 +apscheduler>=3.10.0,<4.0.0 pywinpty>=2.0.0; sys_platform == "win32" # Dev dependencies diff --git a/server/main.py b/server/main.py index 2eeeac1..f3d3504 100644 --- a/server/main.py +++ b/server/main.py @@ -35,6 +35,7 @@ from .routers import ( features_router, filesystem_router, projects_router, + schedules_router, settings_router, spec_creation_router, terminal_router, @@ -47,6 +48,7 @@ from .services.dev_server_manager import ( ) from .services.expand_chat_session import cleanup_all_expand_sessions from .services.process_manager import cleanup_all_managers, cleanup_orphaned_locks +from .services.scheduler_service import cleanup_scheduler, get_scheduler from .services.terminal_manager import cleanup_all_terminals from .websocket import project_websocket @@ -61,8 +63,16 @@ async def lifespan(app: FastAPI): # Startup - clean up orphaned lock files from previous runs cleanup_orphaned_locks() cleanup_orphaned_devserver_locks() + + # Start the scheduler service + scheduler = get_scheduler() + await scheduler.start() + yield - # Shutdown - cleanup all running agents, sessions, terminals, and dev servers + + # Shutdown - cleanup scheduler first to stop triggering new starts + await cleanup_scheduler() + # Then cleanup all running agents, sessions, terminals, and dev servers await cleanup_all_managers() await cleanup_assistant_sessions() await cleanup_all_expand_sessions() @@ -116,6 +126,7 @@ async def 
require_localhost(request: Request, call_next): app.include_router(projects_router) app.include_router(features_router) app.include_router(agent_router) +app.include_router(schedules_router) app.include_router(devserver_router) app.include_router(spec_creation_router) app.include_router(expand_project_router) diff --git a/server/routers/__init__.py b/server/routers/__init__.py index 763247f..f4d02f5 100644 --- a/server/routers/__init__.py +++ b/server/routers/__init__.py @@ -12,6 +12,7 @@ from .expand_project import router as expand_project_router from .features import router as features_router from .filesystem import router as filesystem_router from .projects import router as projects_router +from .schedules import router as schedules_router from .settings import router as settings_router from .spec_creation import router as spec_creation_router from .terminal import router as terminal_router @@ -20,6 +21,7 @@ __all__ = [ "projects_router", "features_router", "agent_router", + "schedules_router", "devserver_router", "spec_creation_router", "expand_project_router", diff --git a/server/routers/agent.py b/server/routers/agent.py index 25871c4..1f54b20 100644 --- a/server/routers/agent.py +++ b/server/routers/agent.py @@ -130,6 +130,13 @@ async def start_agent( count_testing_in_concurrency=count_testing, ) + # Notify scheduler of manual start (to prevent auto-stop during scheduled window) + if success: + from ..services.scheduler_service import get_scheduler + project_dir = _get_project_path(project_name) + if project_dir: + get_scheduler().notify_manual_start(project_name, project_dir) + return AgentActionResponse( success=success, status=manager.status, @@ -144,6 +151,13 @@ async def stop_agent(project_name: str): success, message = await manager.stop() + # Notify scheduler of manual stop (to prevent auto-start during scheduled window) + if success: + from ..services.scheduler_service import get_scheduler + project_dir = _get_project_path(project_name) + if project_dir: + get_scheduler().notify_manual_stop(project_name, project_dir) + return AgentActionResponse( success=success, status=manager.status, diff --git a/server/routers/schedules.py b/server/routers/schedules.py new file mode 100644 index 0000000..6138824 --- /dev/null +++ b/server/routers/schedules.py @@ -0,0 +1,409 @@ +""" +Schedules Router +================ + +API endpoints for managing agent schedules. +Provides CRUD operations for time-based schedule configuration. 
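+All schedule times are stored and compared in UTC ("HH:MM" strings); the frontend converts to and from local time for display.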
+""" + +import re +import sys +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from fastapi import APIRouter, HTTPException + +from ..schemas import ( + NextRunResponse, + ScheduleCreate, + ScheduleListResponse, + ScheduleResponse, + ScheduleUpdate, +) + + +def _get_project_path(project_name: str) -> Path: + """Get project path from registry.""" + root = Path(__file__).parent.parent.parent + if str(root) not in sys.path: + sys.path.insert(0, str(root)) + + from registry import get_project_path + return get_project_path(project_name) + + +router = APIRouter( + prefix="/api/projects/{project_name}/schedules", + tags=["schedules"] +) + + +def validate_project_name(name: str) -> str: + """Validate and sanitize project name to prevent path traversal.""" + if not re.match(r'^[a-zA-Z0-9_-]{1,50}$', name): + raise HTTPException( + status_code=400, + detail="Invalid project name" + ) + return name + + +def _get_db_session(project_name: str): + """Get database session for a project.""" + from api.database import create_database + + project_name = validate_project_name(project_name) + project_path = _get_project_path(project_name) + + if not project_path: + raise HTTPException( + status_code=404, + detail=f"Project '{project_name}' not found in registry" + ) + + if not project_path.exists(): + raise HTTPException( + status_code=404, + detail=f"Project directory not found: {project_path}" + ) + + _, SessionLocal = create_database(project_path) + return SessionLocal(), project_path + + +@router.get("", response_model=ScheduleListResponse) +async def list_schedules(project_name: str): + """Get all schedules for a project.""" + from api.database import Schedule + + db, _ = _get_db_session(project_name) + + try: + schedules = db.query(Schedule).filter( + Schedule.project_name == project_name + ).order_by(Schedule.start_time).all() + + return ScheduleListResponse( + schedules=[ + ScheduleResponse( + id=s.id, + project_name=s.project_name, + start_time=s.start_time, + duration_minutes=s.duration_minutes, + days_of_week=s.days_of_week, + enabled=s.enabled, + yolo_mode=s.yolo_mode, + model=s.model, + crash_count=s.crash_count, + created_at=s.created_at, + ) + for s in schedules + ] + ) + finally: + db.close() + + +@router.post("", response_model=ScheduleResponse, status_code=201) +async def create_schedule(project_name: str, data: ScheduleCreate): + """Create a new schedule for a project.""" + from api.database import Schedule + + from ..services.scheduler_service import get_scheduler + + db, project_path = _get_db_session(project_name) + + try: + # Create schedule record + schedule = Schedule( + project_name=project_name, + start_time=data.start_time, + duration_minutes=data.duration_minutes, + days_of_week=data.days_of_week, + enabled=data.enabled, + yolo_mode=data.yolo_mode, + model=data.model, + ) + db.add(schedule) + db.commit() + db.refresh(schedule) + + # Register with APScheduler if enabled + if schedule.enabled: + import logging + logger = logging.getLogger(__name__) + + scheduler = get_scheduler() + await scheduler.add_schedule(project_name, schedule, project_path) + logger.info(f"Registered schedule {schedule.id} with APScheduler") + + # Check if we're currently within this schedule's window + # If so, start the agent immediately (cron won't trigger until next occurrence) + now = datetime.now(timezone.utc) + is_within = scheduler._is_within_window(schedule, now) + logger.info(f"Schedule {schedule.id}: is_within_window={is_within}, now={now}, 
start={schedule.start_time}") + + if is_within: + # Check for manual stop override + from api.database import ScheduleOverride + override = db.query(ScheduleOverride).filter( + ScheduleOverride.schedule_id == schedule.id, + ScheduleOverride.override_type == "stop", + ScheduleOverride.expires_at > now, + ).first() + + logger.info(f"Schedule {schedule.id}: has_override={override is not None}") + + if not override: + # Start agent immediately + logger.info( + f"Schedule {schedule.id} is within active window, starting agent immediately" + ) + try: + await scheduler._start_agent(project_name, project_path, schedule) + logger.info(f"Successfully started agent for schedule {schedule.id}") + except Exception as e: + logger.error(f"Failed to start agent for schedule {schedule.id}: {e}", exc_info=True) + + return ScheduleResponse( + id=schedule.id, + project_name=schedule.project_name, + start_time=schedule.start_time, + duration_minutes=schedule.duration_minutes, + days_of_week=schedule.days_of_week, + enabled=schedule.enabled, + yolo_mode=schedule.yolo_mode, + model=schedule.model, + crash_count=schedule.crash_count, + created_at=schedule.created_at, + ) + + finally: + db.close() + + +@router.get("/next", response_model=NextRunResponse) +async def get_next_scheduled_run(project_name: str): + """Calculate next scheduled run across all enabled schedules.""" + from api.database import Schedule, ScheduleOverride + + from ..services.scheduler_service import get_scheduler + + db, _ = _get_db_session(project_name) + + try: + schedules = db.query(Schedule).filter( + Schedule.project_name == project_name, + Schedule.enabled == True, # noqa: E712 + ).all() + + if not schedules: + return NextRunResponse( + has_schedules=False, + next_start=None, + next_end=None, + is_currently_running=False, + active_schedule_count=0, + ) + + now = datetime.now(timezone.utc) + scheduler = get_scheduler() + + # Find active schedules and calculate next run + active_count = 0 + next_start = None + latest_end = None + + for schedule in schedules: + if scheduler._is_within_window(schedule, now): + # Check for manual stop override + override = db.query(ScheduleOverride).filter( + ScheduleOverride.schedule_id == schedule.id, + ScheduleOverride.override_type == "stop", + ScheduleOverride.expires_at > now, + ).first() + + if not override: + # Schedule is active and not manually stopped + active_count += 1 + # Calculate end time for this window + end_time = _calculate_window_end(schedule, now) + if latest_end is None or end_time > latest_end: + latest_end = end_time + # If override exists, treat schedule as not active + else: + # Calculate next start time + next_schedule_start = _calculate_next_start(schedule, now) + if next_schedule_start and (next_start is None or next_schedule_start < next_start): + next_start = next_schedule_start + + return NextRunResponse( + has_schedules=True, + next_start=next_start if active_count == 0 else None, + next_end=latest_end, + is_currently_running=active_count > 0, + active_schedule_count=active_count, + ) + + finally: + db.close() + + +@router.get("/{schedule_id}", response_model=ScheduleResponse) +async def get_schedule(project_name: str, schedule_id: int): + """Get a single schedule by ID.""" + from api.database import Schedule + + db, _ = _get_db_session(project_name) + + try: + schedule = db.query(Schedule).filter( + Schedule.id == schedule_id, + Schedule.project_name == project_name, + ).first() + + if not schedule: + raise HTTPException(status_code=404, detail="Schedule not found") + + 
return ScheduleResponse( + id=schedule.id, + project_name=schedule.project_name, + start_time=schedule.start_time, + duration_minutes=schedule.duration_minutes, + days_of_week=schedule.days_of_week, + enabled=schedule.enabled, + yolo_mode=schedule.yolo_mode, + model=schedule.model, + crash_count=schedule.crash_count, + created_at=schedule.created_at, + ) + + finally: + db.close() + + +@router.patch("/{schedule_id}", response_model=ScheduleResponse) +async def update_schedule( + project_name: str, + schedule_id: int, + data: ScheduleUpdate +): + """Update an existing schedule.""" + from api.database import Schedule + + from ..services.scheduler_service import get_scheduler + + db, project_path = _get_db_session(project_name) + + try: + schedule = db.query(Schedule).filter( + Schedule.id == schedule_id, + Schedule.project_name == project_name, + ).first() + + if not schedule: + raise HTTPException(status_code=404, detail="Schedule not found") + + was_enabled = schedule.enabled + + # Update only fields that were explicitly provided + # This allows sending {"model": null} to clear it vs omitting the field entirely + update_data = data.model_dump(exclude_unset=True) + for field, value in update_data.items(): + setattr(schedule, field, value) + + db.commit() + db.refresh(schedule) + + # Update APScheduler jobs + scheduler = get_scheduler() + if schedule.enabled: + # Re-register with updated times + await scheduler.add_schedule(project_name, schedule, project_path) + elif was_enabled: + # Was enabled, now disabled - remove jobs + scheduler.remove_schedule(schedule_id) + + return ScheduleResponse( + id=schedule.id, + project_name=schedule.project_name, + start_time=schedule.start_time, + duration_minutes=schedule.duration_minutes, + days_of_week=schedule.days_of_week, + enabled=schedule.enabled, + yolo_mode=schedule.yolo_mode, + model=schedule.model, + crash_count=schedule.crash_count, + created_at=schedule.created_at, + ) + + finally: + db.close() + + +@router.delete("/{schedule_id}", status_code=204) +async def delete_schedule(project_name: str, schedule_id: int): + """Delete a schedule.""" + from api.database import Schedule + + from ..services.scheduler_service import get_scheduler + + db, _ = _get_db_session(project_name) + + try: + schedule = db.query(Schedule).filter( + Schedule.id == schedule_id, + Schedule.project_name == project_name, + ).first() + + if not schedule: + raise HTTPException(status_code=404, detail="Schedule not found") + + # Remove APScheduler jobs + scheduler = get_scheduler() + scheduler.remove_schedule(schedule_id) + + # Delete from database + db.delete(schedule) + db.commit() + + finally: + db.close() + + +def _calculate_window_end(schedule, now: datetime) -> datetime: + """Calculate when the current window ends.""" + start_hour, start_minute = map(int, schedule.start_time.split(":")) + + # Create start time for today in UTC + window_start = now.replace( + hour=start_hour, minute=start_minute, second=0, microsecond=0 + ) + + # If current time is before start time, the window started yesterday + if now < window_start: + window_start = window_start - timedelta(days=1) + + return window_start + timedelta(minutes=schedule.duration_minutes) + + +def _calculate_next_start(schedule, now: datetime) -> datetime | None: + """Calculate the next start time for a schedule.""" + start_hour, start_minute = map(int, schedule.start_time.split(":")) + + # Create start time for today + candidate = now.replace( + hour=start_hour, minute=start_minute, second=0, microsecond=0 + ) + + # If 
already past today's start time, check tomorrow + if candidate <= now: + candidate = candidate + timedelta(days=1) + + # Find the next active day + for _ in range(7): + if schedule.is_active_on_day(candidate.weekday()): + return candidate + candidate = candidate + timedelta(days=1) + + return None diff --git a/server/schemas.py b/server/schemas.py index 1140b84..451baaf 100644 --- a/server/schemas.py +++ b/server/schemas.py @@ -472,3 +472,100 @@ class WSDevServerStatusMessage(BaseModel): type: Literal["dev_server_status"] = "dev_server_status" status: Literal["stopped", "running", "crashed"] url: str | None = None + + +# ============================================================================ +# Schedule Schemas +# ============================================================================ + + +class ScheduleCreate(BaseModel): + """Request schema for creating a schedule.""" + start_time: str = Field( + ..., + pattern=r'^([0-1][0-9]|2[0-3]):[0-5][0-9]$', + description="Start time in HH:MM format (local time, will be stored as UTC)" + ) + duration_minutes: int = Field( + ..., + ge=1, + le=1440, + description="Duration in minutes (1-1440)" + ) + days_of_week: int = Field( + default=127, + ge=0, + le=127, + description="Bitfield: Mon=1, Tue=2, Wed=4, Thu=8, Fri=16, Sat=32, Sun=64" + ) + enabled: bool = True + yolo_mode: bool = False + model: str | None = None + max_concurrency: int = Field( + default=3, + ge=1, + le=5, + description="Max concurrent agents (1-5)" + ) + + @field_validator('model') + @classmethod + def validate_model(cls, v: str | None) -> str | None: + """Validate model is in the allowed list.""" + if v is not None and v not in VALID_MODELS: + raise ValueError(f"Invalid model. Must be one of: {VALID_MODELS}") + return v + + +class ScheduleUpdate(BaseModel): + """Request schema for updating a schedule (partial updates allowed).""" + start_time: str | None = Field( + None, + pattern=r'^([0-1][0-9]|2[0-3]):[0-5][0-9]$' + ) + duration_minutes: int | None = Field(None, ge=1, le=1440) + days_of_week: int | None = Field(None, ge=0, le=127) + enabled: bool | None = None + yolo_mode: bool | None = None + model: str | None = None + max_concurrency: int | None = Field(None, ge=1, le=5) + + @field_validator('model') + @classmethod + def validate_model(cls, v: str | None) -> str | None: + """Validate model is in the allowed list.""" + if v is not None and v not in VALID_MODELS: + raise ValueError(f"Invalid model. 
Must be one of: {VALID_MODELS}") + return v + + +class ScheduleResponse(BaseModel): + """Response schema for a schedule.""" + id: int + project_name: str + start_time: str # UTC, frontend converts to local + duration_minutes: int + days_of_week: int + enabled: bool + yolo_mode: bool + model: str | None + max_concurrency: int + crash_count: int + created_at: datetime + + class Config: + from_attributes = True + + +class ScheduleListResponse(BaseModel): + """Response containing list of schedules.""" + schedules: list[ScheduleResponse] + + +class NextRunResponse(BaseModel): + """Response for next scheduled run calculation.""" + has_schedules: bool + next_start: datetime | None # UTC + next_end: datetime | None # UTC (latest end if overlapping) + is_currently_running: bool + active_schedule_count: int diff --git a/server/services/scheduler_service.py b/server/services/scheduler_service.py new file mode 100644 index 0000000..d6fc1b6 --- /dev/null +++ b/server/services/scheduler_service.py @@ -0,0 +1,665 @@ +""" +Agent Scheduler Service +======================= + +APScheduler-based service for automated agent scheduling. +Manages time-based start/stop of agents with crash recovery and manual override tracking. +""" + +import asyncio +import logging +import sys +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Optional + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger + +# Add parent directory for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +logger = logging.getLogger(__name__) + +# Constants +MAX_CRASH_RETRIES = 3 +CRASH_BACKOFF_BASE = 10 # seconds + + +class SchedulerService: + """ + APScheduler-based service for automated agent scheduling. + + Creates two jobs per schedule: + 1. Start job - triggers at start_time on configured days + 2. 
Stop job - triggers at start_time + duration on configured days + + Handles: + - Manual override tracking (persisted to DB) + - Crash recovery with exponential backoff + - Overlapping schedules (latest stop wins) + - Server restart recovery + """ + + def __init__(self): + from datetime import timezone as dt_timezone + + # CRITICAL: Use UTC timezone since all schedule times are stored in UTC + self.scheduler = AsyncIOScheduler(timezone=dt_timezone.utc) + self._started = False + + async def start(self): + """Start the scheduler and load all existing schedules.""" + if self._started: + return + + self.scheduler.start() + self._started = True + logger.info("Scheduler service started") + + # Check for active schedule windows on startup + await self._check_missed_windows_on_startup() + + # Load all schedules from registered projects + await self._load_all_schedules() + + async def stop(self): + """Shutdown the scheduler gracefully.""" + if not self._started: + return + + self.scheduler.shutdown(wait=False) + self._started = False + logger.info("Scheduler service stopped") + + async def _load_all_schedules(self): + """Load schedules for all registered projects.""" + from registry import list_registered_projects + + try: + projects = list_registered_projects() + total_loaded = 0 + for project_name, info in projects.items(): + project_path = Path(info.get("path", "")) + if project_path.exists(): + count = await self._load_project_schedules(project_name, project_path) + total_loaded += count + if total_loaded > 0: + logger.info(f"Loaded {total_loaded} schedule(s) across all projects") + except Exception as e: + logger.error(f"Error loading schedules: {e}") + + async def _load_project_schedules(self, project_name: str, project_dir: Path) -> int: + """Load schedules for a single project. 
Returns count of schedules loaded.""" + from api.database import Schedule, create_database + + db_path = project_dir / "features.db" + if not db_path.exists(): + return 0 + + try: + _, SessionLocal = create_database(project_dir) + db = SessionLocal() + try: + schedules = db.query(Schedule).filter( + Schedule.project_name == project_name, + Schedule.enabled == True, # noqa: E712 + ).all() + + for schedule in schedules: + await self.add_schedule(project_name, schedule, project_dir) + + if schedules: + logger.info(f"Loaded {len(schedules)} schedule(s) for project '{project_name}'") + return len(schedules) + finally: + db.close() + except Exception as e: + logger.error(f"Error loading schedules for {project_name}: {e}") + return 0 + + async def add_schedule(self, project_name: str, schedule, project_dir: Path): + """Create APScheduler jobs for a schedule.""" + try: + # Convert days bitfield to cron day_of_week string + days = self._bitfield_to_cron_days(schedule.days_of_week) + + # Parse start time + hour, minute = map(int, schedule.start_time.split(":")) + + # Calculate end time + start_dt = datetime.strptime(schedule.start_time, "%H:%M") + end_dt = start_dt + timedelta(minutes=schedule.duration_minutes) + + # Detect midnight crossing + crosses_midnight = end_dt.date() != start_dt.date() + + # Handle midnight wraparound for end time + end_hour = end_dt.hour + end_minute = end_dt.minute + + # Start job - CRITICAL: timezone=timezone.utc is required for correct UTC scheduling + start_job_id = f"schedule_{schedule.id}_start" + start_trigger = CronTrigger(hour=hour, minute=minute, day_of_week=days, timezone=timezone.utc) + self.scheduler.add_job( + self._handle_scheduled_start, + start_trigger, + id=start_job_id, + args=[project_name, schedule.id, str(project_dir)], + replace_existing=True, + misfire_grace_time=300, # 5 minutes grace period + ) + + # Stop job - CRITICAL: timezone=timezone.utc is required for correct UTC scheduling + # If schedule crosses midnight, shift days forward so stop occurs on next day + stop_job_id = f"schedule_{schedule.id}_stop" + if crosses_midnight: + shifted_bitfield = self._shift_days_forward(schedule.days_of_week) + stop_days = self._bitfield_to_cron_days(shifted_bitfield) + else: + stop_days = days + + stop_trigger = CronTrigger(hour=end_hour, minute=end_minute, day_of_week=stop_days, timezone=timezone.utc) + self.scheduler.add_job( + self._handle_scheduled_stop, + stop_trigger, + id=stop_job_id, + args=[project_name, schedule.id, str(project_dir)], + replace_existing=True, + misfire_grace_time=300, + ) + + # Log next run times for monitoring + start_job = self.scheduler.get_job(start_job_id) + stop_job = self.scheduler.get_job(stop_job_id) + logger.info( + f"Registered schedule {schedule.id} for {project_name}: " + f"start at {hour:02d}:{minute:02d} UTC (next: {start_job.next_run_time}), " + f"stop at {end_hour:02d}:{end_minute:02d} UTC (next: {stop_job.next_run_time})" + ) + + except Exception as e: + logger.error(f"Error adding schedule {schedule.id}: {e}") + + def remove_schedule(self, schedule_id: int): + """Remove APScheduler jobs for a schedule.""" + start_job_id = f"schedule_{schedule_id}_start" + stop_job_id = f"schedule_{schedule_id}_stop" + + removed = [] + try: + self.scheduler.remove_job(start_job_id) + removed.append("start") + except Exception: + pass + + try: + self.scheduler.remove_job(stop_job_id) + removed.append("stop") + except Exception: + pass + + if removed: + logger.info(f"Removed schedule {schedule_id} jobs: {', '.join(removed)}") + else: 
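+            # Nothing was registered for this schedule (e.g. it was never enabled), so there are no jobs to remove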
+ logger.warning(f"No jobs found to remove for schedule {schedule_id}") + + async def _handle_scheduled_start( + self, project_name: str, schedule_id: int, project_dir_str: str + ): + """Handle scheduled agent start.""" + logger.info(f"Scheduled start triggered for {project_name} (schedule {schedule_id})") + project_dir = Path(project_dir_str) + + try: + from api.database import Schedule, ScheduleOverride, create_database + + _, SessionLocal = create_database(project_dir) + db = SessionLocal() + + try: + schedule = db.query(Schedule).filter(Schedule.id == schedule_id).first() + if not schedule or not schedule.enabled: + return + + # Check for manual stop override + now = datetime.now(timezone.utc) + override = db.query(ScheduleOverride).filter( + ScheduleOverride.schedule_id == schedule_id, + ScheduleOverride.override_type == "stop", + ScheduleOverride.expires_at > now, + ).first() + + if override: + logger.info( + f"Skipping scheduled start for {project_name}: " + f"manual stop override active until {override.expires_at}" + ) + return + + # Reset crash count at window start + schedule.crash_count = 0 + db.commit() + + # Start agent + await self._start_agent(project_name, project_dir, schedule) + + finally: + db.close() + + except Exception as e: + logger.error(f"Error in scheduled start for {project_name}: {e}") + + async def _handle_scheduled_stop( + self, project_name: str, schedule_id: int, project_dir_str: str + ): + """Handle scheduled agent stop.""" + logger.info(f"Scheduled stop triggered for {project_name} (schedule {schedule_id})") + project_dir = Path(project_dir_str) + + try: + from api.database import Schedule, ScheduleOverride, create_database + + _, SessionLocal = create_database(project_dir) + db = SessionLocal() + + try: + schedule = db.query(Schedule).filter(Schedule.id == schedule_id).first() + if not schedule: + logger.warning(f"Schedule {schedule_id} not found in database") + return + + # Check if other schedules are still active (latest stop wins) + if self._other_schedules_still_active(db, project_name, schedule_id): + logger.info( + f"Skipping scheduled stop for {project_name}: " + f"other schedules still active (latest stop wins)" + ) + return + + # Clear expired overrides for this schedule + now = datetime.now(timezone.utc) + db.query(ScheduleOverride).filter( + ScheduleOverride.schedule_id == schedule_id, + ScheduleOverride.expires_at <= now, + ).delete() + db.commit() + + # Check for active manual-start overrides that prevent auto-stop + active_start_override = db.query(ScheduleOverride).filter( + ScheduleOverride.schedule_id == schedule_id, + ScheduleOverride.override_type == "start", + ScheduleOverride.expires_at > now, + ).first() + + if active_start_override: + logger.info( + f"Skipping scheduled stop for {project_name}: " + f"active manual-start override (expires {active_start_override.expires_at})" + ) + return + + # Stop agent + await self._stop_agent(project_name, project_dir) + + finally: + db.close() + + except Exception as e: + logger.error(f"Error in scheduled stop for {project_name}: {e}") + + def _other_schedules_still_active( + self, db, project_name: str, ending_schedule_id: int + ) -> bool: + """Check if any other schedule windows are still active.""" + from api.database import Schedule + + now = datetime.now(timezone.utc) + schedules = db.query(Schedule).filter( + Schedule.project_name == project_name, + Schedule.enabled == True, # noqa: E712 + Schedule.id != ending_schedule_id, + ).all() + + for schedule in schedules: + if 
self._is_within_window(schedule, now): + return True + return False + + def _is_within_window(self, schedule, now: datetime) -> bool: + """Check if current time is within schedule window.""" + # Parse schedule times (keep timezone awareness from now) + start_hour, start_minute = map(int, schedule.start_time.split(":")) + start_time = now.replace(hour=start_hour, minute=start_minute, second=0, microsecond=0) + + # Calculate end time + end_time = start_time + timedelta(minutes=schedule.duration_minutes) + + # Detect midnight crossing + crosses_midnight = end_time < start_time or end_time.date() != start_time.date() + + if crosses_midnight: + # Check today's window (start_time to midnight) OR yesterday's window (midnight to end_time) + # Today: if we're after start_time on the current day + if schedule.is_active_on_day(now.weekday()) and now >= start_time: + return True + + # Yesterday: check if we're before end_time and yesterday was active + yesterday = (now.weekday() - 1) % 7 + if schedule.is_active_on_day(yesterday): + yesterday_start = start_time - timedelta(days=1) + yesterday_end = end_time - timedelta(days=1) + if yesterday_start <= now < yesterday_end: + return True + + return False + else: + # Normal case: doesn't cross midnight + return schedule.is_active_on_day(now.weekday()) and start_time <= now < end_time + + async def _start_agent(self, project_name: str, project_dir: Path, schedule): + """Start the agent for a project.""" + from .process_manager import get_manager + + root_dir = Path(__file__).parent.parent.parent + manager = get_manager(project_name, project_dir, root_dir) + + if manager.status in ("running", "paused"): + logger.info(f"Agent already running for {project_name}, skipping scheduled start") + return + + logger.info( + f"Starting agent for {project_name} " + f"(schedule {schedule.id}, yolo={schedule.yolo_mode}, concurrency={schedule.max_concurrency})" + ) + success, msg = await manager.start( + yolo_mode=schedule.yolo_mode, + model=schedule.model, + max_concurrency=schedule.max_concurrency, + ) + + if success: + logger.info(f"✓ Agent started successfully for {project_name}") + else: + logger.error(f"✗ Failed to start agent for {project_name}: {msg}") + + async def _stop_agent(self, project_name: str, project_dir: Path): + """Stop the agent for a project.""" + from .process_manager import get_manager + + root_dir = Path(__file__).parent.parent.parent + manager = get_manager(project_name, project_dir, root_dir) + + if manager.status not in ("running", "paused"): + logger.info(f"Agent not running for {project_name}, skipping scheduled stop") + return + + logger.info(f"Stopping agent for {project_name} (scheduled)") + success, msg = await manager.stop() + + if success: + logger.info(f"✓ Agent stopped successfully for {project_name}") + else: + logger.error(f"✗ Failed to stop agent for {project_name}: {msg}") + + async def handle_crash_during_window(self, project_name: str, project_dir: Path): + """Called when agent crashes. 
Attempt restart with backoff.""" + from api.database import Schedule, create_database + + _, SessionLocal = create_database(project_dir) + db = SessionLocal() + + try: + now = datetime.now(timezone.utc) + schedules = db.query(Schedule).filter( + Schedule.project_name == project_name, + Schedule.enabled == True, # noqa: E712 + ).all() + + for schedule in schedules: + if not self._is_within_window(schedule, now): + continue + + if schedule.crash_count >= MAX_CRASH_RETRIES: + logger.warning( + f"Max crash retries ({MAX_CRASH_RETRIES}) reached for " + f"schedule {schedule.id} on {project_name}" + ) + continue + + schedule.crash_count += 1 + db.commit() + + # Exponential backoff: 10s, 30s, 90s + delay = CRASH_BACKOFF_BASE * (3 ** (schedule.crash_count - 1)) + logger.info( + f"Restarting agent for {project_name} in {delay}s " + f"(attempt {schedule.crash_count})" + ) + + await asyncio.sleep(delay) + await self._start_agent(project_name, project_dir, schedule) + return # Only restart once + + finally: + db.close() + + def notify_manual_start(self, project_name: str, project_dir: Path): + """Record manual start to prevent auto-stop.""" + logger.info(f"Manual start detected for {project_name}, creating override to prevent auto-stop") + self._create_override_for_active_schedules(project_name, project_dir, "start") + + def notify_manual_stop(self, project_name: str, project_dir: Path): + """Record manual stop to prevent auto-start.""" + logger.info(f"Manual stop detected for {project_name}, creating override to prevent auto-start") + self._create_override_for_active_schedules(project_name, project_dir, "stop") + + def _create_override_for_active_schedules( + self, project_name: str, project_dir: Path, override_type: str + ): + """Create overrides for all active schedule windows.""" + from api.database import Schedule, ScheduleOverride, create_database + + try: + _, SessionLocal = create_database(project_dir) + db = SessionLocal() + + try: + now = datetime.now(timezone.utc) + schedules = db.query(Schedule).filter( + Schedule.project_name == project_name, + Schedule.enabled == True, # noqa: E712 + ).all() + + overrides_created = 0 + for schedule in schedules: + if not self._is_within_window(schedule, now): + continue + + # Calculate window end time + window_end = self._calculate_window_end(schedule, now) + + # Check if override already exists + existing = db.query(ScheduleOverride).filter( + ScheduleOverride.schedule_id == schedule.id, + ScheduleOverride.override_type == override_type, + ScheduleOverride.expires_at > now, + ).first() + + if existing: + continue + + # Create override + override = ScheduleOverride( + schedule_id=schedule.id, + override_type=override_type, + expires_at=window_end, + ) + db.add(override) + overrides_created += 1 + logger.info( + f"Created '{override_type}' override for schedule {schedule.id} " + f"(expires at {window_end})" + ) + + db.commit() + if overrides_created > 0: + logger.info(f"Created {overrides_created} override(s) for {project_name}") + + finally: + db.close() + + except Exception as e: + logger.error(f"Error creating override for {project_name}: {e}") + + def _calculate_window_end(self, schedule, now: datetime) -> datetime: + """Calculate when the current window ends.""" + start_hour, start_minute = map(int, schedule.start_time.split(":")) + + # Create start time for today + window_start = now.replace( + hour=start_hour, minute=start_minute, second=0, microsecond=0 + ) + + # If current time is before start time, the window started yesterday + if 
now.replace(tzinfo=None) < window_start.replace(tzinfo=None): + window_start = window_start - timedelta(days=1) + + window_end = window_start + timedelta(minutes=schedule.duration_minutes) + return window_end + + async def _check_missed_windows_on_startup(self): + """Called on server start. Start agents for any active windows.""" + from registry import list_registered_projects + + try: + now = datetime.now(timezone.utc) + projects = list_registered_projects() + + for project_name, info in projects.items(): + project_dir = Path(info.get("path", "")) + if not project_dir.exists(): + continue + + await self._check_project_on_startup(project_name, project_dir, now) + + except Exception as e: + logger.error(f"Error checking missed windows on startup: {e}") + + async def _check_project_on_startup( + self, project_name: str, project_dir: Path, now: datetime + ): + """Check if a project should be started on server startup.""" + from api.database import Schedule, ScheduleOverride, create_database + + db_path = project_dir / "features.db" + if not db_path.exists(): + return + + try: + _, SessionLocal = create_database(project_dir) + db = SessionLocal() + + try: + schedules = db.query(Schedule).filter( + Schedule.project_name == project_name, + Schedule.enabled == True, # noqa: E712 + ).all() + + for schedule in schedules: + if not self._is_within_window(schedule, now): + continue + + # Check for manual stop override + override = db.query(ScheduleOverride).filter( + ScheduleOverride.schedule_id == schedule.id, + ScheduleOverride.override_type == "stop", + ScheduleOverride.expires_at > now, + ).first() + + if override: + logger.info( + f"Skipping startup start for {project_name}: " + f"manual stop override active" + ) + continue + + # Start the agent + logger.info( + f"Starting {project_name} for active schedule {schedule.id} " + f"(server startup)" + ) + await self._start_agent(project_name, project_dir, schedule) + return # Only start once per project + + finally: + db.close() + + except Exception as e: + logger.error(f"Error checking startup for {project_name}: {e}") + + @staticmethod + def _shift_days_forward(bitfield: int) -> int: + """ + Shift the 7-bit day mask forward by one day for midnight-crossing schedules. 
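+        Used for the stop trigger when a window crosses midnight, so the stop job fires on the day after each configured start day.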
+ + Examples: + Monday (1) -> Tuesday (2) + Sunday (64) -> Monday (1) + Mon+Tue (3) -> Tue+Wed (6) + """ + shifted = 0 + # Shift each day forward, wrapping Sunday to Monday + if bitfield & 1: + shifted |= 2 # Mon -> Tue + if bitfield & 2: + shifted |= 4 # Tue -> Wed + if bitfield & 4: + shifted |= 8 # Wed -> Thu + if bitfield & 8: + shifted |= 16 # Thu -> Fri + if bitfield & 16: + shifted |= 32 # Fri -> Sat + if bitfield & 32: + shifted |= 64 # Sat -> Sun + if bitfield & 64: + shifted |= 1 # Sun -> Mon + return shifted + + @staticmethod + def _bitfield_to_cron_days(bitfield: int) -> str: + """Convert days bitfield to APScheduler cron format.""" + days = [] + day_map = [ + (1, "mon"), + (2, "tue"), + (4, "wed"), + (8, "thu"), + (16, "fri"), + (32, "sat"), + (64, "sun"), + ] + for bit, name in day_map: + if bitfield & bit: + days.append(name) + return ",".join(days) if days else "mon-sun" + + +# Global scheduler instance +_scheduler: Optional[SchedulerService] = None + + +def get_scheduler() -> SchedulerService: + """Get the global scheduler instance.""" + global _scheduler + if _scheduler is None: + _scheduler = SchedulerService() + return _scheduler + + +async def cleanup_scheduler(): + """Clean up scheduler on shutdown.""" + global _scheduler + if _scheduler is not None: + await _scheduler.stop() + _scheduler = None diff --git a/ui/src/components/AgentControl.tsx b/ui/src/components/AgentControl.tsx index 616e709..1348809 100644 --- a/ui/src/components/AgentControl.tsx +++ b/ui/src/components/AgentControl.tsx @@ -1,10 +1,13 @@ import { useState } from 'react' -import { Play, Square, Loader2, GitBranch } from 'lucide-react' +import { Play, Square, Loader2, GitBranch, Clock } from 'lucide-react' import { useStartAgent, useStopAgent, useSettings, } from '../hooks/useProjects' +import { useNextScheduledRun } from '../hooks/useSchedules' +import { formatNextRun, formatEndTime } from '../lib/timeUtils' +import { ScheduleModal } from './ScheduleModal' import type { AgentStatus } from '../lib/types' interface AgentControlProps { @@ -21,6 +24,9 @@ export function AgentControl({ projectName, status }: AgentControlProps) { const startAgent = useStartAgent(projectName) const stopAgent = useStopAgent(projectName) + const { data: nextRun } = useNextScheduledRun(projectName) + + const [showScheduleModal, setShowScheduleModal] = useState(false) const isLoading = startAgent.isPending || stopAgent.isPending const isRunning = status === 'running' || status === 'paused' @@ -40,78 +46,113 @@ export function AgentControl({ projectName, status }: AgentControlProps) { const isStopped = status === 'stopped' || status === 'crashed' return ( -
No schedules configured yet
+
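For reference, a minimal standalone sketch (not part of the patch) of how the days_of_week bitfield and the midnight-crossing day shift in SchedulerService.add_schedule fit together. The two helpers below are compact, behaviorally equivalent re-implementations of _bitfield_to_cron_days and _shift_days_forward from scheduler_service.py; the Friday 22:30 schedule is a hypothetical example.

from datetime import datetime, timedelta

# Day bitfield convention from Schedule.days_of_week: Mon=1, Tue=2, Wed=4, Thu=8, Fri=16, Sat=32, Sun=64.
DAY_MAP = [(1, "mon"), (2, "tue"), (4, "wed"), (8, "thu"), (16, "fri"), (32, "sat"), (64, "sun")]


def bitfield_to_cron_days(bitfield: int) -> str:
    """Convert the day bitfield to APScheduler's day_of_week string (mirrors _bitfield_to_cron_days)."""
    days = [name for bit, name in DAY_MAP if bitfield & bit]
    return ",".join(days) if days else "mon-sun"


def shift_days_forward(bitfield: int) -> int:
    """Rotate the 7-bit mask forward one day, wrapping Sunday to Monday (mirrors _shift_days_forward)."""
    return ((bitfield << 1) | (bitfield >> 6)) & 0b1111111


# Hypothetical schedule: Friday 22:30 UTC for 180 minutes. The window crosses midnight,
# so the stop job must fire on Saturday at 01:30 UTC while the start job stays on Friday.
days_of_week = 16  # Friday only
start = datetime.strptime("22:30", "%H:%M")
end = start + timedelta(minutes=180)
crosses_midnight = end.date() != start.date()

start_days = bitfield_to_cron_days(days_of_week)
stop_days = bitfield_to_cron_days(shift_days_forward(days_of_week)) if crosses_midnight else start_days

print(f"start cron days={start_days} at {start:%H:%M}")  # start cron days=fri at 22:30
print(f"stop  cron days={stop_days} at {end:%H:%M}")     # stop  cron days=sat at 01:30

Shifting the day mask is what lets a single pair of CronTriggers cover a window like this without a separate date-based stop job.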