Merge pull request #75 from ipodishima/feature/agent-scheduling

feat: add time-based agent scheduling with APScheduler
This commit is contained in:
Leon van Zyl
2026-01-22 08:24:52 +02:00
committed by GitHub
16 changed files with 2261 additions and 76 deletions

View File

@@ -35,6 +35,7 @@ from .routers import (
features_router,
filesystem_router,
projects_router,
schedules_router,
settings_router,
spec_creation_router,
terminal_router,
@@ -47,6 +48,7 @@ from .services.dev_server_manager import (
)
from .services.expand_chat_session import cleanup_all_expand_sessions
from .services.process_manager import cleanup_all_managers, cleanup_orphaned_locks
from .services.scheduler_service import cleanup_scheduler, get_scheduler
from .services.terminal_manager import cleanup_all_terminals
from .websocket import project_websocket
@@ -61,8 +63,16 @@ async def lifespan(app: FastAPI):
# Startup - clean up orphaned lock files from previous runs
cleanup_orphaned_locks()
cleanup_orphaned_devserver_locks()
# Start the scheduler service
scheduler = get_scheduler()
await scheduler.start()
yield
# Shutdown - cleanup all running agents, sessions, terminals, and dev servers
# Shutdown - cleanup scheduler first to stop triggering new starts
await cleanup_scheduler()
# Then cleanup all running agents, sessions, terminals, and dev servers
await cleanup_all_managers()
await cleanup_assistant_sessions()
await cleanup_all_expand_sessions()
@@ -116,6 +126,7 @@ async def require_localhost(request: Request, call_next):
app.include_router(projects_router)
app.include_router(features_router)
app.include_router(agent_router)
app.include_router(schedules_router)
app.include_router(devserver_router)
app.include_router(spec_creation_router)
app.include_router(expand_project_router)

View File

@@ -12,6 +12,7 @@ from .expand_project import router as expand_project_router
from .features import router as features_router
from .filesystem import router as filesystem_router
from .projects import router as projects_router
from .schedules import router as schedules_router
from .settings import router as settings_router
from .spec_creation import router as spec_creation_router
from .terminal import router as terminal_router
@@ -20,6 +21,7 @@ __all__ = [
"projects_router",
"features_router",
"agent_router",
"schedules_router",
"devserver_router",
"spec_creation_router",
"expand_project_router",

View File

@@ -130,6 +130,13 @@ async def start_agent(
count_testing_in_concurrency=count_testing,
)
# Notify scheduler of manual start (to prevent auto-stop during scheduled window)
if success:
from ..services.scheduler_service import get_scheduler
project_dir = _get_project_path(project_name)
if project_dir:
get_scheduler().notify_manual_start(project_name, project_dir)
return AgentActionResponse(
success=success,
status=manager.status,
@@ -144,6 +151,13 @@ async def stop_agent(project_name: str):
success, message = await manager.stop()
# Notify scheduler of manual stop (to prevent auto-start during scheduled window)
if success:
from ..services.scheduler_service import get_scheduler
project_dir = _get_project_path(project_name)
if project_dir:
get_scheduler().notify_manual_stop(project_name, project_dir)
return AgentActionResponse(
success=success,
status=manager.status,

409
server/routers/schedules.py Normal file
View File

@@ -0,0 +1,409 @@
"""
Schedules Router
================
API endpoints for managing agent schedules.
Provides CRUD operations for time-based schedule configuration.
"""
import re
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from fastapi import APIRouter, HTTPException
from ..schemas import (
NextRunResponse,
ScheduleCreate,
ScheduleListResponse,
ScheduleResponse,
ScheduleUpdate,
)
def _get_project_path(project_name: str) -> Path:
"""Get project path from registry."""
root = Path(__file__).parent.parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from registry import get_project_path
return get_project_path(project_name)
router = APIRouter(
prefix="/api/projects/{project_name}/schedules",
tags=["schedules"]
)
def validate_project_name(name: str) -> str:
"""Validate and sanitize project name to prevent path traversal."""
if not re.match(r'^[a-zA-Z0-9_-]{1,50}$', name):
raise HTTPException(
status_code=400,
detail="Invalid project name"
)
return name
def _get_db_session(project_name: str):
"""Get database session for a project."""
from api.database import create_database
project_name = validate_project_name(project_name)
project_path = _get_project_path(project_name)
if not project_path:
raise HTTPException(
status_code=404,
detail=f"Project '{project_name}' not found in registry"
)
if not project_path.exists():
raise HTTPException(
status_code=404,
detail=f"Project directory not found: {project_path}"
)
_, SessionLocal = create_database(project_path)
return SessionLocal(), project_path
@router.get("", response_model=ScheduleListResponse)
async def list_schedules(project_name: str):
"""Get all schedules for a project."""
from api.database import Schedule
db, _ = _get_db_session(project_name)
try:
schedules = db.query(Schedule).filter(
Schedule.project_name == project_name
).order_by(Schedule.start_time).all()
return ScheduleListResponse(
schedules=[
ScheduleResponse(
id=s.id,
project_name=s.project_name,
start_time=s.start_time,
duration_minutes=s.duration_minutes,
days_of_week=s.days_of_week,
enabled=s.enabled,
yolo_mode=s.yolo_mode,
model=s.model,
crash_count=s.crash_count,
created_at=s.created_at,
)
for s in schedules
]
)
finally:
db.close()
@router.post("", response_model=ScheduleResponse, status_code=201)
async def create_schedule(project_name: str, data: ScheduleCreate):
"""Create a new schedule for a project."""
from api.database import Schedule
from ..services.scheduler_service import get_scheduler
db, project_path = _get_db_session(project_name)
try:
# Create schedule record
schedule = Schedule(
project_name=project_name,
start_time=data.start_time,
duration_minutes=data.duration_minutes,
days_of_week=data.days_of_week,
enabled=data.enabled,
yolo_mode=data.yolo_mode,
model=data.model,
)
db.add(schedule)
db.commit()
db.refresh(schedule)
# Register with APScheduler if enabled
if schedule.enabled:
import logging
logger = logging.getLogger(__name__)
scheduler = get_scheduler()
await scheduler.add_schedule(project_name, schedule, project_path)
logger.info(f"Registered schedule {schedule.id} with APScheduler")
# Check if we're currently within this schedule's window
# If so, start the agent immediately (cron won't trigger until next occurrence)
now = datetime.now(timezone.utc)
is_within = scheduler._is_within_window(schedule, now)
logger.info(f"Schedule {schedule.id}: is_within_window={is_within}, now={now}, start={schedule.start_time}")
if is_within:
# Check for manual stop override
from api.database import ScheduleOverride
override = db.query(ScheduleOverride).filter(
ScheduleOverride.schedule_id == schedule.id,
ScheduleOverride.override_type == "stop",
ScheduleOverride.expires_at > now,
).first()
logger.info(f"Schedule {schedule.id}: has_override={override is not None}")
if not override:
# Start agent immediately
logger.info(
f"Schedule {schedule.id} is within active window, starting agent immediately"
)
try:
await scheduler._start_agent(project_name, project_path, schedule)
logger.info(f"Successfully started agent for schedule {schedule.id}")
except Exception as e:
logger.error(f"Failed to start agent for schedule {schedule.id}: {e}", exc_info=True)
return ScheduleResponse(
id=schedule.id,
project_name=schedule.project_name,
start_time=schedule.start_time,
duration_minutes=schedule.duration_minutes,
days_of_week=schedule.days_of_week,
enabled=schedule.enabled,
yolo_mode=schedule.yolo_mode,
model=schedule.model,
crash_count=schedule.crash_count,
created_at=schedule.created_at,
)
finally:
db.close()
@router.get("/next", response_model=NextRunResponse)
async def get_next_scheduled_run(project_name: str):
"""Calculate next scheduled run across all enabled schedules."""
from api.database import Schedule, ScheduleOverride
from ..services.scheduler_service import get_scheduler
db, _ = _get_db_session(project_name)
try:
schedules = db.query(Schedule).filter(
Schedule.project_name == project_name,
Schedule.enabled == True, # noqa: E712
).all()
if not schedules:
return NextRunResponse(
has_schedules=False,
next_start=None,
next_end=None,
is_currently_running=False,
active_schedule_count=0,
)
now = datetime.now(timezone.utc)
scheduler = get_scheduler()
# Find active schedules and calculate next run
active_count = 0
next_start = None
latest_end = None
for schedule in schedules:
if scheduler._is_within_window(schedule, now):
# Check for manual stop override
override = db.query(ScheduleOverride).filter(
ScheduleOverride.schedule_id == schedule.id,
ScheduleOverride.override_type == "stop",
ScheduleOverride.expires_at > now,
).first()
if not override:
# Schedule is active and not manually stopped
active_count += 1
# Calculate end time for this window
end_time = _calculate_window_end(schedule, now)
if latest_end is None or end_time > latest_end:
latest_end = end_time
# If override exists, treat schedule as not active
else:
# Calculate next start time
next_schedule_start = _calculate_next_start(schedule, now)
if next_schedule_start and (next_start is None or next_schedule_start < next_start):
next_start = next_schedule_start
return NextRunResponse(
has_schedules=True,
next_start=next_start if active_count == 0 else None,
next_end=latest_end,
is_currently_running=active_count > 0,
active_schedule_count=active_count,
)
finally:
db.close()
@router.get("/{schedule_id}", response_model=ScheduleResponse)
async def get_schedule(project_name: str, schedule_id: int):
"""Get a single schedule by ID."""
from api.database import Schedule
db, _ = _get_db_session(project_name)
try:
schedule = db.query(Schedule).filter(
Schedule.id == schedule_id,
Schedule.project_name == project_name,
).first()
if not schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
return ScheduleResponse(
id=schedule.id,
project_name=schedule.project_name,
start_time=schedule.start_time,
duration_minutes=schedule.duration_minutes,
days_of_week=schedule.days_of_week,
enabled=schedule.enabled,
yolo_mode=schedule.yolo_mode,
model=schedule.model,
crash_count=schedule.crash_count,
created_at=schedule.created_at,
)
finally:
db.close()
@router.patch("/{schedule_id}", response_model=ScheduleResponse)
async def update_schedule(
project_name: str,
schedule_id: int,
data: ScheduleUpdate
):
"""Update an existing schedule."""
from api.database import Schedule
from ..services.scheduler_service import get_scheduler
db, project_path = _get_db_session(project_name)
try:
schedule = db.query(Schedule).filter(
Schedule.id == schedule_id,
Schedule.project_name == project_name,
).first()
if not schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
was_enabled = schedule.enabled
# Update only fields that were explicitly provided
# This allows sending {"model": null} to clear it vs omitting the field entirely
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(schedule, field, value)
db.commit()
db.refresh(schedule)
# Update APScheduler jobs
scheduler = get_scheduler()
if schedule.enabled:
# Re-register with updated times
await scheduler.add_schedule(project_name, schedule, project_path)
elif was_enabled:
# Was enabled, now disabled - remove jobs
scheduler.remove_schedule(schedule_id)
return ScheduleResponse(
id=schedule.id,
project_name=schedule.project_name,
start_time=schedule.start_time,
duration_minutes=schedule.duration_minutes,
days_of_week=schedule.days_of_week,
enabled=schedule.enabled,
yolo_mode=schedule.yolo_mode,
model=schedule.model,
crash_count=schedule.crash_count,
created_at=schedule.created_at,
)
finally:
db.close()
@router.delete("/{schedule_id}", status_code=204)
async def delete_schedule(project_name: str, schedule_id: int):
"""Delete a schedule."""
from api.database import Schedule
from ..services.scheduler_service import get_scheduler
db, _ = _get_db_session(project_name)
try:
schedule = db.query(Schedule).filter(
Schedule.id == schedule_id,
Schedule.project_name == project_name,
).first()
if not schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
# Remove APScheduler jobs
scheduler = get_scheduler()
scheduler.remove_schedule(schedule_id)
# Delete from database
db.delete(schedule)
db.commit()
finally:
db.close()
def _calculate_window_end(schedule, now: datetime) -> datetime:
"""Calculate when the current window ends."""
start_hour, start_minute = map(int, schedule.start_time.split(":"))
# Create start time for today in UTC
window_start = now.replace(
hour=start_hour, minute=start_minute, second=0, microsecond=0
)
# If current time is before start time, the window started yesterday
if now < window_start:
window_start = window_start - timedelta(days=1)
return window_start + timedelta(minutes=schedule.duration_minutes)
def _calculate_next_start(schedule, now: datetime) -> datetime | None:
"""Calculate the next start time for a schedule."""
start_hour, start_minute = map(int, schedule.start_time.split(":"))
# Create start time for today
candidate = now.replace(
hour=start_hour, minute=start_minute, second=0, microsecond=0
)
# If already past today's start time, check tomorrow
if candidate <= now:
candidate = candidate + timedelta(days=1)
# Find the next active day
for _ in range(7):
if schedule.is_active_on_day(candidate.weekday()):
return candidate
candidate = candidate + timedelta(days=1)
return None

View File

@@ -472,3 +472,100 @@ class WSDevServerStatusMessage(BaseModel):
type: Literal["dev_server_status"] = "dev_server_status"
status: Literal["stopped", "running", "crashed"]
url: str | None = None
# ============================================================================
# Schedule Schemas
# ============================================================================
class ScheduleCreate(BaseModel):
"""Request schema for creating a schedule."""
start_time: str = Field(
...,
pattern=r'^([0-1][0-9]|2[0-3]):[0-5][0-9]$',
description="Start time in HH:MM format (local time, will be stored as UTC)"
)
duration_minutes: int = Field(
...,
ge=1,
le=1440,
description="Duration in minutes (1-1440)"
)
days_of_week: int = Field(
default=127,
ge=0,
le=127,
description="Bitfield: Mon=1, Tue=2, Wed=4, Thu=8, Fri=16, Sat=32, Sun=64"
)
enabled: bool = True
yolo_mode: bool = False
model: str | None = None
max_concurrency: int = Field(
default=3,
ge=1,
le=5,
description="Max concurrent agents (1-5)"
)
@field_validator('model')
@classmethod
def validate_model(cls, v: str | None) -> str | None:
"""Validate model is in the allowed list."""
if v is not None and v not in VALID_MODELS:
raise ValueError(f"Invalid model. Must be one of: {VALID_MODELS}")
return v
class ScheduleUpdate(BaseModel):
"""Request schema for updating a schedule (partial updates allowed)."""
start_time: str | None = Field(
None,
pattern=r'^([0-1][0-9]|2[0-3]):[0-5][0-9]$'
)
duration_minutes: int | None = Field(None, ge=1, le=1440)
days_of_week: int | None = Field(None, ge=0, le=127)
enabled: bool | None = None
yolo_mode: bool | None = None
model: str | None = None
max_concurrency: int | None = Field(None, ge=1, le=5)
@field_validator('model')
@classmethod
def validate_model(cls, v: str | None) -> str | None:
"""Validate model is in the allowed list."""
if v is not None and v not in VALID_MODELS:
raise ValueError(f"Invalid model. Must be one of: {VALID_MODELS}")
return v
class ScheduleResponse(BaseModel):
"""Response schema for a schedule."""
id: int
project_name: str
start_time: str # UTC, frontend converts to local
duration_minutes: int
days_of_week: int
enabled: bool
yolo_mode: bool
model: str | None
max_concurrency: int
crash_count: int
created_at: datetime
class Config:
from_attributes = True
class ScheduleListResponse(BaseModel):
"""Response containing list of schedules."""
schedules: list[ScheduleResponse]
class NextRunResponse(BaseModel):
"""Response for next scheduled run calculation."""
has_schedules: bool
next_start: datetime | None # UTC
next_end: datetime | None # UTC (latest end if overlapping)
is_currently_running: bool
active_schedule_count: int

View File

@@ -0,0 +1,665 @@
"""
Agent Scheduler Service
=======================
APScheduler-based service for automated agent scheduling.
Manages time-based start/stop of agents with crash recovery and manual override tracking.
"""
import asyncio
import logging
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
# Add parent directory for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
logger = logging.getLogger(__name__)
# Constants
MAX_CRASH_RETRIES = 3
CRASH_BACKOFF_BASE = 10 # seconds
class SchedulerService:
"""
APScheduler-based service for automated agent scheduling.
Creates two jobs per schedule:
1. Start job - triggers at start_time on configured days
2. Stop job - triggers at start_time + duration on configured days
Handles:
- Manual override tracking (persisted to DB)
- Crash recovery with exponential backoff
- Overlapping schedules (latest stop wins)
- Server restart recovery
"""
def __init__(self):
from datetime import timezone as dt_timezone
# CRITICAL: Use UTC timezone since all schedule times are stored in UTC
self.scheduler = AsyncIOScheduler(timezone=dt_timezone.utc)
self._started = False
async def start(self):
"""Start the scheduler and load all existing schedules."""
if self._started:
return
self.scheduler.start()
self._started = True
logger.info("Scheduler service started")
# Check for active schedule windows on startup
await self._check_missed_windows_on_startup()
# Load all schedules from registered projects
await self._load_all_schedules()
async def stop(self):
"""Shutdown the scheduler gracefully."""
if not self._started:
return
self.scheduler.shutdown(wait=False)
self._started = False
logger.info("Scheduler service stopped")
async def _load_all_schedules(self):
"""Load schedules for all registered projects."""
from registry import list_registered_projects
try:
projects = list_registered_projects()
total_loaded = 0
for project_name, info in projects.items():
project_path = Path(info.get("path", ""))
if project_path.exists():
count = await self._load_project_schedules(project_name, project_path)
total_loaded += count
if total_loaded > 0:
logger.info(f"Loaded {total_loaded} schedule(s) across all projects")
except Exception as e:
logger.error(f"Error loading schedules: {e}")
async def _load_project_schedules(self, project_name: str, project_dir: Path) -> int:
"""Load schedules for a single project. Returns count of schedules loaded."""
from api.database import Schedule, create_database
db_path = project_dir / "features.db"
if not db_path.exists():
return 0
try:
_, SessionLocal = create_database(project_dir)
db = SessionLocal()
try:
schedules = db.query(Schedule).filter(
Schedule.project_name == project_name,
Schedule.enabled == True, # noqa: E712
).all()
for schedule in schedules:
await self.add_schedule(project_name, schedule, project_dir)
if schedules:
logger.info(f"Loaded {len(schedules)} schedule(s) for project '{project_name}'")
return len(schedules)
finally:
db.close()
except Exception as e:
logger.error(f"Error loading schedules for {project_name}: {e}")
return 0
async def add_schedule(self, project_name: str, schedule, project_dir: Path):
"""Create APScheduler jobs for a schedule."""
try:
# Convert days bitfield to cron day_of_week string
days = self._bitfield_to_cron_days(schedule.days_of_week)
# Parse start time
hour, minute = map(int, schedule.start_time.split(":"))
# Calculate end time
start_dt = datetime.strptime(schedule.start_time, "%H:%M")
end_dt = start_dt + timedelta(minutes=schedule.duration_minutes)
# Detect midnight crossing
crosses_midnight = end_dt.date() != start_dt.date()
# Handle midnight wraparound for end time
end_hour = end_dt.hour
end_minute = end_dt.minute
# Start job - CRITICAL: timezone=timezone.utc is required for correct UTC scheduling
start_job_id = f"schedule_{schedule.id}_start"
start_trigger = CronTrigger(hour=hour, minute=minute, day_of_week=days, timezone=timezone.utc)
self.scheduler.add_job(
self._handle_scheduled_start,
start_trigger,
id=start_job_id,
args=[project_name, schedule.id, str(project_dir)],
replace_existing=True,
misfire_grace_time=300, # 5 minutes grace period
)
# Stop job - CRITICAL: timezone=timezone.utc is required for correct UTC scheduling
# If schedule crosses midnight, shift days forward so stop occurs on next day
stop_job_id = f"schedule_{schedule.id}_stop"
if crosses_midnight:
shifted_bitfield = self._shift_days_forward(schedule.days_of_week)
stop_days = self._bitfield_to_cron_days(shifted_bitfield)
else:
stop_days = days
stop_trigger = CronTrigger(hour=end_hour, minute=end_minute, day_of_week=stop_days, timezone=timezone.utc)
self.scheduler.add_job(
self._handle_scheduled_stop,
stop_trigger,
id=stop_job_id,
args=[project_name, schedule.id, str(project_dir)],
replace_existing=True,
misfire_grace_time=300,
)
# Log next run times for monitoring
start_job = self.scheduler.get_job(start_job_id)
stop_job = self.scheduler.get_job(stop_job_id)
logger.info(
f"Registered schedule {schedule.id} for {project_name}: "
f"start at {hour:02d}:{minute:02d} UTC (next: {start_job.next_run_time}), "
f"stop at {end_hour:02d}:{end_minute:02d} UTC (next: {stop_job.next_run_time})"
)
except Exception as e:
logger.error(f"Error adding schedule {schedule.id}: {e}")
def remove_schedule(self, schedule_id: int):
"""Remove APScheduler jobs for a schedule."""
start_job_id = f"schedule_{schedule_id}_start"
stop_job_id = f"schedule_{schedule_id}_stop"
removed = []
try:
self.scheduler.remove_job(start_job_id)
removed.append("start")
except Exception:
pass
try:
self.scheduler.remove_job(stop_job_id)
removed.append("stop")
except Exception:
pass
if removed:
logger.info(f"Removed schedule {schedule_id} jobs: {', '.join(removed)}")
else:
logger.warning(f"No jobs found to remove for schedule {schedule_id}")
async def _handle_scheduled_start(
self, project_name: str, schedule_id: int, project_dir_str: str
):
"""Handle scheduled agent start."""
logger.info(f"Scheduled start triggered for {project_name} (schedule {schedule_id})")
project_dir = Path(project_dir_str)
try:
from api.database import Schedule, ScheduleOverride, create_database
_, SessionLocal = create_database(project_dir)
db = SessionLocal()
try:
schedule = db.query(Schedule).filter(Schedule.id == schedule_id).first()
if not schedule or not schedule.enabled:
return
# Check for manual stop override
now = datetime.now(timezone.utc)
override = db.query(ScheduleOverride).filter(
ScheduleOverride.schedule_id == schedule_id,
ScheduleOverride.override_type == "stop",
ScheduleOverride.expires_at > now,
).first()
if override:
logger.info(
f"Skipping scheduled start for {project_name}: "
f"manual stop override active until {override.expires_at}"
)
return
# Reset crash count at window start
schedule.crash_count = 0
db.commit()
# Start agent
await self._start_agent(project_name, project_dir, schedule)
finally:
db.close()
except Exception as e:
logger.error(f"Error in scheduled start for {project_name}: {e}")
async def _handle_scheduled_stop(
self, project_name: str, schedule_id: int, project_dir_str: str
):
"""Handle scheduled agent stop."""
logger.info(f"Scheduled stop triggered for {project_name} (schedule {schedule_id})")
project_dir = Path(project_dir_str)
try:
from api.database import Schedule, ScheduleOverride, create_database
_, SessionLocal = create_database(project_dir)
db = SessionLocal()
try:
schedule = db.query(Schedule).filter(Schedule.id == schedule_id).first()
if not schedule:
logger.warning(f"Schedule {schedule_id} not found in database")
return
# Check if other schedules are still active (latest stop wins)
if self._other_schedules_still_active(db, project_name, schedule_id):
logger.info(
f"Skipping scheduled stop for {project_name}: "
f"other schedules still active (latest stop wins)"
)
return
# Clear expired overrides for this schedule
now = datetime.now(timezone.utc)
db.query(ScheduleOverride).filter(
ScheduleOverride.schedule_id == schedule_id,
ScheduleOverride.expires_at <= now,
).delete()
db.commit()
# Check for active manual-start overrides that prevent auto-stop
active_start_override = db.query(ScheduleOverride).filter(
ScheduleOverride.schedule_id == schedule_id,
ScheduleOverride.override_type == "start",
ScheduleOverride.expires_at > now,
).first()
if active_start_override:
logger.info(
f"Skipping scheduled stop for {project_name}: "
f"active manual-start override (expires {active_start_override.expires_at})"
)
return
# Stop agent
await self._stop_agent(project_name, project_dir)
finally:
db.close()
except Exception as e:
logger.error(f"Error in scheduled stop for {project_name}: {e}")
def _other_schedules_still_active(
self, db, project_name: str, ending_schedule_id: int
) -> bool:
"""Check if any other schedule windows are still active."""
from api.database import Schedule
now = datetime.now(timezone.utc)
schedules = db.query(Schedule).filter(
Schedule.project_name == project_name,
Schedule.enabled == True, # noqa: E712
Schedule.id != ending_schedule_id,
).all()
for schedule in schedules:
if self._is_within_window(schedule, now):
return True
return False
def _is_within_window(self, schedule, now: datetime) -> bool:
"""Check if current time is within schedule window."""
# Parse schedule times (keep timezone awareness from now)
start_hour, start_minute = map(int, schedule.start_time.split(":"))
start_time = now.replace(hour=start_hour, minute=start_minute, second=0, microsecond=0)
# Calculate end time
end_time = start_time + timedelta(minutes=schedule.duration_minutes)
# Detect midnight crossing
crosses_midnight = end_time < start_time or end_time.date() != start_time.date()
if crosses_midnight:
# Check today's window (start_time to midnight) OR yesterday's window (midnight to end_time)
# Today: if we're after start_time on the current day
if schedule.is_active_on_day(now.weekday()) and now >= start_time:
return True
# Yesterday: check if we're before end_time and yesterday was active
yesterday = (now.weekday() - 1) % 7
if schedule.is_active_on_day(yesterday):
yesterday_start = start_time - timedelta(days=1)
yesterday_end = end_time - timedelta(days=1)
if yesterday_start <= now < yesterday_end:
return True
return False
else:
# Normal case: doesn't cross midnight
return schedule.is_active_on_day(now.weekday()) and start_time <= now < end_time
async def _start_agent(self, project_name: str, project_dir: Path, schedule):
"""Start the agent for a project."""
from .process_manager import get_manager
root_dir = Path(__file__).parent.parent.parent
manager = get_manager(project_name, project_dir, root_dir)
if manager.status in ("running", "paused"):
logger.info(f"Agent already running for {project_name}, skipping scheduled start")
return
logger.info(
f"Starting agent for {project_name} "
f"(schedule {schedule.id}, yolo={schedule.yolo_mode}, concurrency={schedule.max_concurrency})"
)
success, msg = await manager.start(
yolo_mode=schedule.yolo_mode,
model=schedule.model,
max_concurrency=schedule.max_concurrency,
)
if success:
logger.info(f"✓ Agent started successfully for {project_name}")
else:
logger.error(f"✗ Failed to start agent for {project_name}: {msg}")
async def _stop_agent(self, project_name: str, project_dir: Path):
"""Stop the agent for a project."""
from .process_manager import get_manager
root_dir = Path(__file__).parent.parent.parent
manager = get_manager(project_name, project_dir, root_dir)
if manager.status not in ("running", "paused"):
logger.info(f"Agent not running for {project_name}, skipping scheduled stop")
return
logger.info(f"Stopping agent for {project_name} (scheduled)")
success, msg = await manager.stop()
if success:
logger.info(f"✓ Agent stopped successfully for {project_name}")
else:
logger.error(f"✗ Failed to stop agent for {project_name}: {msg}")
async def handle_crash_during_window(self, project_name: str, project_dir: Path):
"""Called when agent crashes. Attempt restart with backoff."""
from api.database import Schedule, create_database
_, SessionLocal = create_database(project_dir)
db = SessionLocal()
try:
now = datetime.now(timezone.utc)
schedules = db.query(Schedule).filter(
Schedule.project_name == project_name,
Schedule.enabled == True, # noqa: E712
).all()
for schedule in schedules:
if not self._is_within_window(schedule, now):
continue
if schedule.crash_count >= MAX_CRASH_RETRIES:
logger.warning(
f"Max crash retries ({MAX_CRASH_RETRIES}) reached for "
f"schedule {schedule.id} on {project_name}"
)
continue
schedule.crash_count += 1
db.commit()
# Exponential backoff: 10s, 30s, 90s
delay = CRASH_BACKOFF_BASE * (3 ** (schedule.crash_count - 1))
logger.info(
f"Restarting agent for {project_name} in {delay}s "
f"(attempt {schedule.crash_count})"
)
await asyncio.sleep(delay)
await self._start_agent(project_name, project_dir, schedule)
return # Only restart once
finally:
db.close()
def notify_manual_start(self, project_name: str, project_dir: Path):
"""Record manual start to prevent auto-stop."""
logger.info(f"Manual start detected for {project_name}, creating override to prevent auto-stop")
self._create_override_for_active_schedules(project_name, project_dir, "start")
def notify_manual_stop(self, project_name: str, project_dir: Path):
"""Record manual stop to prevent auto-start."""
logger.info(f"Manual stop detected for {project_name}, creating override to prevent auto-start")
self._create_override_for_active_schedules(project_name, project_dir, "stop")
def _create_override_for_active_schedules(
self, project_name: str, project_dir: Path, override_type: str
):
"""Create overrides for all active schedule windows."""
from api.database import Schedule, ScheduleOverride, create_database
try:
_, SessionLocal = create_database(project_dir)
db = SessionLocal()
try:
now = datetime.now(timezone.utc)
schedules = db.query(Schedule).filter(
Schedule.project_name == project_name,
Schedule.enabled == True, # noqa: E712
).all()
overrides_created = 0
for schedule in schedules:
if not self._is_within_window(schedule, now):
continue
# Calculate window end time
window_end = self._calculate_window_end(schedule, now)
# Check if override already exists
existing = db.query(ScheduleOverride).filter(
ScheduleOverride.schedule_id == schedule.id,
ScheduleOverride.override_type == override_type,
ScheduleOverride.expires_at > now,
).first()
if existing:
continue
# Create override
override = ScheduleOverride(
schedule_id=schedule.id,
override_type=override_type,
expires_at=window_end,
)
db.add(override)
overrides_created += 1
logger.info(
f"Created '{override_type}' override for schedule {schedule.id} "
f"(expires at {window_end})"
)
db.commit()
if overrides_created > 0:
logger.info(f"Created {overrides_created} override(s) for {project_name}")
finally:
db.close()
except Exception as e:
logger.error(f"Error creating override for {project_name}: {e}")
def _calculate_window_end(self, schedule, now: datetime) -> datetime:
"""Calculate when the current window ends."""
start_hour, start_minute = map(int, schedule.start_time.split(":"))
# Create start time for today
window_start = now.replace(
hour=start_hour, minute=start_minute, second=0, microsecond=0
)
# If current time is before start time, the window started yesterday
if now.replace(tzinfo=None) < window_start.replace(tzinfo=None):
window_start = window_start - timedelta(days=1)
window_end = window_start + timedelta(minutes=schedule.duration_minutes)
return window_end
async def _check_missed_windows_on_startup(self):
"""Called on server start. Start agents for any active windows."""
from registry import list_registered_projects
try:
now = datetime.now(timezone.utc)
projects = list_registered_projects()
for project_name, info in projects.items():
project_dir = Path(info.get("path", ""))
if not project_dir.exists():
continue
await self._check_project_on_startup(project_name, project_dir, now)
except Exception as e:
logger.error(f"Error checking missed windows on startup: {e}")
async def _check_project_on_startup(
self, project_name: str, project_dir: Path, now: datetime
):
"""Check if a project should be started on server startup."""
from api.database import Schedule, ScheduleOverride, create_database
db_path = project_dir / "features.db"
if not db_path.exists():
return
try:
_, SessionLocal = create_database(project_dir)
db = SessionLocal()
try:
schedules = db.query(Schedule).filter(
Schedule.project_name == project_name,
Schedule.enabled == True, # noqa: E712
).all()
for schedule in schedules:
if not self._is_within_window(schedule, now):
continue
# Check for manual stop override
override = db.query(ScheduleOverride).filter(
ScheduleOverride.schedule_id == schedule.id,
ScheduleOverride.override_type == "stop",
ScheduleOverride.expires_at > now,
).first()
if override:
logger.info(
f"Skipping startup start for {project_name}: "
f"manual stop override active"
)
continue
# Start the agent
logger.info(
f"Starting {project_name} for active schedule {schedule.id} "
f"(server startup)"
)
await self._start_agent(project_name, project_dir, schedule)
return # Only start once per project
finally:
db.close()
except Exception as e:
logger.error(f"Error checking startup for {project_name}: {e}")
@staticmethod
def _shift_days_forward(bitfield: int) -> int:
"""
Shift the 7-bit day mask forward by one day for midnight-crossing schedules.
Examples:
Monday (1) -> Tuesday (2)
Sunday (64) -> Monday (1)
Mon+Tue (3) -> Tue+Wed (6)
"""
shifted = 0
# Shift each day forward, wrapping Sunday to Monday
if bitfield & 1:
shifted |= 2 # Mon -> Tue
if bitfield & 2:
shifted |= 4 # Tue -> Wed
if bitfield & 4:
shifted |= 8 # Wed -> Thu
if bitfield & 8:
shifted |= 16 # Thu -> Fri
if bitfield & 16:
shifted |= 32 # Fri -> Sat
if bitfield & 32:
shifted |= 64 # Sat -> Sun
if bitfield & 64:
shifted |= 1 # Sun -> Mon
return shifted
@staticmethod
def _bitfield_to_cron_days(bitfield: int) -> str:
"""Convert days bitfield to APScheduler cron format."""
days = []
day_map = [
(1, "mon"),
(2, "tue"),
(4, "wed"),
(8, "thu"),
(16, "fri"),
(32, "sat"),
(64, "sun"),
]
for bit, name in day_map:
if bitfield & bit:
days.append(name)
return ",".join(days) if days else "mon-sun"
# Global scheduler instance
_scheduler: Optional[SchedulerService] = None
def get_scheduler() -> SchedulerService:
"""Get the global scheduler instance."""
global _scheduler
if _scheduler is None:
_scheduler = SchedulerService()
return _scheduler
async def cleanup_scheduler():
"""Clean up scheduler on shutdown."""
global _scheduler
if _scheduler is not None:
await _scheduler.stop()
_scheduler = None